Python 异步功能入门

举报
Yuchuan 发表于 2021/12/29 20:54:55 2021/12/29
【摘要】 本文为您提供了开始使异步编程技术成为您的技能的一部分所需的工具。使用 Python 异步功能可让您以编程方式控制何时发生上下文切换。这意味着您在线程编程中可能会看到的许多更棘手的问题更容易处理。

目录

你听说过 Python 中的异步编程吗?您是否想了解更多有关 Python 异步功能以及如何在工作中使用它们的信息?也许您甚至尝试过编写线程程序并遇到一些问题。如果您想了解如何使用 Python 异步功能,那么您来对地方了。

在本文中,您将了解:

  • 什么是同步程序
  • 什么的异步程序
  • 为什么要编写异步程序
  • 如何使用 Python 异步功能

理解异步编程

同步程序被执行一次一个步骤。即使有条件分支、循环和函数调用,您仍然可以从一次执行一个步骤的角度考虑代码。每一步完成后,程序就会进入下一个步骤。

以下是两个以这种方式工作的程序示例:

  • 批处理程序通常创建为同步程序。您获得一些输入,对其进行处理并创建一些输出。步骤一个接一个,直到程序达到所需的输出。程序只需要注意步骤和顺序。

  • 命令行程序是在终端中运行的小型快速进程。这些脚本用于创建某些内容、将一件事转换为另一件事、生成报告或列出一些数据。这可以表示为一系列程序步骤,这些步骤按顺序执行,直到程序完成。

一个异步程序的行为不同。它仍然一次执行一个步骤。不同之处在于,系统可能不会等待执行步骤完成后再继续执行下一个步骤。

这意味着即使上一步尚未完成并且仍在其他地方运行,程序仍将继续执行未来的执行步骤。这也意味着程序知道在上一步完成运行时该做什么。

为什么要以这种方式编写程序?本文的其余部分将帮助您回答这个问题,并为您提供优雅地解决有趣的异步问题所需的工具。

构建同步 Web 服务器

Web 服务器的基本工作单元或多或少与批处理相同。服务器将获取一些输入、处理它并创建输出。编写为同步程序,这将创建一个可工作的 Web 服务器。

它也将是一个绝对糟糕的网络服务器。

为什么?在这种情况下,一个工作单元(输入、处理、输出)并不是唯一的目的。真正的目的是尽快处理数百甚至数千个工作单元。这可能会持续很长时间,甚至可能同时到达多个工作单元。

同步网络服务器可以做得更好吗?当然,您可以优化执行步骤,以便尽快处理所有进入的工作。不幸的是,这种方法存在局限性。结果可能是 Web 服务器响应速度不够快,无法处理足够的工作,甚至在工作堆积时超时。

注意:如果您尝试优化上述方法,您可能会看到其他限制。其中包括网络速度、文件 IO 速度、数据库查询速度以及其他连接服务的速度,仅举几例。这些都有一个共同点,都是IO函数。所有这些项目都比 CPU 的处理速度慢几个数量级。

在同步程序中,如果一个执行步骤启动了一个数据库查询,那么 CPU 基本上是空闲的,直到数据库查询返回。对于面向批处理的程序,大多数时候这不是优先事项。处理该 IO 操作的结果是目标。通常,这可能比 IO 操作本身花费更长的时间。任何优化工作都将集中在处理工作上,而不是 IO。

异步编程技术允许您的程序通过释放 CPU 来做其他工作来利用相对较慢的 IO 进程。

对编程的不同思考

当您开始尝试理解异步编程时,您可能会看到很多关于阻塞或编写非阻塞代码重要性的讨论。(就我个人而言,我很难从我询问的人和我阅读的文档中很好地掌握这些概念。)

什么是非阻塞代码?就此而言,什么是阻塞代码?这些问题的答案会帮助您编写更好的 Web 服务器吗?如果是这样,你怎么能做到?让我们一探究竟吧!

编写异步程序要求您对编程有不同的看法。虽然这种新的思维方式可能很难让你理解,但它也是一个有趣的练习。那是因为现实世界几乎完全是异步的,您与之交互的方式也是如此。

