Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控

毛利 发表于 2021/08/28 23:52:31 2021/08/28
【摘要】 @Author:Runsen 消息队列 消息队列让应用程序在用户请求之外异步执行称为任务的工作。如果应用程序需要在后台执行工作,它会将任务添加到任务队列中。这些任务稍后由工作服务执行。 Celery...

@Author:Runsen

消息队列

消息队列让应用程序在用户请求之外异步执行称为任务的工作。如果应用程序需要在后台执行工作,它会将任务添加到任务队列中。这些任务稍后由工作服务执行。

Celery

Celery 是一个分布式任务队列,可帮助在后台异步执行大量进程/消息并进行实时处理。芹菜由三个主要成分组成:

  • Celery 客户端
  • 消息代理
  • Celery 工人

下图显示了组件之间如何交互的简化图。我们将使用 FastAPI 作为我们的 Celery 客户端和 RabbitMQ 作为消息代理。

  • 将Celery 客户将运行FastAPI应用程序,并会发出消息/后台作业的RabbitMQ。

  • RabbitMQ 将作为消息代理来调解客户端和工作线程之间的消息。

  • RabbitMQ 收到客户端的消息后,会通过将消息发送给 celery worker 来启动客户端任务。

  • 一个celery 工人被认为是将实现在任何Web服务器的请求的异步后台任务。可以有多个工人一次执行/完成许多任务。

  • celery 将确保每个 worker 一次只执行一个任务,并且每个任务只由一个 worker 分配。

FastAPI 是一个现代、快速(高性能)的 Web 框架,

安装 fastapi包

pip install fastapi

  
 
  • 1

安装 celery 包

pip install celery

  
 
  • 1

我们还需要安装 ASGI 服务器来运行我们的 FastAPI 应用程序。

pip install uvicorn

  
 
  • 1

在我们的本地机器上运行 RabbitMQ 的最简单方法之一是使用 Docker。

Docker安装查看:https://docs.docker.com/get-docker/

运行以下命令即可通过终端中的 docker 启动 RabbitMQ 映像。

docker run -d --name some-rabbit -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:3

  
 
  • 1

如果没有安装RabbitMQ 映像,会直接下载安装。

创建 Celery Worker 任务

现在,一旦消息代理RabbitMQ 运行,就可以创建Worker 程序。

这是因为 celery Worker 会侦听消息代理以执行排队的任务。

在这一部分,我们将为 celery worker 创建任务。创建一个文件celery_worker.py:

from time import sleep
from celery import Celery
from celery.utils.log import get_task_logger
#初始Celery
celery = Celery('tasks', broker='amqp://guest:guest@127.0.0.1:5672//')
#创建记录器-启用以在任务记录器上显示消息
celery_log = get_task_logger(__name__)
#创建订单-与芹菜异步运行
#长时间运行任务的示例流程
@celery.task
def create_order(name, quantity):
    # 每1个订单5秒
    complete_time_per_item = 5
    # 根据订购的物品数量不断增加
    sleep(complete_time_per_item * quantity)
    
    celery_log.info(f"Order Complete!")
    return {"message": f"Hi {name}, Your order has completed!",
            "order_quantity": quantity}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

新建一个model.py

from pydantic import BaseModel
#请求主体的订单类模型
class Order(BaseModel):
    customer_name: str
    order_quantity: int

  
 
  • 1
  • 2
  • 3
  • 4
  • 5

然后,创建一个名为main.py的新文件:

from fastapi import FastAPI
from celery_worker import create_order
from model import Order
# 创建FastAPI应用程序
app = FastAPI()
# 创建订单
@app.post('/order')
def add_order(order: Order):
    # 使用delay=() 方法调用芹菜任务
    create_order.delay(order.customer_name, order.order_quantity)
    return {"message": "Order Received! Thank you for your patience."}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

运行 FastAPI 应用程序:

uvicorn main:app --reload

  
 
  • 1

访问http://localhost:8000/docs

以查看 Swagger 文档中正在运行的 FastAPI。

celery -A celery_worker.celery worker --loglevel=info

  
 
  • 1

可以通过插入请求正文输入来测试我们的端点。这是请求正文的示例输入:

单击“Execute”以从端点获取响应,将看到以下结果:

有没有什么有效的方法来监控后台任务?

我们可以使用Flower 监控工具来监控我们所有的Celery 工人。

pip install flower

  
 
  • 1

然后,flower在我们的本地机器上运行服务器:

celery flower -A celery_worker.celery --broker:amqp://localhost//

  
 
  • 1

访问http://localhost:5555/。

Flower 服务器并单击导航栏上的“Tasks”选项卡来监控我们的 celery 工人。

文章来源: maoli.blog.csdn.net,作者:刘润森!,版权归原作者所有,如需转载,请联系作者。

原文链接:maoli.blog.csdn.net/article/details/119960438

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。