python中异步任务的终止

举报
kwan的解忧杂货铺 发表于 2024/09/01 23:00:16 2024/09/01
【摘要】 一.需求说明 1.需求我有一个很耗时的异步任务,我希望能通过接口的方式终止这个异步任务,在 python 中如何实现呢? 2.实现方式可以使用 asyncio 相关的 api 进行实现创建并运行异步任务取消异步任务api 地址 3.asyncio 介绍asyncio 是 Python 标准库中的一个模块,用于编写单线程的并发代码。它使用 async/await 语法来编写异步代码,允许你以...

一.需求说明

1.需求

我有一个很耗时的异步任务,我希望能通过接口的方式终止这个异步任务,在 python 中如何实现呢?

2.实现方式

可以使用 asyncio 相关的 api 进行实现

  1. 创建并运行异步任务
  2. 取消异步任务
  3. api 地址

3.asyncio 介绍

asyncio 是 Python 标准库中的一个模块,用于编写单线程的并发代码。它使用 async/await 语法来编写异步代码,允许你以非阻塞的方式执行 I/O 操作,如网络请求、文件读写等。

以下是 asyncio 的一些关键特性:

  1. 事件循环(Event Loop)asyncio 使用事件循环来管理所有异步操作。事件循环是一个运行在后台的无限循环,它不断地检查是否有新的事件(如 I/O 完成、定时器到期等)需要处理。
  2. 协程(Coroutines):协程是 asyncio 中的并发单元,它们是使用 async def 定义的函数。协程在遇到 await 表达式时会暂停执行,直到等待的任务完成,然后继续执行。
  3. 任务(Tasks):任务是协程的执行单元。你可以使用 asyncio.create_task() 将协程包装成任务,然后将其放入事件循环中执行。
  4. FuturesFuture 是一个低级别的并发机制,它代表了一个尚未完成的操作。Future 可以被等待,当操作完成时,它会通知等待者。
  5. 同步原语asyncio 提供了同步原语,如锁(Lock)、事件(Event)、条件变量(Condition)和信号量(Semaphore),用于在异步代码中处理共享资源的同步。
  6. 流(Streams)asyncio 还提供了流(如 StreamReaderStreamWriter),它们用于处理网络通信中的二进制数据流。
  7. 子进程asyncio 允许你异步地创建和管理子进程。
  8. 超时和取消:你可以为异步操作设置超时,或者在必要时取消正在进行的操作。

二.实现步骤

1.异步终止任务

from typing import Any

from manager.graph.manager import GraphManager
from manager.schemas.manager import SchemasManager
from utils import logger
from fastapi import APIRouter
from manager.graph.query_graph import LocalQueryGraph
import time
import asyncio

GraphRouter = APIRouter(prefix="/test", tags=["测试管理"])
global task_map
task_map: dict[Any, Any] = {}

class GraphRouterMap:

    # 长时间运行的异步任务
    @staticmethod
    async def long_running_task():
        try:
            while True:
                # 模拟任务运行
                print("长时间运行的任务正在运行.....")
                await asyncio.sleep(2)
        except asyncio.CancelledError:
            print("任务被取消")

    @staticmethod
    @GraphRouter.get("/start", summary="启动任务", description="启动任务")
    async def start_task():
        # 启动长时间任务,并返回任务对象
        run_id = time.strftime("%Y%m%d-%H%M%S")
        task = asyncio.create_task(GraphRouterMap.long_running_task())
        task_map[run_id] = task
        return run_id

    @staticmethod
    @GraphRouter.get("/stop", summary="中止任务", description="中止任务")
    async def stop_task(run_id: str):
        task = task_map.get(run_id)
        task.cancel()
        return {"message": "任务取消请求已发送"}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。