想象一下:你是一位试图同时做几件事的父母。你必须平衡支票簿,洗衣服,并留意孩子们。不知何故,您甚至可以不假思索地同时完成所有这些事情!让我们分解一下:

  • 平衡支票簿是一项同步任务。一步一个脚印,直到完成。你自己做所有的工作。

  • 但是,您可以脱离支票簿去洗衣服。您卸下干衣机,将衣服从洗衣机移到干衣机,然后在洗衣机中开始另一次装载。

  • 使用洗衣机和烘干机是一项同步任务,但大部分工作发生洗衣机和烘干机启动之后。一旦你让他们开始,你就可以走开并回到支票簿任务。此时,洗衣机和烘干机的任务已经变得异步了。洗衣机和烘干机将独立运行,直到蜂鸣器响起(通知您任务需要注意)。

  • 照看孩子是另一项异步任务。一旦他们设置好并开始演奏,他们就可以在大多数情况下独立进行。当有人需要关注时,这种情况就会发生变化,比如有人饿了或受伤了。当您的一个孩子惊慌失措时,您会做出反应。孩子们是一项具有高优先级的长期任务。观看它们会取代您可能正在做的任何其他任务,例如支票簿或洗衣。

这些示例有助于说明阻塞和非阻塞代码的概念。让我们从编程的角度考虑这个问题。在这个例子中,你就像 CPU。当你搬动衣服时,你(CPU)很忙,无法做其他工作,比如平衡支票簿。但这没关系,因为任务相对较快。

另一方面,启动洗衣机和烘干机不会妨碍您执行其他任务。这是一个异步函数,因为您不必等待它完成。一旦开始,你就可以回到其他事情上。这称为上下文切换:您正在做的事情的上下文发生了变化,当洗衣任务完成时,机器的蜂鸣器将在未来某个时间通知您。

作为一个人,这就是你一直工作的方式。您自然而然地同时处理多种事情,通常不假思索。作为开发人员,诀窍是如何将这种行为转换为执行相同操作的代码。

Programming Parents: Not as Easy as It Looks!

如果您在上面的示例中认出了自己(或您的父母),那就太好了!您在理解异步编程方面取得了长足的进步。同样,您可以很容易地在竞争任务之间切换上下文,选择一些任务并恢复其他任务。现在您将尝试将此行为编程为虚拟父母!

思想实验#1:The Synchronous Parent

您将如何创建一个父程序以完全同步的方式完成上述任务?由于看孩子是一项高优先级的任务,也许您的程序会做到这一点。父母一边看着孩子,一边等待可能需要他们注意的事情发生。但是,在这种情况下,没有其他事情(例如支票簿或洗衣店)可以完成。

现在,您可以按照自己的意愿重新排列任务的优先级,但在任何给定时间都只会发生其中一项。这是同步、循序渐进的方法的结果。就像上面描述的同步 Web 服务器一样,这可以工作,但它可能不是最好的生活方式。在孩子们睡着之前,父母将无法完成任何其他任务。所有其他任务都会在之后进行,一直持续到深夜。(几周后,许多真正的父母可能会跳出窗外!)

思想实验#2:The Polling Parent

如果您使用polling,那么您可以更改内容以便完成多个任务。在这种方法中,父级会定期脱离当前任务并检查是否还有其他任务需要注意。

让我们将轮询间隔设为十五分钟。现在,您的父母每十五分钟检查一次洗衣机、烘干机或孩子是否需要任何关注。如果没有,那么父母可以回去处理支票簿。但是,如果这些任务中的任何一个确实需要注意,那么父母会在回到支票簿之前处理它。这个循环一直持续到轮询循环的下一次超时。

这种方法也很有效,因为多项任务受到关注。但是,有几个问题:

  1. 家长可能会花很多时间检查不需要注意的东西:洗衣机和烘干机还没有完成,孩子们不需要任何注意,除非发生了意外。

  2. 父级可能会错过需要注意的已完成任务:例如,如果洗衣机在轮询间隔开始时完成了它的循环,那么它在长达 15 分钟内都不会得到任何关注!更重要的是,照看孩子被认为是最优先的任务。当事情可能发生严重错误时,他们无法忍受十五分钟的无人注意。

您可以通过缩短轮询间隔来解决这些问题,但现在您的父级(CPU)将花费更多时间在任务之间进行上下文切换。这是您开始达到收益递减点的时候。(再一次,像这样生活了几个星期,嗯……见之前关于窗户和跳跃的评论。)

思想实验#3:The Threading Parent

