Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控
@Author:Runsen
消息队列
消息队列让应用程序在用户请求之外异步执行称为任务的工作。如果应用程序需要在后台执行工作,它会将任务添加到任务队列中。这些任务稍后由工作服务执行。
Celery
Celery 是一个分布式任务队列,可帮助在后台异步执行大量进程/消息并进行实时处理。芹菜由三个主要成分组成:
- Celery 客户端
- 消息代理
- Celery 工人
下图显示了组件之间如何交互的简化图。我们将使用 FastAPI 作为我们的 Celery 客户端和 RabbitMQ 作为消息代理。
-
将Celery 客户将运行FastAPI应用程序,并会发出消息/后台作业的RabbitMQ。
-
RabbitMQ 将作为消息代理来调解客户端和工作线程之间的消息。
-
RabbitMQ 收到客户端的消息后,会通过将消息发送给 celery worker 来启动客户端任务。
-
一个celery 工人被认为是将实现在任何Web服务器的请求的异步后台任务。可以有多个工人一次执行/完成许多任务。
-
celery 将确保每个 worker 一次只执行一个任务,并且每个任务只由一个 worker 分配。
FastAPI 是一个现代、快速(高性能)的 Web 框架,
安装 fastapi包
pip install fastapi
- 1
安装 celery 包
pip install celery
- 1
我们还需要安装 ASGI 服务器来运行我们的 FastAPI 应用程序。
pip install uvicorn
- 1
在我们的本地机器上运行 RabbitMQ 的最简单方法之一是使用 Docker。
Docker安装查看:https://docs.docker.com/get-docker/
运行以下命令即可通过终端中的 docker 启动 RabbitMQ 映像。
docker run -d --name some-rabbit -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:3
- 1
如果没有安装RabbitMQ 映像,会直接下载安装。
创建 Celery Worker 任务
现在,一旦消息代理RabbitMQ 运行,就可以创建Worker 程序。
这是因为 celery Worker 会侦听消息代理以执行排队的任务。
在这一部分,我们将为 celery worker 创建任务。创建一个文件celery_worker.py:
from time import sleep
from celery import Celery
from celery.utils.log import get_task_logger
#初始Celery
celery = Celery('tasks', broker='amqp://guest:guest@127.0.0.1:5672//')
#创建记录器-启用以在任务记录器上显示消息
celery_log = get_task_logger(__name__)
#创建订单-与芹菜异步运行
#长时间运行任务的示例流程
@celery.task
def create_order(name, quantity):
# 每1个订单5秒
complete_time_per_item = 5
# 根据订购的物品数量不断增加
sleep(complete_time_per_item * quantity)
celery_log.info(f"Order Complete!")
return {"message": f"Hi {name}, Your order has completed!",
"order_quantity": quantity}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
新建一个model.py
from pydantic import BaseModel
#请求主体的订单类模型
class Order(BaseModel):
customer_name: str
order_quantity: int
- 1
- 2
- 3
- 4
- 5
然后,创建一个名为main.py的新文件:
from fastapi import FastAPI
from celery_worker import create_order
from model import Order
# 创建FastAPI应用程序
app = FastAPI()
# 创建订单
@app.post('/order')
def add_order(order: Order):
# 使用delay=() 方法调用芹菜任务
create_order.delay(order.customer_name, order.order_quantity)
return {"message": "Order Received! Thank you for your patience."}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
运行 FastAPI 应用程序:
uvicorn main:app --reload
- 1
访问http://localhost:8000/docs
以查看 Swagger 文档中正在运行的 FastAPI。
celery -A celery_worker.celery worker --loglevel=info
- 1
可以通过插入请求正文输入来测试我们的端点。这是请求正文的示例输入:
单击“Execute”以从端点获取响应,将看到以下结果:
有没有什么有效的方法来监控后台任务?
我们可以使用Flower 监控工具来监控我们所有的Celery 工人。
pip install flower
- 1
然后,flower在我们的本地机器上运行服务器:
celery flower -A celery_worker.celery --broker:amqp://localhost//
- 1
访问http://localhost:5555/。
Flower 服务器并单击导航栏上的“Tasks”选项卡来监控我们的 celery 工人。
文章来源: maoli.blog.csdn.net,作者:刘润森!,版权归原作者所有,如需转载,请联系作者。
原文链接:maoli.blog.csdn.net/article/details/119960438
- 点赞
- 收藏
- 关注作者
评论(0)