python使用celery-group优化大数据处理速度

举报
时光不写 发表于 2022/02/22 19:37:57 2022/02/22
【摘要】 celery-group的使用 配置为子任务配置独立的celery进程from kombu import Exchange, Queuedefault_exchange = Exchange('default', type='direct')concurrency_exchange = Exchange('concurrency', type='direct')app.conf.task_...

celery-group的使用

配置

  • 为子任务配置独立的celery进程

    from kombu import Exchange, Queue
    default_exchange = Exchange('default', type='direct')
    concurrency_exchange = Exchange('concurrency', type='direct')
    
    app.conf.task_queues = (
        Queue('default', default_exchange, routing_key='default'),
        Queue('concurrency', concurrency_exchange, routing_key='concurrency'),
    )
    app.conf.task_default_queue = 'default'
    app.conf.task_default_exchange = 'default'
    app.conf.task_default_routing_key = 'default'
    

使用

  • 在异步任务中使用指定的进程,分割出多个子任务

    from celery import group
    from celery.result import allow_join_result
    a = [1, 2, 3, 4]
    
    @app.task(bind=True, queue='concurrency')
    def add(a):
        if len(a) > 1:
            g1 = add2.s(a[0:int(len(a) / 2)])
            g2 = add2.s(a[int(len(a) / 2):])
            g = group([g1, g2])
            res = g.apply_async()
            if result.ready():  # 是否执行完毕
                if result.successful():  # 是否所有子任务都成功了
                    with allow_join_result():
                        r = res.get()    # 获取所有值, 返回值为一个列表  [[1, 2], [3, 4]]
    
    
    @app.task(bind=True, queue='concurrency')
    def add2(a):
        return a
    
  • g.apply_async()支持操作

    命令 说明
    successful() 返回True如果全部顺利完成子任务(例如,没有提出一个例外)。
    failed() True如果任何子任务失败,则返回。
    waiting() True如果任何子任务尚未准备就绪,则返回。
    ready() True如果所有子任务都准备就绪,则返回。
    completed_count() 返回完成的子任务数。
    revoke() 撤消所有子任务。
    join() 收集所有子任务的结果,并以与调用它们相同的顺序(作为列表)返回它们。

需要注意的问题

异步数据量达到上限

  • celery-task的result默认类型为BLOB, 而blob类型最大能容纳65KB的数据, 异步任务的数据量过大时会导致异步任务报错

    # 返回的数据自动被截取
    Warning: (1265, "Data truncated for column 'result' at row 1")
    # 数据不完整导致解析报错
    EOFError('Ran out of input',)
    
  • 所以使用celery进程的时候尽量在独立的进程中处理完数据,不要让返回值超出范围


参考链接:python使用celery-group优化大数据处理速度

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。