“要是我能克隆自己就好了……” 如果你是父母,那你可能也有类似的想法!由于您正在对虚拟父母进行编程,因此您基本上可以通过使用线程来做到这一点。这是一种允许一个程序的多个部分同时运行的机制。独立运行的每一段代码称为一个线程,所有线程共享相同的内存空间。

如果您将每个任务视为一个程序的一部分,那么您可以将它们分开并作为线程运行。换句话说,您可以“克隆”父对象,为每项任务创建一个实例:看孩子、监视洗衣机、监视烘干机和平衡支票簿。所有这些“克隆”都是独立运行的。

这听起来是一个不错的解决方案,但这里也存在一些问题。一是您必须明确告诉每个父实例在您的程序中做什么。这可能会导致一些问题,因为所有实例共享程序空间中的所有内容。

例如,假设家长 A 正在监控烘干机。家长 A 看到衣服已经干了,便控制了烘干机并开始卸下衣服。同时,B家长看到洗衣机洗好了,便控制了洗衣机,开始脱衣服。但是,家长 B 还需要控制烘干机,以便他们可以将湿衣服放入里面。这不可能发生,因为家长 A 目前控制着烘干机。

不一会,A家长就卸完衣服了。现在他们想要控制洗衣机并开始将衣服放入空的烘干机中。这也不可能发生,因为家长 B 目前控制着洗衣机!

这两个父母现在僵持不下。两者都控制自己的资源希望控制其他资源。他们将永远等待另一个父实例释放控制权。作为程序员,您必须编写代码来解决这种情况。

注意:线程程序允许您创建多个共享相同内存空间的并行执行路径。这既是优点也是缺点。线程之间共享的任何内存都受制于一个或多个线程试图同时使用相同的共享内存。这可能会导致数据损坏、以无效状态读取的数据以及通常只是杂乱无章的数据。

在线程编程中,上下文切换发生在系统控制下,而不是程序员。系统控制何时切换上下文以及何时让线程访问共享数据,从而更改内存使用方式的上下文。所有这些类型的问题都可以在线程代码中管理,但是很难正确处理,并且在错误时很难调试。

这是线程可能引起的另一个问题。假设一个孩子受伤了,需要紧急治疗。家长C被分配了看管孩子的任务,所以他们马上带着孩子。在紧急护理中,家长 C 需要开一张相当大的支票来支付看病的费用。

与此同时,家长 D 正在家里处理支票簿。他们不知道有这么大的支票在写,所以当家庭支票账户突然透支时,他们非常惊讶!

请记住,这两个父实例在同一个程序中工作。家庭支票帐户是共享资源,因此您必须想办法让看孩子的父母通知支票簿余额的父母。否则,您需要提供某种锁定机制,以便支票簿资源一次只能由一个父母使用,并进行更新。

在实践中使用 Python 异步功能

现在,您将采用上述思想实验中概述的一些方法,并将它们转换为可运行的 Python 程序。

您可能还想设置一个Python 虚拟环境来运行代码,这样您就不会干扰您的系统 Python。

同步编程

第一个示例显示了一种让任务从队列中检索工作并处理该工作的有点人为的方法。Python 中的队列是一个很好的FIFO先进先出)数据结构。它提供了将事物放入队列并按照插入顺序再次取出它们的方法。

在这种情况下,工作是从队列中获取一个数字,并让循环计数达到该数字。它在循环开始时打印到控制台,并再次输出总数。该程序演示了多个同步任务处理队列中工作的一种方法。

example_1.py下面完整列出了存储库中命名的程序:

 1import queue
 2
 3def task(name, work_queue):
 4    if work_queue.empty():
 5        print(f"Task {name} nothing to do")
 6    else:
 7        while not work_queue.empty():
 8            count = work_queue.get()
 9            total = 0
10            print(f"Task {name} running")
11            for x in range(count):
12                total += 1
13            print(f"Task {name} total: {total}")
14
15def main():
16    """
17    This is the main entry point for the program
18    """
19    # Create the queue of work
20    work_queue = queue.Queue()
21
22    # Put some work in the queue
23    for work in [15, 10, 5, 2]:
24        work_queue.put(work)
25
26    # Create some synchronous tasks
27    tasks = [(task, "One", work_queue), (task, "Two", work_queue)]
28
29    # Run the tasks
30    for t, n, q in tasks:
31        t(n, q)
32
33if __name__ == "__main__":
34    main()

