FastAPI 异步后台任务阻塞其他请求如何处理?
1写在前面
-
工作中遇到,有大佬做了解答,简单整理 -
阻塞的主要原因是 网络IO 密集型
和CPU 密集型
是两个不同的概念,ASGI
更多的是面向 网络/IO 密集型的非阻塞处理,不适用 CPU 密集型 -
理解不足小伙伴帮忙指正
对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》
在使用 FastAPI
做 web
服务的时候, 使用 BackgroundTasks
执行CPU密集型或者 IO 密集型任务,会阻塞当前 web 服务的所有接口。
@app.get('/face_recognition_activated')
async def face_recognition_activated(level : int,background_tasks: BackgroundTasks,token: bool = Depends(get_current_token)):
"""
@Time : 2023/10/20 03:28:11
@Author : liruilonger@gmail.com
@Version : 1.0
@Desc : 开始进行人脸识别
"""
# 提取人脸库数据
background_tasks.add_task(face_recognition, data = {
"dir_name":"A205_20237test4",
"class_code": "A0205",
"real_real_id": [2747,2745,345435]
})
return {"status": 200,"message": "人脸识别开始了 🚀🚀🚀🚀🚀" }
对应的后台任务
async def face_recognition(c_data):
"""
@Time : 2023/10/20 05:09:23
@Author : liruilonger@gmail.com
@Version : 1.0
@Desc : 人脸识别
"""
logging.info("开始人脸数据库特征提取!")
....................
pbar_i = tqdm(
range(0, len(face_list)),
desc="特征载入内存:👀👀👀",
mininterval=0.001,
smoothing=0.0001,
colour='green',
)
for idx in pbar_i:
face_id = face_list[idx]
face_id = face_id.decode('utf-8')
face_score_key = class_code_account_period +"_face_score"
faces = pkl.rc.zrange(face_score_key + ":" + face_id, 0, -1)
meta_dates = []
for hallmark_id in faces:
face_recognition_key = class_code_account_period + "_face_recognition"
face_r = pkl.rc.hget(
face_recognition_key + ":" + face_id, hallmark_id)
face_path_key = class_code_account_period + ':Path'
f_path = pkl.rc.hget(face_path_key, face_id)
meta_date = {"hallmark_id": hallmark_id.decode('utf-8'),
"face_r": pickle.loads(face_r),
"f_path": f_path.decode('utf-8'),
"mass": 500,
}
#logging.info(meta_date)
meta_dates.append(meta_date)
checks.append({"face_id": face_id, "meta_dates": meta_dates})
logging.info("构建内存人脸库完成")
# 开始人脸识别
logging.info("开始人脸识别..")
r_p = RedisClient(1)
logging.info("人脸识别后台任务启动......") #
consumer_task = asyncio.create_task(AdafaceFaceIdentification.consumer_facial_feature_clustering(global_object,checks,r_p,class_code_account_period,c_data))
await asyncio.gather(consumer_task)
对于这种情况,这是因为 对应的 后台任务被定义为 async
, 意味着 fastapi
会在 asyncio
事件循环中运行它。并且因为 对应后台任务的某一环节是同步的(即不等待某些 IO
或者是网络请求,而是进行计算)只要它正在运行,它就会阻塞事件循环。
这有在涉及异步IO
和网络操作
的情况下,asyncio
才不会阻塞,能够以非阻塞的方式运行,从而充分利用系统资源并提高应用程序的并发性能。
解决这个问题的几种方法:
-
使用更多的工人(例如 uvicorn main:app --workers 4
)。这将允许最多 4 个 后台任务 并行。 -
将任务重写为不是 async
(即将其定义为def task(data)
: ... 等)。然后 starlette 将在单独的线程中运行它。 -
使用 fastapi.concurrency.run_in_threadpool
,这也将在单独的线程中运行它。像这样:
from fastapi.concurrency import run_in_threadpool
async def task(data):
otherdata = await db.fetch("some sql")
newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata))
await db.execute("some sql", newdata)
-
或者直接使用 asyncios
的run_in_executor
(其中run_in_threadpool
在后台使用):
import asyncio
async def task(data):
otherdata = await db.fetch("some sql")
loop = asyncio.get_running_loop()
newdata = await loop.run_in_executor(None, lambda: somelongcomputation(data, otherdata))
await db.execute("some sql", newdata)
-
自己生成一个单独的线程/进程。例如使用 concurrent.futures
-
使用更重的东西,如芹菜。 (也在 此处 的 fastapi 文档中提到)。
2博文部分内容参考
© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知 :)
https://segmentfault.com/q/1010000043296883
https://stackoverflow.com/questions/63169865/how-to-do-multiprocessing-in-fastapi/63171013#63171013
© 2018-2023 liruilonger@gmail.com, All rights reserved. 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)
- 点赞
- 收藏
- 关注作者
评论(0)