异步编程:流式处理队列

举报
码乐 发表于 2023/12/23 14:48:14 2023/12/23
【摘要】 简介处理流式任务是现代互联网的重要任务,比如队列,比如internet的视频流数据。 这里简单讨论如何处理这些任务,有哪些工具可用。 1 队列处理在本文档中 “协程” 可用来表示两个紧密关联的概念:协程函数: 定义形式为 async def 的函数;协程对象: 调用 协程函数 所返回的对象。asyncio 也支持旧式的 基于生成器的 协程。 1.1 首先看一个:线程添加和取出from qu...

简介

处理流式任务是现代互联网的重要任务,比如队列,比如internet的视频流数据。 这里简单讨论如何处理这些任务,有哪些工具可用。

1 队列处理

在本文档中 “协程” 可用来表示两个紧密关联的概念:

协程函数: 定义形式为 async def 的函数;

协程对象: 调用 协程函数 所返回的对象。

asyncio 也支持旧式的 基于生成器的 协程。

1.1 首先看一个:线程添加和取出

from queue import Queue
>>> a = Queue()

生成者进程

>>> Thread(target=producer, args=(a, 10)).start()  

消费者进程

>>> Thread(target=consumer, args=(a,)).start()   

当执行任务较多的时候,使用异步可以提前返回一些计算内容,其他执行时间较长的任务可以在其他进程继续执行。

2 asyncio 异步io库

2.0 概念

python中可等待对象包括以下几类:

  • 协程 async/await

  • 任务 task
    asyncio.create_task() #封装协程为一个任务 该协程会被自动调度执行

  • Future

Future是一种特殊的低层级 可等待对象,表示一个异步操作的最终结果。

当一个Future对象被等待,意味着协程将保持直到该Future对象 在其他地方完成操作。

在asyncio 需要Future对象以便允许通过async/await 使用基于回调代码。

通常没有必要在应用层 创建 Future对象。

Future对象有时会 由库和某些asyncio API暴露给用户 用作可等对象。

2.0.1 协程与任务

https://docs.python.org/zh-cn/3/library/asyncio-task.html#coroutine

  • asyncio.run 运行 asyncio程序

执行 coroutine coro 并返回结果。

此函数 将运行传入的 协程,负责管理asyncio事件循环。
终结异步 生成器,并关闭线程池。

当有其他 asyncio 事件循环 在同一线程时,函数不能被调用
debug为True时,事件循环将以调式模式运行

此函数 总是创建一个新的事件循环并在结束时关闭。
它应该被当作asyncio程序主入口,理想情况只被调用一次。

  • task创建任务 asyncio.create_task(coro, *, name=None)

将coro协程封装为 Task并调度其执行,返回Task对象。

name不为None时,它将使用Task.set_name() 设为任务名称。

该任务在 get_running_loop() 返回的循环中执行。

如果当前线程没有在运行的循环则引发RuntimeError。

  • 3.7 以上的才支持,<3.7的版本需要 底层的 asyncio.ensure_future()函数。

    task = asyncio.create_task(coro())
    

等效

                     task = asyncio.ensure_future(coro())

休眠

    	coroutine asyncio.sleep(delay, result=None)

阻塞 delay 指定秒数

如果指定了 result,则当协程完成时将其返回给调用者。
sleep() 总是会挂起当前任务,以允许其他任务运行。

将delay 设为0 将提供一个 经优化的路径,以运行其他任务运行。
这可供长期运行的函数使用,以避免在函数调用全过程中阻塞事件 循环。

并行运行任务

    	awaitable asyncio.gather(*aws, return_exception=False)

并发运行 aws序列中的可等待对象。

如果aws的某个可等待对象为协程,它将自动被作为一个任务的调度。

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合的列表,顺序与 aws可等待对象一致。

如果return_exceptions 为Fasle,所引发的异常将立即传播给等待gather()任务。

aws序列中其他可等待对象 不会被取消并继续运行。

如果aws 序列中的任一task 或 Future对象被取消,它将被引发CancelledError 一样处理。

在此情况下gather调用 不会被取消。

这是为了防止一个已提交的 Task/Future被取消导致其他Tasks/Future也被取消

如果 return_exception=False 则在gather()被标记已完成时,取消它不会取消 任何已提交的可等待对象。