让我们来看看每一行的作用:

  • 第 1 行导入queue模块。这是程序存储任务要完成的工作的地方。
  • 第 3 到 13 行定义了task(). 此功能将工作拉出work_queue并处理工作,直到没有更多工作要做。
  • 第 15 行定义main()运行程序任务。
  • 第 20 行创建了work_queue. 所有任务都使用此共享资源来检索工作。
  • 第 23 到 24行将工作放入work_queue. 在这种情况下,它只是要处理的任务的随机值计数。
  • 第 27 行创建了一个任务元组列表,其中包含这些任务将被传递的参数值。
  • 第 30 到 31 行迭代任务元组列表,调用每个元组并传递先前定义的参数值。
  • 第 34 行调用main()运行程序。

这个程序中的任务只是一个接受字符串和队列作为参数的函数。执行时,它会查找队列中的任何内容以进行处理。如果有工作要做,那么它将值从队列中拉出,开始一个for循环以计算到该值,并在最后输出总数。它继续从队列中取出工作,直到没有任何剩余并退出。

当这个程序运行时,它会产生你在下面看到的输出:

Task One running
Task One total: 15
Task One running
Task One total: 10
Task One running
Task One total: 5
Task One running
Task One total: 2
Task Two nothing to do

这表明它可以Task One完成所有工作。该while循环Task One内命中task()消耗所有的队列和处理它的工作。当该循环退出时,Task Two有机会运行。但是,它发现队列是空的,所以Task Two打印了一条语句,说它没有任何事情可做,然后退出。没有什么代码同时允许Task OneTask Two切换背景和工作在一起。

简单的协作并发

该程序的下一个版本允许这两个任务一起工作。添加yield语句意味着循环将在指定点产生控制权,同时仍保持其上下文。这样,yield 任务可以稍后重新启动。

yield语句变成task()了一个生成器。生成器函数就像 Python 中的任何其他函数一样被调用,但是当yield执行语句时,控制权会返回给函数的调用者。这本质上是一个上下文切换,因为控制从生成器函数转移到调用者。

有趣的是,可以通过调用生成器将控制权交还给生成器函数next()。这是一个返回到生成器函数的上下文切换,它使用在仍然完整的之前定义的所有函数变量来执行yield

while环路main()利用了这一点时,它调用next(t)。此语句会在任务之前产生的位置重新启动任务。所有这一切意味着您在上下文切换发生时处于控制之中:当yield语句在task().

这是协作多任务处理的一种形式。该程序正在放弃对其当前上下文的控制,以便其他内容可以运行。在这种情况下,它允许while循环main()运行task()作为生成器函数的两个实例。每个实例消耗来自同一个队列的工作。这有点聪明,但要获得与第一个程序相同的结果也需要做很多工作。该程序example_2.py演示了这种简单的并发性,如下所示:

 1import queue
 2
 3def task(name, queue):
 4    while not queue.empty():
 5        count = queue.get()
 6        total = 0
 7        print(f"Task {name} running")
 8        for x in range(count):
 9            total += 1
10            yield
11        print(f"Task {name} total: {total}")
12
13def main():
14    """
15    This is the main entry point for the program
16    """
17    # Create the queue of work
18    work_queue = queue.Queue()
19
20    # Put some work in the queue
21    for work in [15, 10, 5, 2]:
22        work_queue.put(work)
23
24    # Create some tasks
25    tasks = [task("One", work_queue), task("Two", work_queue)]
26
27    # Run the tasks
28    done = False
29    while not done:
30        for t in tasks:
31            try:
32                next(t)
33            except StopIteration:
34                tasks.remove(t)
35            if len(tasks) == 0:
36                done = True
37
38if __name__ == "__main__":
39    main()

这是上面代码中发生的事情:

  • 第 3 行到第 11 行定义task()同前,但第yield10 行的添加将函数变成了生成器。这是进行上下文切换并将控制权交还给 中的while循环的地方main()
  • 第 25 行创建了任务列表,但与您在前面的示例代码中看到的方式略有不同。在这种情况下,每个任务都使用其在tasks列表变量中输入的参数进行调用。这是task()第一次运行生成器函数所必需的。
  • 第 31 到 36 行是对while循环的修改main(),允许task()协同运行。这是控制返回到每个实例的地方,task()当它让步时,允许循环继续并运行另一个任务。
  • 第 32行将控制权交还给task(),并在yield调用点之后继续执行。
  • 第 36 行设置done变量。while当所有任务都完成并从 中删除时,循环结束tasks

