FastAPI 异步后台任务阻塞其他请求如何处理?

举报
山河已无恙 发表于 2023/11/27 09:27:14 2023/11/27
【摘要】 1写在前面工作中遇到,有大佬做了解答,简单整理阻塞的主要原因是 网络IO 密集型和 CPU 密集型是两个不同的概念, ASGI 更多的是面向 网络/IO 密集型的非阻塞处理,不适用 CPU 密集型理解不足小伙伴帮忙指正 对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的...

1写在前面


  • 工作中遇到,有大佬做了解答,简单整理
  • 阻塞的主要原因是 网络IO 密集型CPU 密集型是两个不同的概念, ASGI 更多的是面向 网络/IO 密集型的非阻塞处理,不适用 CPU 密集型
  • 理解不足小伙伴帮忙指正

对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》


在使用 FastAPIweb 服务的时候, 使用 BackgroundTasks 执行CPU密集型或者 IO 密集型任务,会阻塞当前 web 服务的所有接口。

@app.get('/face_recognition_activated')
async def   face_recognition_activated(level : int,background_tasks: BackgroundTasks,token: bool = Depends(get_current_token)):
    """
    @Time    :   2023/10/20 03:28:11
    @Author  :   liruilonger@gmail.com
    @Version :   1.0
    @Desc    :   开始进行人脸识别
    """
    # 提取人脸库数据
    
    background_tasks.add_task(face_recognition, data = {
            "dir_name":"A205_20237test4",
            "class_code": "A0205",
            "real_real_id": [2747,2745,345435]
            })
    return {"status": 200,"message": "人脸识别开始了 🚀🚀🚀🚀🚀" }

对应的后台任务

async def face_recognition(c_data):
    """
    @Time    :   2023/10/20 05:09:23
    @Author  :   liruilonger@gmail.com
    @Version :   1.0
    @Desc    :   人脸识别
    """
    logging.info("开始人脸数据库特征提取!")
     ....................
    pbar_i = tqdm(
        range(0, len(face_list)),
        desc="特征载入内存:👀👀👀",
        mininterval=0.001,
        smoothing=0.0001,
        colour='green',
    )
    for idx in pbar_i:
        face_id = face_list[idx]
        face_id = face_id.decode('utf-8')
        face_score_key = class_code_account_period +"_face_score" 
        faces = pkl.rc.zrange(face_score_key + ":" + face_id, 0, -1)
        meta_dates = []
        for hallmark_id in faces:
            face_recognition_key = class_code_account_period + "_face_recognition"
            face_r = pkl.rc.hget(
                face_recognition_key + ":" + face_id, hallmark_id)
            face_path_key = class_code_account_period + ':Path'
            f_path = pkl.rc.hget(face_path_key, face_id)
            
            meta_date = {"hallmark_id": hallmark_id.decode('utf-8'),
                         "face_r": pickle.loads(face_r),
                         "f_path": f_path.decode('utf-8'),
                         "mass": 500,
                         }
            #logging.info(meta_date)
            meta_dates.append(meta_date)
        checks.append({"face_id": face_id, "meta_dates": meta_dates})

    logging.info("构建内存人脸库完成")
    # 开始人脸识别
    logging.info("开始人脸识别..")
    r_p = RedisClient(1)
    logging.info("人脸识别后台任务启动......") # 
    consumer_task = asyncio.create_task(AdafaceFaceIdentification.consumer_facial_feature_clustering(global_object,checks,r_p,class_code_account_period,c_data))
    await asyncio.gather(consumer_task)

对于这种情况,这是因为 对应的 后台任务被定义为 async , 意味着 fastapi 会在 asyncio 事件循环中运行它。并且因为 对应后台任务的某一环节是同步的(即不等待某些 IO或者是网络请求,而是进行计算)只要它正在运行,它就会阻塞事件循环。

这有在涉及异步IO网络操作的情况下,asyncio 才不会阻塞,能够以非阻塞的方式运行,从而充分利用系统资源并提高应用程序的并发性能。

解决这个问题的几种方法:

  • 使用更多的工人(例如 uvicorn main:app --workers 4 )。这将允许最多 4 个 后台任务 并行。
  • 将任务重写为不是 async (即将其定义为 def task(data): ... 等)。然后 starlette 将在单独的线程中运行它。
  • 使用 fastapi.concurrency.run_in_threadpool ,这也将在单独的线程中运行它。像这样:
   from fastapi.concurrency import run_in_threadpool
  async def task(data):
      otherdata = await db.fetch("some sql")
      newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata))
      await db.execute("some sql", newdata)
  • 或者直接使用 asynciosrun_in_executor (其中 run_in_threadpool 在后台使用):
 import asyncio
async def task(data):
    otherdata = await db.fetch("some sql")
    loop = asyncio.get_running_loop()
    newdata = await loop.run_in_executor(None, lambda: somelongcomputation(data, otherdata))
    await db.execute("some sql", newdata)

  • 自己生成一个单独的线程/进程。例如使用 concurrent.futures
  • 使用更重的东西,如芹菜。 (也在 此处 的 fastapi 文档中提到)。

2博文部分内容参考

© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知 :)


https://segmentfault.com/q/1010000043296883

https://stackoverflow.com/questions/63169865/how-to-do-multiprocessing-in-fastapi/63171013#63171013


© 2018-2023 liruilonger@gmail.com, All rights reserved. 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。