如: 在一个异常传播给调用者之后,gather可标记为已完成,

因此, 在从gather捕获一个(由可等待对象所引发的)异常之后调用gather.cancel() 将不会 取消任何其他可等待对象。

  • 3.10 版本 已取消该参数 return_exception。

如果未提供位置参数或者并非所有位置参数均为 Future 类对象并且没有正在运行的事件循环则会发出弃用警告。

屏蔽取消操作

    	asyncio.shield(aw)

保护一个可等待对象 防止其被取消,如果aw是一个协程,它将自动被作为任务调度。

    		res = await shield(something())

相当于

    		res = await something()

不同之处在于如果 保护它的协程被取消,在something() 运行中的任务不会被取消。从something()的角度看来。

取消操作并没有发生,然而其调用这已被取消,因此,await表达式仍然会引发 CancelledError。

如果 通过其他方式 取消something() 如内部操作,shield也将被取消。

如果希望 完全忽略 取消操作,不推荐,则shield() 函数需要一个 try/except代码段

    		try:
    			res = await shiedl(something())
    		except CancelledError:
    		    res = None
  • 3.10 已删除,如果await 不是Future 类对象,并且没有正在运行的事件循环,将发生弃用警告。

超时

    	coroutine asyncio.wait_for(aw, timeout)

等待 aw 可等待对象完成,指定 timeout 秒数后超时。

如果aw是一个协程,自动被作为任务调度。

timeout可以为None,也可以为 float 或 int 型整数表示的等待秒数。

如果为None,则必须到完成为止。
如果超时,任务将取消并引发 asyncio.TimeoutError。

要避免任务 取消,可以假设 shield()。

  • 简单等待

      	coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
    

并发运行aws 可迭代对象中的可等待对象 并进入阻塞直到满足 return_when所指定条件。

aws 可迭代对象不可为空

返回 Task/Future集合 (done, pending)

此函数不会引发 asyncio.TimeoutError 当超时发生时,未完成的Future 或 Task 将在指定秒数后被返回。

与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。

wait() 会自动以任务的形式调度协程,之后将以 (done, pending) 集合形式返回显式创建的任务对象。

正确写法

   async def foo():
	return 42


    task = asyncio.create_task(foo())
	done, pending = await asyncio.wait({task})

if task in done:
	print("done")

完成处理

 as_completed(aws,*,timeout=None)

并发地运行aws 可迭代对象的可等待对象,返回一个协程迭代器。

返回的每个协程 可被等待以从剩余可等待对象的 可迭代对象中获取最早的结果。

如果所有Future对象完成前发生超时则引发asyncio.TimeoutError。

    	for  coro in as_completed(aws):
    	     earliest_result = await coro

在线程运行

    	coroutine asyncio.to_thread(func, /, *args, **kwargs)

不同的线程中异步运行函数 func。

向此函数提供的任何 *args, ** kwargs 会被直接传给func。

并且,当前 contextvars.Context将被传播,允许在不同线程访问来自事件循环的上下文变量。

返回一个可等待以获取 func 的最终结果的协程,主要用于执行在其他情况下会阻塞事件循环的IO密集型函数 方法。

  • 跨线程调度

      	asyncio.run_coroutine_threadsafe(coro, loop)
    

向指定事件循环 提交一个线程。 线程安全。

返回一个 concurrent.futures.Future以等待来自其他OS线程的结果。

此函数应该从另一个OS线程调用,非事件循环运行所在线程。

  • 内省
    返回当前运行的Task实例,如果没有正在运行的任务则返回None。
    如果loop未None则使用 get_running_loop()获取当前事件循环。

    asyncio.current_task(loop=None)

返回事件循环所在运行的未完成Task对象集合。
如果loop 为Nong,则使用get_running_loop()获取当前事件。

       	asyncio.all_tasks(loop=None)
  • Task对象
    asyncio.Task()