这是运行此程序时产生的输出:

Task One running
Task Two running
Task Two total: 10
Task Two running
Task One total: 15
Task One running
Task Two total: 5
Task One total: 2

您可以看到Task OneTask Two都在运行并消耗队列中的工作。这就是预期的目的,因为两个任务都在处理工作,每个任务负责队列中的两个项目。这很有趣,但同样,要实现这些结果需要做很多工作。

这里的技巧是使用yield语句,它变成task()一个生成器并执行上下文切换。程序使用这个上下文切换来控制 中的while循环main(),允许一个任务的两个实例协同运行。

请注意如何Task Two首先输出其总数。这可能会让您认为任务是异步运行的。但是,这仍然是一个同步程序。它的结构使这两个任务可以来回交换上下文。首先Task Two输出它的总数的原因是它只计数到 10,而Task One计数到 15。Task Two只是首先到达它的总数,所以它可以在 之前将其输出打印到控制台Task One

注意:此后的所有示例代码都使用一个名为codetiming的模块来计时并输出代码段执行所需的时间。有一个伟大的文章,这里就RealPython是进入有关codetiming模块以及如何使用它的深度。

该模块是 Python Package Index 的一部分,由Real Python团队成员Geir Arne Hjelle构建。Geir Arne 对我审阅和建议本文的内容有很大帮助。如果您正在编写需要包含计时功能的代码,Geir Arne 的 codetiming 模块非常值得一看。

要使代码定时模块可用于后面的示例,您需要安装它。这可以pip使用以下命令完成:pip install codetiming,或使用以下命令:pip install -r requirements.txt。该requirements.txt文件是示例代码存储库的一部分。

具有阻塞调用的协作并发

程序的下一个版本与上一个版本相同,只是time.sleep(delay)在任务循环的主体中添加了 a 。这会根据从工作队列中检索到的值向任务循环的每次迭代添加延迟。延迟模拟任务中发生的阻塞调用的效果。

阻塞调用是在一段时间内阻止 CPU 执行任何其他操作的代码。在上面的思想实验中,如果父母在支票簿完成之前无法摆脱平衡,那将是一个阻塞调用。

time.sleep(delay) 在这个例子中做同样的事情,因为 CPU 不能做任何其他事情,只能等待延迟到期。

 1import time
 2import queue
 3from codetiming import Timer
 4
 5def task(name, queue):
 6    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
 7    while not queue.empty():
 8        delay = queue.get()
 9        print(f"Task {name} running")
10        timer.start()
11        time.sleep(delay)
12        timer.stop()
13        yield
14
15def main():
16    """
17    This is the main entry point for the program
18    """
19    # Create the queue of work
20    work_queue = queue.Queue()
21
22    # Put some work in the queue
23    for work in [15, 10, 5, 2]:
24        work_queue.put(work)
25
26    tasks = [task("One", work_queue), task("Two", work_queue)]
27
28    # Run the tasks
29    done = False
30    with Timer(text="\nTotal elapsed time: {:.1f}"):
31        while not done:
32            for t in tasks:
33                try:
34                    next(t)
35                except StopIteration:
36                    tasks.remove(t)
37                if len(tasks) == 0:
38                    done = True
39
40if __name__ == "__main__":
41    main()

下面是上面代码的不同之处:

  • 第 1 行导入time模块以允许程序访问time.sleep().
  • 第 3 行Timercodetiming模块中导入代码。
  • 第 6 行创建了Timer用于测量任务循环每次迭代所用时间的实例。
  • 第 10 行启动timer实例
  • 第 11 行更改task()为包含time.sleep(delay)模拟 IO 延迟的 a。这取代了for在 中进行计数的循环example_1.py
  • 第 12 行停止timer实例并输出自timer.start()调用以来经过的时间。
  • 第 30 行创建了一个Timer上下文管理器,它将输出整个 while 循环执行所花费的时间。

运行此程序时,您将看到以下输出:

Task One running
Task One elapsed time: 15.0
Task Two running
Task Two elapsed time: 10.0
Task One running
Task One elapsed time: 5.0
Task Two running
Task Two elapsed time: 2.0

Total elapsed time: 32.0

