异步编程:流式处理队列
简介
处理流式任务是现代互联网的重要任务,比如队列,比如internet的视频流数据。 这里简单讨论如何处理这些任务,有哪些工具可用。
1 队列处理
在本文档中 “协程” 可用来表示两个紧密关联的概念:
协程函数: 定义形式为 async def 的函数;
协程对象: 调用 协程函数 所返回的对象。
asyncio 也支持旧式的 基于生成器的 协程。
1.1 首先看一个:线程添加和取出
from queue import Queue
>>> a = Queue()
生成者进程
>>> Thread(target=producer, args=(a, 10)).start()
消费者进程
>>> Thread(target=consumer, args=(a,)).start()
当执行任务较多的时候,使用异步可以提前返回一些计算内容,其他执行时间较长的任务可以在其他进程继续执行。
2 asyncio 异步io库
2.0 概念
python中可等待对象包括以下几类:
-
协程 async/await
-
任务 task
asyncio.create_task() #封装协程为一个任务 该协程会被自动调度执行 -
Future
Future是一种特殊的低层级 可等待对象,表示一个异步操作的最终结果。
当一个Future对象被等待,意味着协程将保持直到该Future对象 在其他地方完成操作。
在asyncio 需要Future对象以便允许通过async/await 使用基于回调代码。
通常没有必要在应用层 创建 Future对象。
Future对象有时会 由库和某些asyncio API暴露给用户 用作可等对象。
2.0.1 协程与任务
https://docs.python.org/zh-cn/3/library/asyncio-task.html#coroutine
- asyncio.run 运行 asyncio程序
执行 coroutine coro 并返回结果。
此函数 将运行传入的 协程,负责管理asyncio事件循环。
终结异步 生成器,并关闭线程池。
当有其他 asyncio 事件循环 在同一线程时,函数不能被调用
debug为True时,事件循环将以调式模式运行
此函数 总是创建一个新的事件循环并在结束时关闭。
它应该被当作asyncio程序主入口,理想情况只被调用一次。
- task创建任务 asyncio.create_task(coro, *, name=None)
将coro协程封装为 Task并调度其执行,返回Task对象。
name不为None时,它将使用Task.set_name() 设为任务名称。
该任务在 get_running_loop() 返回的循环中执行。
如果当前线程没有在运行的循环则引发RuntimeError。
-
3.7 以上的才支持,<3.7的版本需要 底层的 asyncio.ensure_future()函数。
task = asyncio.create_task(coro())
等效
task = asyncio.ensure_future(coro())
休眠
coroutine asyncio.sleep(delay, result=None)
阻塞 delay 指定秒数
如果指定了 result,则当协程完成时将其返回给调用者。
sleep() 总是会挂起当前任务,以允许其他任务运行。
将delay 设为0 将提供一个 经优化的路径,以运行其他任务运行。
这可供长期运行的函数使用,以避免在函数调用全过程中阻塞事件 循环。
并行运行任务
awaitable asyncio.gather(*aws, return_exception=False)
并发运行 aws序列中的可等待对象。
如果aws的某个可等待对象为协程,它将自动被作为一个任务的调度。
如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合的列表,顺序与 aws可等待对象一致。
如果return_exceptions 为Fasle,所引发的异常将立即传播给等待gather()任务。
aws序列中其他可等待对象 不会被取消并继续运行。
如果aws 序列中的任一task 或 Future对象被取消,它将被引发CancelledError 一样处理。
在此情况下gather调用 不会被取消。
这是为了防止一个已提交的 Task/Future被取消导致其他Tasks/Future也被取消
如果 return_exception=False 则在gather()被标记已完成时,取消它不会取消 任何已提交的可等待对象。
如: 在一个异常传播给调用者之后,gather可标记为已完成,
因此, 在从gather捕获一个(由可等待对象所引发的)异常之后调用gather.cancel() 将不会 取消任何其他可等待对象。
- 3.10 版本 已取消该参数 return_exception。
如果未提供位置参数或者并非所有位置参数均为 Future 类对象并且没有正在运行的事件循环则会发出弃用警告。
屏蔽取消操作
asyncio.shield(aw)
保护一个可等待对象 防止其被取消,如果aw是一个协程,它将自动被作为任务调度。
res = await shield(something())
相当于
res = await something()
不同之处在于如果 保护它的协程被取消,在something() 运行中的任务不会被取消。从something()的角度看来。
取消操作并没有发生,然而其调用这已被取消,因此,await表达式仍然会引发 CancelledError。
如果 通过其他方式 取消something() 如内部操作,shield也将被取消。
如果希望 完全忽略 取消操作,不推荐,则shield() 函数需要一个 try/except代码段
try:
res = await shiedl(something())
except CancelledError:
res = None
- 3.10 已删除,如果await 不是Future 类对象,并且没有正在运行的事件循环,将发生弃用警告。
超时
coroutine asyncio.wait_for(aw, timeout)
等待 aw 可等待对象完成,指定 timeout 秒数后超时。
如果aw是一个协程,自动被作为任务调度。
timeout可以为None,也可以为 float 或 int 型整数表示的等待秒数。
如果为None,则必须到完成为止。
如果超时,任务将取消并引发 asyncio.TimeoutError。
要避免任务 取消,可以假设 shield()。
-
简单等待
coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
并发运行aws 可迭代对象中的可等待对象 并进入阻塞直到满足 return_when所指定条件。
aws 可迭代对象不可为空
返回 Task/Future集合 (done, pending)
此函数不会引发 asyncio.TimeoutError 当超时发生时,未完成的Future 或 Task 将在指定秒数后被返回。
与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。
wait() 会自动以任务的形式调度协程,之后将以 (done, pending) 集合形式返回显式创建的任务对象。
正确写法
async def foo():
return 42
task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})
if task in done:
print("done")
完成处理
as_completed(aws,*,timeout=None)
并发地运行aws 可迭代对象的可等待对象,返回一个协程迭代器。
返回的每个协程 可被等待以从剩余可等待对象的 可迭代对象中获取最早的结果。
如果所有Future对象完成前发生超时则引发asyncio.TimeoutError。
for coro in as_completed(aws):
earliest_result = await coro
在线程运行
coroutine asyncio.to_thread(func, /, *args, **kwargs)
不同的线程中异步运行函数 func。
向此函数提供的任何 *args, ** kwargs 会被直接传给func。
并且,当前 contextvars.Context将被传播,允许在不同线程访问来自事件循环的上下文变量。
返回一个可等待以获取 func 的最终结果的协程,主要用于执行在其他情况下会阻塞事件循环的IO密集型函数 方法。
-
跨线程调度
asyncio.run_coroutine_threadsafe(coro, loop)
向指定事件循环 提交一个线程。 线程安全。
返回一个 concurrent.futures.Future以等待来自其他OS线程的结果。
此函数应该从另一个OS线程调用,非事件循环运行所在线程。
-
内省
返回当前运行的Task实例,如果没有正在运行的任务则返回None。
如果loop未None则使用 get_running_loop()获取当前事件循环。asyncio.current_task(loop=None)
返回事件循环所在运行的未完成Task对象集合。
如果loop 为Nong,则使用get_running_loop()获取当前事件。
asyncio.all_tasks(loop=None)
- Task对象
asyncio.Task()
一个与Future类似 的对象,可运行python协程,非线程安全
-
取消task
async def cancel_me(): print('cancel_me(): before sleep') try: # Wait for 1 hour await asyncio.sleep(5) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main9(): # Create a "cancel_me" Task task = asyncio.create_task(cancel_me()) # Wait for 1 second await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("main(): cancel_me is cancelled now") finally: print(task.cancelled())
-
基于生成器 的协程
基于生成器的协程是 async/await语法前身,它们是使用 yuield from 创建的
使用 @asyncio.coroutine装饰
2.0.2 流
- Stream 函数
下面的高级 asyncio 函数可以用来创建和处理流:
coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)
建立网络连接并返回一对 (reader, writer) 对象。
返回的 reader 和 writer 对象是 StreamReader 和 StreamWriter 类的实例。
coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
启动套接字服务。
当一个新的客户端连接被建立时,回调函数 client_connected_cb 会被调用。
该函数会接收到一对参数 (reader, writer) ,reader是类 StreamReader 的实例,而writer是类 StreamWriter 的实例。
-
Unix 套接字
coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)
建立一个 Unix 套接字连接并返回 (reader, writer) 这对返回值。
与 open_connection() 相似,但是是在 Unix 套接字上的操作。
请看文档 loop.create_unix_connection().
在 3.10 版更改: Removed the loop parameter.
coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)
启动一个 Unix 套接字服务。
StreamReader
class asyncio.StreamReader
这个类表示一个读取器对象,该对象提供api以便于从IO流中读取数据。
不推荐直接实例化 StreamReader 对象,建议使用 open_connection() 和 start_server() 来获取 StreamReader 实例。
coroutine read(n=- 1)
至多读取 n 个byte。 如果没有设置 n , 则自动置为 -1 , -1时表示读至 EOF 并返回所有读取的byte。
如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。
coroutine readline()
读取一行,其中“行”指的是以 \n 结尾的字节序列。
如果读到EOF而没有找到 \n ,该方法返回部分读取的数据。
如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。
StreamWriter
class asyncio.StreamWriter
这个类表示一个写入器对象,该对象提供api以便于写数据至IO流中。
不建议直接实例化 StreamWriter;而应改用 open_connection() 和 start_server()。
write(data)
此方法会尝试立即将 data 写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。
此方法应当与 drain() 方法一起使用:
stream.write(data)
await stream.drain()
writelines(data)
此方法会立即尝试将一个字节串列表(或任何可迭代对象)写入到下层的套接字。
如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。
此方法应当与 drain() 方法一起使用:
stream.writelines(lines)
await stream.drain()
close()
此方法会关闭流以及下层的套接字。
此方法应与 wait_closed() 方法一起使用:
stream.close()
await stream.wait_closed()
注册一个打开的套接字以等待使用流的数据。
使用低层级协议以及 loop.create_connection() 方法的 注册一个打开的套接字以等待使用协议的数据 示例。
https://docs.python.org/zh-cn/3/library/asyncio-protocol.html#asyncio-example-create-connection
使用低层级的 loop.add_reader() 方法来监视文件描述符的 监视文件描述符以读取事件 示例。
使用 open_connection() 函数实现等待直到套接字接收到数据的协程:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
3 小结
本节介绍python的异步支持,异步包引入较晚,因此有部分功能仍在变动中。
比如在3.10支持PEGs 规范的解析器之后,取消了loop :Removed the loop parameter. 并且在await如果对象不可等待时,将返回错误。
下一节我们继续这个话题。
- 点赞
- 收藏
- 关注作者
评论(0)