Python Celery 库详解
Celery 是一个基于分布式消息传递的任务队列,用于异步处理任务。它可以与各种消息代理(如RabbitMQ、Redis等)配合使用,支持任务调度、消息传递等功能。本教程将介绍如何使用 Celery 库来创建和管理异步任务。
安装 Celery
首先,我们需要安装 Celery。你可以通过 pip 来安装 Celery:
bashCopy codepip install celery
创建 Celery 应用
在使用 Celery 之前,我们需要创建一个 Celery 应用。创建一个名为 celery_app.py
的文件,并在其中定义 Celery 应用:
pythonCopy codefrom celery import Celery
# 创建 Celery 应用实例
app = Celery('tasks', broker='redis://localhost:6379/0')
# 定义任务
@app.task
def add(x, y):
return x + y
在这个示例中,我们使用 Redis 作为消息代理,你也可以选择其他的消息代理,如 RabbitMQ。
定义任务
在 Celery 应用中,任务是通过装饰器 @app.task
来定义的。下面是一个简单的任务示例:
pythonCopy code@app.task
def add(x, y):
return x + y
这个任务接受两个参数 x
和 y
,并返回它们的和。
启动 Worker
要执行任务,我们需要启动 Celery worker。在命令行中执行以下命令:
bashCopy codecelery -A celery_app worker --loglevel=info
这将启动一个 Celery worker 来处理任务。
调用任务
在其他地方调用 Celery 任务非常简单。只需导入任务函数并调用它即可:
pythonCopy codefrom celery_app import add
result = add.delay(4, 5)
print(result.get())
在这个示例中,我们调用了之前定义的 add
任务,并传递了参数 4 和 5。delay()
方法用于将任务放入队列中,然后我们使用 get()
方法来获取任务的结果。
异步执行任务
Celery 的主要优势之一是它可以异步执行任务。这意味着任务可以在后台执行,而不会阻塞主程序的执行。下面是一个异步执行任务的示例:
pythonCopy codefrom celery_app import add
result = add.delay(4, 5)
# 执行其他操作
print("其他操作执行中...")
# 等待任务完成并获取结果
print(result.get())
在这个示例中,任务被放入队列后,程序可以继续执行其他操作,而不必等待任务完成。
监控任务状态
有时候,我们需要监控任务的状态,以便知道任务是成功完成、失败还是正在执行中。Celery 提供了状态监控的功能。下面是一个示例:
pythonCopy codefrom celery_app import add
result = add.delay(4, 5)
# 监控任务状态
while not result.ready():
print("任务正在执行中...")
# 可选:获取任务状态
print("任务状态:", result.status)
print("任务完成")
print("任务结果:", result.get())
在这个示例中,我们使用 result.ready()
方法来检查任务是否完成。如果任务完成,我们可以使用 result.get()
方法来获取任务的结果。
错误处理
当任务执行出错时,我们可以捕获异常并处理。下面是一个示例:
pythonCopy codefrom celery.exceptions import SoftTimeLimitExceeded
try:
result = add.delay(4, "invalid")
print(result.get())
except SoftTimeLimitExceeded as e:
print("任务执行超时:", e)
except Exception as e:
print("任务执行出错:", e)
在这个示例中,我们捕获了 SoftTimeLimitExceeded
异常和其他异常,并打印出错误消息。
结束 Worker
当不再需要 Celery worker 时,我们可以通过发送中断信号来结束它。在命令行中按下 Ctrl + C
即可结束 Celery worker。
监控任务状态
有时候,我们需要监控任务的状态,以便知道任务是成功完成、失败还是正在执行中。Celery 提供了状态监控的功能。下面是一个示例:
pythonCopy codefrom celery_app import add
result = add.delay(4, 5)
# 监控任务状态
while not result.ready():
print("任务正在执行中...")
# 可选:获取任务状态
print("任务状态:", result.status)
print("任务完成")
print("任务结果:", result.get())
在这个示例中,我们使用 result.ready()
方法来检查任务是否完成。如果任务完成,我们可以使用 result.get()
方法来获取任务的结果。
错误处理
当任务执行出错时,我们可以捕获异常并处理。下面是一个示例:
pythonCopy codefrom celery.exceptions import SoftTimeLimitExceeded
try:
result = add.delay(4, "invalid")
print(result.get())
except SoftTimeLimitExceeded as e:
print("任务执行超时:", e)
except Exception as e:
print("任务执行出错:", e)
在这个示例中,我们捕获了 SoftTimeLimitExceeded
异常和其他异常,并打印出错误消息。
结束 Worker
当不再需要 Celery worker 时,我们可以通过发送中断信号来结束它。在命令行中按下 Ctrl + C
即可结束 Celery worker。
任务结果处理
Celery 支持异步执行任务,并在任务执行完成后返回结果。你可以对任务结果进行处理,比如存储到数据库、发送通知等。下面是一个示例:
pythonCopy codefrom celery_app import add
result = add.delay(4, 5)
# 可选:等待任务完成
result.wait()
# 处理任务结果
if result.successful():
print("任务成功完成")
print("任务结果:", result.result)
# 在这里可以将结果存储到数据库或发送通知等
else:
print("任务执行失败")
print("任务异常:", result.result)
在这个示例中,我们使用 result.successful()
方法来检查任务是否成功完成,并根据结果进行相应的处理。
设置任务超时
有时候,任务可能会因为某些原因长时间执行而导致超时。为了避免任务执行时间过长,可以设置任务的超时时间。下面是一个示例:
pythonCopy codefrom celery.exceptions import SoftTimeLimitExceeded
try:
result = add.apply_async((4, 5), soft_time_limit=10)
print(result.get())
except SoftTimeLimitExceeded as e:
print("任务执行超时:", e)
except Exception as e:
print("任务执行出错:", e)
在这个示例中,我们使用 apply_async()
方法来启动任务,并通过 soft_time_limit
参数设置任务的软超时时间为 10 秒。如果任务在指定的时间内未完成,则会抛出 SoftTimeLimitExceeded
异常。
高级特性
除了上述介绍的基本功能外,Celery 还提供了许多高级特性,如定时任务、任务重试、任务链、分布式任务等。你可以根据实际需求选择使用这些特性。以下是一些高级特性的简单介绍:
- 定时任务:Celery 支持定时执行任务,可以使用
@app.task
装饰器的eta
参数或apply_async()
方法的eta
参数来设置任务的执行时间。 - 任务重试:Celery 允许你在任务执行失败时自动重试任务。你可以使用
@app.task
装饰器的retry
参数来配置任务的重试策略。 - 任务链:Celery 允许你将多个任务组合成一个任务链,其中一个任务的输出作为下一个任务的输入。你可以使用
chain
或group
组合任务。 - 分布式任务:Celery 支持分布式任务,可以将任务分发到多台计算机上执行,从而提高任务执行的效率和并发性。
- 点赞
- 收藏
- 关注作者
评论(0)