和以前一样,Task OneTask Two都在运行,消耗队列中的工作并处理它。然而,即使增加了延迟,您也可以看到协作并发并没有给您带来任何好处。延迟会停止整个程序的处理,CPU只是等待IO延迟结束。

这正是 Python 异步文档中阻塞代码的含义。您会注意到运行整个程序所花费的时间只是所有延迟的累积时间。以这种方式运行任务并不是一种胜利。

具有非阻塞调用的协作并发

该程序的下一个版本已进行了相当多的修改。它使用Python 3 中提供的asyncio/await来使用 Python 异步功能。

timequeue模块已被替换的asyncio包。这使您的程序可以访问异步友好(非阻塞)睡眠和队列功能。通过在第 4 行task()添加async前缀将其定义为异步。这向 Python 表明该函数将是异步的。

另一个重大变化是删除time.sleep(delay)andyield语句,并将它们替换为await asyncio.sleep(delay). 这会创建一个非阻塞延迟,它将执行上下文切换回调用者main()

while里面的循环main()不再存在。取而代之的是task_array对 的调用await asyncio.gather(...)。这说明了asyncio两件事:

  1. 创建两个任务task()并开始运行它们。
  2. 等待这两个完成后再继续。

程序的最后一行asyncio.run(main())运行main()。这会创建所谓的事件循环)。正是这个循环将运行main(),它将依次运行 的两个实例task()

事件循环是 Python 异步系统的核心。它运行所有代码,包括main(). 当任务代码正在执行时,CPU 正忙于工作。当到达await关键字时,发生上下文切换,并且控制权返回到事件循环。事件循环查看等待事件(在本例中为asyncio.sleep(delay)超时)的所有任务,并将控制权传递给事件准备就绪的任务。

await asyncio.sleep(delay)就 CPU 而言是非阻塞的。CPU 不是等待延迟超时,而是在事件循环任务队列上注册睡眠事件,并通过将控制权交给事件循环来执行上下文切换。事件循环不断寻找已完成的事件,并将控制权交还给等待该事件的任务。通过这种方式,CPU 可以在有工作的情况下保持忙碌,而事件循环会监视将来会发生的事件。

注意:异步程序在单个执行线程中运行。从一段代码到另一段会影响数据的上下文切换完全在您的控制之下。这意味着您可以在进行上下文切换之前将所有共享内存数据访问原子化并完成。这简化了线程代码中固有的共享内存问题。

example_4.py代码是下面列出:

 1import asyncio
 2from codetiming import Timer
 3
 4async def task(name, work_queue):
 5    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
 6    while not work_queue.empty():
 7        delay = await work_queue.get()
 8        print(f"Task {name} running")
 9        timer.start()
10        await asyncio.sleep(delay)
11        timer.stop()
12
13async def main():
14    """
15    This is the main entry point for the program
16    """
17    # Create the queue of work
18    work_queue = asyncio.Queue()
19
20    # Put some work in the queue
21    for work in [15, 10, 5, 2]:
22        await work_queue.put(work)
23
24    # Run the tasks
25    with Timer(text="\nTotal elapsed time: {:.1f}"):
26        await asyncio.gather(
27            asyncio.create_task(task("One", work_queue)),
28            asyncio.create_task(task("Two", work_queue)),
29        )
30
31if __name__ == "__main__":
32    asyncio.run(main())

以下是该程序与 之间的不同之处example_3.py

  • 第 1 行导入asyncio以访问 Python 异步功能。这将取代time导入。
  • 第 2 行Timercodetiming模块导入代码。
  • 第 4 行显示了asynctask()定义前添加的关键字。这通知task可以异步运行的程序。
  • 第 5 行创建了Timer用于测量任务循环每次迭代所用时间的实例。
  • 第 9 行启动timer实例
  • 第 10 行替换time.sleep(delay)为 non-blocking asyncio.sleep(delay),这也将控制(或切换上下文)返回到主事件循环。
  • 第 11 行停止timer实例并输出自timer.start()调用以来经过的时间。
  • 第 18 行创建了非阻塞异步work_queue.
  • 第 21 到 22 行work_queue使用await关键字以异步方式将工作放入。
  • 第 25 行创建了一个Timer上下文管理器,它将输出整个 while 循环执行所花费的时间。
  • 第 26 到 29 行创建了两个任务并将它们聚集在一起,因此程序将等待两个任务完成。
  • 第 32 行启动异步运行的程序。它还启动内部事件循环。

