python使用celery-group优化大数据处理速度
【摘要】 celery-group的使用 配置为子任务配置独立的celery进程from kombu import Exchange, Queuedefault_exchange = Exchange('default', type='direct')concurrency_exchange = Exchange('concurrency', type='direct')app.conf.task_...
celery-group的使用
配置
-
为子任务配置独立的celery进程
from kombu import Exchange, Queue default_exchange = Exchange('default', type='direct') concurrency_exchange = Exchange('concurrency', type='direct') app.conf.task_queues = ( Queue('default', default_exchange, routing_key='default'), Queue('concurrency', concurrency_exchange, routing_key='concurrency'), ) app.conf.task_default_queue = 'default' app.conf.task_default_exchange = 'default' app.conf.task_default_routing_key = 'default'
使用
-
在异步任务中使用指定的进程,分割出多个子任务
from celery import group from celery.result import allow_join_result a = [1, 2, 3, 4] @app.task(bind=True, queue='concurrency') def add(a): if len(a) > 1: g1 = add2.s(a[0:int(len(a) / 2)]) g2 = add2.s(a[int(len(a) / 2):]) g = group([g1, g2]) res = g.apply_async() if result.ready(): # 是否执行完毕 if result.successful(): # 是否所有子任务都成功了 with allow_join_result(): r = res.get() # 获取所有值, 返回值为一个列表 [[1, 2], [3, 4]] @app.task(bind=True, queue='concurrency') def add2(a): return a
-
g.apply_async()支持操作
命令 说明 successful() 返回True如果全部顺利完成子任务(例如,没有提出一个例外)。 failed() True如果任何子任务失败,则返回。 waiting() True如果任何子任务尚未准备就绪,则返回。 ready() True如果所有子任务都准备就绪,则返回。 completed_count() 返回完成的子任务数。 revoke() 撤消所有子任务。 join() 收集所有子任务的结果,并以与调用它们相同的顺序(作为列表)返回它们。
需要注意的问题
异步数据量达到上限
-
celery-task的result默认类型为BLOB, 而blob类型最大能容纳65KB的数据, 异步任务的数据量过大时会导致异步任务报错
# 返回的数据自动被截取 Warning: (1265, "Data truncated for column 'result' at row 1") # 数据不完整导致解析报错 EOFError('Ran out of input',)
-
所以使用celery进程的时候尽量在独立的进程中处理完数据,不要让返回值超出范围
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)