一个与Future类似 的对象,可运行python协程,非线程安全

  • 取消task

     async def cancel_me():
         print('cancel_me(): before sleep')
    
     try:
         # Wait for 1 hour
         await asyncio.sleep(5)
     except asyncio.CancelledError:
         print('cancel_me(): cancel sleep')
         raise
     finally:
         print('cancel_me(): after sleep')
    
     async def main9():
         # Create a "cancel_me" Task
         task = asyncio.create_task(cancel_me())
    
         # Wait for 1 second
         await asyncio.sleep(1)
    
         task.cancel()
         try:
             await task
         except asyncio.CancelledError:
             print("main(): cancel_me is cancelled now")
         finally:
             print(task.cancelled())
    
  • 基于生成器 的协程

    基于生成器的协程是 async/await语法前身,它们是使用 yuield from 创建的

    使用 @asyncio.coroutine装饰

2.0.2 流

  • Stream 函数

下面的高级 asyncio 函数可以用来创建和处理流:

	coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)

建立网络连接并返回一对 (reader, writer) 对象。

返回的 reader 和 writer 对象是 StreamReader 和 StreamWriter 类的实例。

    coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

启动套接字服务。
当一个新的客户端连接被建立时,回调函数 client_connected_cb 会被调用。

该函数会接收到一对参数 (reader, writer) ,reader是类 StreamReader 的实例,而writer是类 StreamWriter 的实例。

  • Unix 套接字

     coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)
    

建立一个 Unix 套接字连接并返回 (reader, writer) 这对返回值。

与 open_connection() 相似,但是是在 Unix 套接字上的操作。

请看文档 loop.create_unix_connection().

在 3.10 版更改: Removed the loop parameter.

	coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

启动一个 Unix 套接字服务。

StreamReader
	class asyncio.StreamReader

这个类表示一个读取器对象,该对象提供api以便于从IO流中读取数据。

不推荐直接实例化 StreamReader 对象,建议使用 open_connection() 和 start_server() 来获取 StreamReader 实例。

	coroutine read(n=- 1)

至多读取 n 个byte。 如果没有设置 n , 则自动置为 -1 , -1时表示读至 EOF 并返回所有读取的byte。

如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。

	coroutine readline()

读取一行,其中“行”指的是以 \n 结尾的字节序列。

如果读到EOF而没有找到 \n ,该方法返回部分读取的数据。

如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。

StreamWriter
	class asyncio.StreamWriter

这个类表示一个写入器对象,该对象提供api以便于写数据至IO流中。

不建议直接实例化 StreamWriter;而应改用 open_connection() 和 start_server()。

	write(data)

此方法会尝试立即将 data 写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

此方法应当与 drain() 方法一起使用:

	stream.write(data)
	await stream.drain()
	writelines(data)

此方法会立即尝试将一个字节串列表(或任何可迭代对象)写入到下层的套接字。

如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

此方法应当与 drain() 方法一起使用:

	stream.writelines(lines)
	await stream.drain()
	close()

此方法会关闭流以及下层的套接字。

此方法应与 wait_closed() 方法一起使用:

	stream.close()
	await stream.wait_closed()

注册一个打开的套接字以等待使用流的数据。

使用低层级协议以及 loop.create_connection() 方法的 注册一个打开的套接字以等待使用协议的数据 示例。

	https://docs.python.org/zh-cn/3/library/asyncio-protocol.html#asyncio-example-create-connection

使用低层级的 loop.add_reader() 方法来监视文件描述符的 监视文件描述符以读取事件 示例。

使用 open_connection() 函数实现等待直到套接字接收到数据的协程:

	import asyncio
	import socket

	async def wait_for_data():
	    # Get a reference to the current event loop because
	    # we want to access low-level APIs.
	    loop = asyncio.get_running_loop()

	    # Create a pair of connected sockets.
	    rsock, wsock = socket.socketpair()

	    # Register the open socket to wait for data.
	    reader, writer = await asyncio.open_connection(sock=rsock)

	    # Simulate the reception of data from the network
	    loop.call_soon(wsock.send, 'abc'.encode())

	    # Wait for data
	    data = await reader.read(100)

	    # Got data, we are done: close the socket
	    print("Received:", data.decode())
	    writer.close()

	    # Close the second socket
	    wsock.close()

3 小结

本节介绍python的异步支持,异步包引入较晚,因此有部分功能仍在变动中。

比如在3.10支持PEGs 规范的解析器之后,取消了loop :Removed the loop parameter. 并且在await如果对象不可等待时,将返回错误。

下一节我们继续这个话题。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。