当你看这个程序的输出,通知如何都Task OneTask Two在同一时间启动,然后等待在模拟IO电话:

Task One running
Task Two running
Task Two total elapsed time: 10.0
Task Two running
Task One total elapsed time: 15.0
Task One running
Task Two total elapsed time: 5.0
Task One total elapsed time: 2.0

Total elapsed time: 17.0

这表明它await asyncio.sleep(delay)是非阻塞的,并且正在完成其他工作。

在程序结束时,您会注意到总example_3.py运行时间实际上是运行时间的一半。这就是使用 Python 异步功能的程序的优势!每个任务都能够同时运行await asyncio.sleep(delay)。程序的总执行时间现在少于其各部分的总和。你已经脱离了同步模型!

同步(阻塞)HTTP 调用

该程序的下一个版本既是一种进步,也是一种退步。该程序通过向 URL 列表发出 HTTP 请求并获取页面内容,对真实 IO 进行了一些实际工作。但是,它是以阻塞(同步)方式进行的。

程序已修改为导入精彩requests模块以进行实际的 HTTP 请求。此外,队列现在包含一个 URL 列表,而不是数字。此外,task()不再增加计数器。相反,requests获取从队列中检索到的 URL 的内容,并打印执行此操作所需的时间。

example_5.py代码是下面列出:

 1import queue
 2import requests
 3from codetiming import Timer
 4
 5def task(name, work_queue):
 6    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
 7    with requests.Session() as session:
 8        while not work_queue.empty():
 9            url = work_queue.get()
10            print(f"Task {name} getting URL: {url}")
11            timer.start()
12            session.get(url)
13            timer.stop()
14            yield
15
16def main():
17    """
18    This is the main entry point for the program
19    """
20    # Create the queue of work
21    work_queue = queue.Queue()
22
23    # Put some work in the queue
24    for url in [
25        "http://google.com",
26        "http://yahoo.com",
27        "http://linkedin.com",
28        "http://apple.com",
29        "http://microsoft.com",
30        "http://facebook.com",
31        "http://twitter.com",
32    ]:
33        work_queue.put(url)
34
35    tasks = [task("One", work_queue), task("Two", work_queue)]
36
37    # Run the tasks
38    done = False
39    with Timer(text="\nTotal elapsed time: {:.1f}"):
40        while not done:
41            for t in tasks:
42                try:
43                    next(t)
44                except StopIteration:
45                    tasks.remove(t)
46                if len(tasks) == 0:
47                    done = True
48
49if __name__ == "__main__":
50    main()

下面是这个程序中发生的事情:

  • 第 2 行导入requests,它提供了一种进行 HTTP 调用的便捷方式。
  • 第 3 行Timercodetiming模块中导入代码。
  • 第 6 行创建了Timer用于测量任务循环每次迭代所用时间的实例。
  • 第 11 行启动timer实例
  • 第 12 行引入了一个延迟,类似于example_3.py。但是,这次它调用session.get(url),它返回从 检索到的 URL 的内容work_queue
  • 第 13 行停止timer实例并输出自timer.start()调用以来经过的时间。
  • 第 23 到 32行将 URL 列表放入work_queue.
  • 第 39 行创建了一个Timer上下文管理器,它将输出整个 while 循环执行所花费的时间。

运行此程序时,您将看到以下输出:

Task One getting URL: http://google.com
Task One total elapsed time: 0.3
Task Two getting URL: http://yahoo.com
Task Two total elapsed time: 0.8
Task One getting URL: http://linkedin.com
Task One total elapsed time: 0.4
Task Two getting URL: http://apple.com
Task Two total elapsed time: 0.3
Task One getting URL: http://microsoft.com
Task One total elapsed time: 0.5
Task Two getting URL: http://facebook.com
Task Two total elapsed time: 0.5
Task One getting URL: http://twitter.com
Task One total elapsed time: 0.4

Total elapsed time: 3.2

就像在程序的早期版本中一样,yield变成task()了一个生成器。它还执行上下文切换,让其他任务实例运行。

每个任务从工作队列中获取一个 URL,检索页面的内容,并报告获取该内容所花费的时间。

和以前一样,yield允许您的两个任务协同运行。但是,由于该程序是同步运行的,因此每次session.get()调用都会阻塞 CPU,直到检索到页面为止。请注意最后运行整个程序所花费的总时间。这对于下一个示例将是有意义的。

