使用异步编程来动手实现用户态的协程和管理

举报
码乐 发表于 2023/12/29 09:45:50 2023/12/29
【摘要】 1 写在前面有很多地方需要并发地实现访问或提供服务,可以使用到异步的方式编程,其中协程是一个流行的方式。在golang中 启动协程很容易,这里将在python中也实现类似的功能。channel 用于在协程直接通信,并且可以根据channle条件退出协程,一个缓冲channel定义如下chanNoSize = make(chan int) chanWithSize = ma...

1 写在前面

有很多地方需要并发地实现访问或提供服务,可以使用到异步的方式编程,其中协程是一个流行的方式。在golang中 启动协程很容易,这里将在python中也实现类似的功能。

channel 用于在协程直接通信,并且可以根据channle条件退出协程,一个缓冲channel定义如下

chanNoSize = make(chan int)
          
chanWithSize = make(chan int,2)

go的并发模型是一种被称之为CSP的类型,在CSP 模型中 chanel是第一类对象, 它不关注发送消息的实体。

而关注与发送消息使用的channel communicating sequential Process 简称CSP顺序通信进程,是一种用户态和系统级线程控制的混合编程模型。

csp可以被认为是一种形式语言,用于描述系统中的互动模式.

在go内部channel实现了以下几个功能:

     qcount 保存队列的项目/数据计数

     dataqsize 是循环队列大小。 这用于缓冲通道场景,也就是make的第二参数
     elemsize 是通道相对单个元素的大小
     buf 是我们使用缓冲通道时存储数据的实际循环队列
     closed 表示通道是否关闭。 

语法 close(chanWithSize) 默认为0,关闭时设置为1.

chanel 是被单独创建并且可以在进程之间传递。

通信模式类似于 boss-worker模式。 一个实体通过将消息发送到 channel
监听这个channel的实体处理时,两个实体之间是匿名的,

2 实现python版本的协程

而python这类线程 默认的开发方式为 线程内存同步方式.
我们可以使用一些内部函数,实现类型go的协程,稍微会复杂一些。

我们定义一个协程通信管理队列,类似于channel

    class Queuey():
        def __init__(self, maxsize):
            self.mutex = Lock()
            self.maxsize = maxsize
            self.items = list()
            self.getters = list()
            self.putters = list()

实现一个可冲入锁对象 self.mutex 一个锁对象是一个同步基元。
要创建一个锁 的方法是: -调用 threading.Lock()。 有以下方法。

  acquire() -- 锁定,可能会阻塞,直到获得锁。
  release() -- 解除对锁的锁定
  locked() -- 测试该锁是否被锁定

锁不属于锁定它的线程;另一个线程可以 解锁它。 如果一个线程试图锁定一个它已经锁定的锁 将会阻塞,直到另一个线程将其解锁。 死锁可能会随之产生。

acquire(blocking=True, timeout=-1) -> bool
(acquisition_lock()是一个弃用的同义词)

锁定该锁。 没有参数,如果该锁已经被 锁定(即使是被同一个线程),等待另一个线程释放 锁,一旦获得锁,则返回True。

有参数时,只有当参数为 True 时才会阻塞。

并且返回值反映了是否获得了锁。

阻塞操作是可中断的。

3 无阻塞放入和取出

先获取到锁, 如果结果队列items中不为空,那么就在待处理的 待返回对象中取出一个,唤醒一个 左部待添加的 fut 元素,并将其设置为 True, 返回队列 左部 一个元素,同时 错误信息返回None,表示没有错误(对应go的nil)

def get_noblock(self):
    with self.mutex:
        if self.items: 
            if self.putters:
                self.putters.pop(0).set_result(True)
            
            return self.items.pop(0), None
        else:
            fut = Future()
            self.getters.append(fut)
            return fut, None

先获取到锁, 如果结果队列items中 小于缓冲大小 maxsize,那么添加到item处理队列中,再进行 getters 操作,如果getters队列不为空,那么从items队列的左部获取一个值,并设置结果到getters左部。

如果结果队列已经达到最大缓冲大小,那么将待返回结果 future 添加到 putters 队列(如果结果队列有对象需要返回时将从这里取得并返回)。

   def put_noblock(self, item):
        with self.mutex:  
            if len(self.items) < self.maxsize:
                self.items.append(item) 
                if self.getters:
                    self.getters.pop(0).set_result(
                        self.items.pop(0)
                    )
            else:
                fut = Future()
                self.putters.append(fut)
                return fut

获取的任务返回状态默认是pending。在无阻塞的操作函数完成后,基于此,我们可以实现同步 或 异步地操作协程。

  • 同步存和同步取

         def get_sync(self):
              item, fut = self.get_noblock()
              if fut:
                  item = fut.result()
              return item
    
        def put_sync(self, item):
              while True:
                  fut = self.put_noblock(item)
                  if fut is None:
                      return
                  fut.result()
    
  • 异步取和存

     async def get_async(self):
                 item, fut = self.get_noblock()
                 if fut:
                     item = await wait_for(wrap_future(fut), None)
                 return item
    
     async def put_async(self, item):
         while True:
             fut = self.put_noblock(item)
             if fut is None:
                 return
             await wait_for(wrap_future(fut), None)
    

如果是以协程的方式操作函数,则使用协程。 如果是协程,使用 异步的方式 取值,否则使用同步的方式

    def get(self):
        if sys._getframe(2).f_code.co_flags & 0x380: 
            print(f"get async item")
            return self.get_async()
        else:  # 不是协程,使用同步的方式 取值
            print(f"get sync item:")

            return self.get_sync() if len(self.items) > 0 else None

如果是以协程的方式操作函数,则使用协程。 如果是协程,使用 异步的方式 取值,否则使用同步的方式

    def put(self, item):
        if sys._getframe(2).f_code.co_flags & 0x380:  
            print(f"put async item:", item)
            return self.put_async(item)
        else:   
            print(f"put sync item:", item)
            return self.put_sync(item)

判断一个函数是否异步和协程的方式,在python中可以使用一个内置的方法,

    sys._getframe(2).f_code.co_flags & 0x380

这使python知道函数需要被协程一样的使用, 一种进入函数的方式的判断,在py3中需要在 调用函数 之前加 await 关键字

sys._getframe 从调用栈中返回一个框架对象。 如果给定了可选的整数深度,则返回比堆栈顶部调用次数少的那个框架对象。

调用堆栈的顶部以下的框架对象。 如果这个深度超过了调用 栈,ValueError将被引发。 默认的深度是0,返回 调用堆栈顶部的框架。

返回的 CodeType 中代码标记如果 为 十进制 896, 这个函数应该被用于内部和专门的目的 仅用于内部和专门用途。

4 在函数 使用

最后在函数中使用这个类, 在 线程中执行 异步取值

async def aproducer(q, n):
    for i in range(n):
        
        await q.put(i)
    
    await q.put(None)

async def aconsumer(q):
     
    while True:
        
        item = await q.get() 
        if item is None:
            break
        print("Async Got:", item)

小结

我们可以使用 wait_for 实现类似的功能。 它等待单个Future或coroutine完成,有超时参数可以设置。

Coroutine将被包裹在Task中。 返回Future或coroutine的结果。 当超时发生时。 它取消任务并引发TimeoutError。
为了避免任务取消,将其包裹在shield()中。 如果等待被取消了,任务也被取消了。

这个函数是一个coroutine。

下一节,我们完成同步存和异步取,异步存和异步取,并对此函数的做示例,与go的实现做对比。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。