异步(非阻塞)HTTP 调用

此版本的程序修改了前一个版本以使用 Python 异步功能。它还导入aiohttp模块,该模块是一个使用asyncio.

yield由于进行 HTTPGET调用的代码不再阻塞,这里的任务已被修改以移除调用。它还执行上下文切换回事件循环。

example_6.py程序如下:

 1import asyncio
 2import aiohttp
 3from codetiming import Timer
 4
 5async def task(name, work_queue):
 6    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
 7    async with aiohttp.ClientSession() as session:
 8        while not work_queue.empty():
 9            url = await work_queue.get()
10            print(f"Task {name} getting URL: {url}")
11            timer.start()
12            async with session.get(url) as response:
13                await response.text()
14            timer.stop()
15
16async def main():
17    """
18    This is the main entry point for the program
19    """
20    # Create the queue of work
21    work_queue = asyncio.Queue()
22
23    # Put some work in the queue
24    for url in [
25        "http://google.com",
26        "http://yahoo.com",
27        "http://linkedin.com",
28        "http://apple.com",
29        "http://microsoft.com",
30        "http://facebook.com",
31        "http://twitter.com",
32    ]:
33        await work_queue.put(url)
34
35    # Run the tasks
36    with Timer(text="\nTotal elapsed time: {:.1f}"):
37        await asyncio.gather(
38            asyncio.create_task(task("One", work_queue)),
39            asyncio.create_task(task("Two", work_queue)),
40        )
41
42if __name__ == "__main__":
43    asyncio.run(main())

下面是这个程序中发生的事情:

  • 第 2 行导入aiohttp库,它提供了一种异步方式来进行 HTTP 调用。
  • 第 3 行Timercodetiming模块中导入代码。
  • 第 5 行标记task()为异步函数。
  • 第 6 行创建了Timer用于测量任务循环每次迭代所用时间的实例。
  • 第 7 行创建了一个aiohttp会话上下文管理器。
  • 第 8 行创建了一个aiohttp响应上下文管理器。它还GET对从work_queue.
  • 第 11 行启动timer实例
  • 第 12 行使用会话异步获取从 URL 检索的文本。
  • 第 13 行停止timer实例并输出自timer.start()调用以来经过的时间。
  • 第 39 行创建了一个Timer上下文管理器,它将输出整个 while 循环执行所花费的时间。

运行此程序时,您将看到以下输出:

Task One getting URL: http://google.com
Task Two getting URL: http://yahoo.com
Task One total elapsed time: 0.3
Task One getting URL: http://linkedin.com
Task One total elapsed time: 0.3
Task One getting URL: http://apple.com
Task One total elapsed time: 0.3
Task One getting URL: http://microsoft.com
Task Two total elapsed time: 0.9
Task Two getting URL: http://facebook.com
Task Two total elapsed time: 0.4
Task Two getting URL: http://twitter.com
Task One total elapsed time: 0.5
Task Two total elapsed time: 0.3

Total elapsed time: 1.7

查看总经过时间,以及获取每个 URL 内容的时间。您会看到持续时间大约是所有 HTTPGET调用的累计时间的一半。这是因为 HTTPGET调用是异步运行的。换句话说,通过允许 CPU 一次发出多个请求,您可以有效地更好地利用 CPU。

由于 CPU 速度如此之快,本示例可能会创建与 URL 一样多的任务。在这种情况下,程序的运行时间将是单个最慢的 URL 检索的运行时间。

结论

本文为您提供了开始使异步编程技术成为您的技能的一部分所需的工具。使用 Python 异步功能可让您以编程方式控制何时发生上下文切换。这意味着您在线程编程中可能会看到的许多更棘手的问题更容易处理。

异步编程是一种强大的工具,但并不是对每种程序都有用。例如,如果您正在编写一个将 pi 计算到小数点后百万位的程序,那么异步代码将无济于事。那种程序受 CPU 限制,没有太多 IO。但是,如果您正在尝试实现执行 IO(如文件或网络访问)的服务器或程序,那么使用 Python 异步功能可能会产生巨大的差异。

总结一下,你已经学会了:

  • 什么同步程序
  • 如何异步程序是不同的,而且功能强大且易于管理
  • 为什么你可能想要编写异步程序
  • 如何使用 Python 内置的异步功能

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。