【异步爬虫】学习笔记

举报
爱打瞌睡的CV君 发表于 2022/07/07 23:45:44 2022/07/07
【摘要】 文章目录 一、asyncio1、定义协程2、asyncio的一些方法①、asyncio.wait()②、asyncio.gather()③、asyncio.as_completed() ...

一、asyncio

  • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。
  • coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件循环调用。我们可以使用 - async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
  • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。
  • future:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。

1、定义协程

  • 第一个例子
import asyncio


async def execute(x):
    print('Number:', x)

coroutine = execute(1)
print('After calling execute')
print('Coroutine:', coroutine)
print('*'*40, '分割线', '*'*40)
print('After calling loop')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 输出结果为:
After calling execute
Coroutine: <coroutine object execute at 0x000002836EB16C40>
**************************************** 分割线 ****************************************
After calling loop
Number: 1

  
 
  • 1
  • 2
  • 3
  • 4
  • 5

首先,引入asyncio这个包,这样菜鸟使用asyncawait
然后,使用async定义一个execute()方法,这个方法的功能是,接收一个数字之后,打印出这个数字;
紧接着,尝试直接调用这个方法,但这个方法没有执行,而是返回了一个coroutine协程对象
之后,使用get_event_loop()方法创建一个事件循环loop,并调用loop对象run_until_complete()方法将协程注册到loop中,并启动;
这次之后,就可以看到execute()方法输出的结果。

结论: async定义的方法会编程一个无法执行的coroutine协程对象,必须将其注册到事件循环中才能执行。


一开始,还提到了task,相对于coroutine对象,它多了运行状态,我们可以根据这些状态来获取协程对象的执行情况。

上个例子,当我们将coroutine对象传递给run_until_complete()方法的时候,实际上它进行了一个操作就是将coroutine封装成了task对象

  • 实操验证一下:
import asyncio


async def execute(x):
    print('Number:', x)

coroutine = execute(1)
print('After calling execute')
print('Coroutine:', coroutine)
print('*'*40, '分割线', '*'*40)
print('After calling loop')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('task:', task)
loop.run_until_complete(task)
print('task:', task)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 输出结果:
After calling execute
Coroutine: <coroutine object execute at 0x000001A58E626B40>
**************************************** 分割线 ****************************************
After calling loop
task: <Task pending name='Task-1' coro=<execute() running at ‘这里是文件路径’:10>>
Number: 1
task: <Task finished name='Task-1' coro=<execute() done, defined at ‘这里是文件路径’:10> result=None>

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这里在定义loop对象之后,紧接着调用了它的create_task()方法coroutine对象转化为了task对象,然后打印输出,发现它是pending状态;
然后,将task对象添加到事件循环中得到执行,紧接着再打印输出,发现它的状态变成了finished,与此同时,还可以发现result变成了1(也就是定义的execute()方法的返回结果)。


直接通过asyncio的ensure_future()方法,不需要借助loop来定义,也可以返回task对象。

  • 实操验证一下:
import asyncio


async def execute(x):
    print('Number:', x)

coroutine = execute(1)
print('After calling execute')
print('Coroutine:', coroutine)
print('*'*40, '分割线', '*'*40)
print('After calling loop')
task = asyncio.ensure_future(coroutine)
print('task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('task:', task)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 输出结果:
After calling execute
Coroutine: <coroutine object execute at 0x000001A58E626B40>
**************************************** 分割线 ****************************************
After calling loop
task: <Task pending name='Task-1' coro=<execute() running at ‘这里是文件路径’:10>>
Number: 1
task: <Task finished name='Task-1' coro=<execute() done, defined at ‘这里是文件路径’:10> result=None>

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

和上例的结果是一样的

2、asyncio的一些方法


①、asyncio.wait()

可以将一个操作分成多个部分并分开执行,而wait(tasks)可以被用于中断任务集合(tasks)中的某个被事件循环轮询到的任务,直到该协程的其他后台操作完成才被唤醒。

  • 例如:
import time
import asyncio


async def task_1():
    print('开始运行IO任务1...')
    await asyncio.sleep(3)  # 假设该任务耗时3s
    print('IO任务1已完成,耗时3s')
    return task_1.__name__


async def task_2():
    print('开始运行IO任务2...')
    await asyncio.sleep(2)  # 假设该任务耗时2s
    print('IO任务2已完成,耗时2s')
    return task_2.__name__


async def main():  # 调用方
    tasks = [task_1(), task_2()]  # 把所有任务添加到task中
    done,pending = await asyncio.wait(tasks)  # 子生成器
    for r in done:  # done和pending都是一个任务,所以返回结果需要逐个调用result()
        print('协程无序返回值:'+r.result())

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop() # 创建一个事件循环对象loop
    try:
        loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束
    finally:
        loop.close()  # 结束事件循环
    print('所有IO任务总耗时%.5f秒' % float(time.time()-start))

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 输出结果:
开始运行IO任务1...
开始运行IO任务2...
IO任务2已完成,耗时2s
IO任务1已完成,耗时3s
协程无序返回值:task_2
协程无序返回值:task_1
所有IO任务总耗时3.00769
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 其中:
done, pending = await asyncio.wait(aws)

  
 
  • 1

此处并发运行传入的aws(awaitable objects),同时通过await返回一个包含(done, pending)的元组,done表示已完成的任务列表,pending表示未完成的任务列表。
注:
①只有当给wait()传入timeout参数时才有可能产生pending列表。
②通过wait()返回的结果集是按照事件循环中的任务完成顺序排列的,所以其往往和原始任务顺序不同。

②、asyncio.gather()

如果只关心协程并发运行后的结果集合,可以使用gather(),它不仅通过await返回仅一个结果集,而且这个结果集的结果顺序是传入任务的原始顺序。

  • 例如:
import time
import asyncio
async def taskIO_1():
    print('开始运行IO任务1...')
    await asyncio.sleep(3)  # 假设该任务耗时3s
    print('IO任务1已完成,耗时3s')
    return taskIO_1.__name__
async def taskIO_2():
    print('开始运行IO任务2...')
    await asyncio.sleep(2)  # 假设该任务耗时2s
    print('IO任务2已完成,耗时2s')
    return taskIO_2.__name__
async def main(): # 调用方
    resualts = await asyncio.gather(taskIO_1(), taskIO_2()) # 子生成器
    print(resualts)

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop() # 创建一个事件循环对象loop
    try:
        loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束
    finally:
        loop.close() # 结束事件循环
    print('所有IO任务总耗时%.5f秒' % float(time.time()-start))


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 结果为:
开始运行IO任务1...
开始运行IO任务2...
IO任务2已完成,耗时2s
IO任务1已完成,耗时3s
['taskIO_1', 'taskIO_2']
所有IO任务总耗时3.00936
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

gather()通过await直接返回一个结果集列表,我们可以清晰的从执行结果看出来,虽然任务2是先完成的,但最后返回的结果集的顺序是按照初始传入的任务顺序排的。

③、asyncio.as_completed()

as_completed(tasks)是一个生成器,它管理着一个协程列表(此处是传入的tasks)的运行。当任务集合中的某个任务率先执行完毕时,会率先通过await关键字返回该任务结果。可见其返回结果的顺序和wait()一样,均是按照完成任务顺序排列的。

  • 例如:
import time
import asyncio
async def taskIO_1():
    print('开始运行IO任务1...')
    await asyncio.sleep(3)  # 假设该任务耗时3s
    print('IO任务1已完成,耗时3s')
    return taskIO_1.__name__
async def taskIO_2():
    print('开始运行IO任务2...')
    await asyncio.sleep(2)  # 假设该任务耗时2s
    print('IO任务2已完成,耗时2s')
    return taskIO_2.__name__
async def main(): # 调用方
    tasks = [taskIO_1(), taskIO_2()]  # 把所有任务添加到task中
    for completed_task in asyncio.as_completed(tasks):
        resualt = await completed_task # 子生成器
        print('协程无序返回值:'+resualt)

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop() # 创建一个事件循环对象loop
    try:
        loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束
    finally:
        loop.close() # 结束事件循环
    print('所有IO任务总耗时%.5f秒' % float(time.time()-start))


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 结果为:
开始运行IO任务2...
开始运行IO任务1...
IO任务2已完成,耗时2s
协程无序返回值:taskIO_2
IO任务1已完成,耗时3s
协程无序返回值:taskIO_1
所有IO任务总耗时3.00300
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

从上面的程序可以看出,使用as_completed(tasks)wait(tasks)相同之处是返回结果的顺序是协程的完成顺序,这与gather()恰好相反。而不同之处是as_completed(tasks)可以实时返回当前完成的结果,而wait(tasks)需要等待所有协程结束后返回的done去获得结果。

二、aiohttp


1、多线程与异步的区别

在这里插入图片描述
异步爬虫不同于多进程爬虫,它使用单线程(即仅创建一个事件循环,然后把所有任务添加到事件循环中)就能并发处理多任务。在轮询到某个任务后,当遇到耗时操作(如请求URL)时,挂起该任务并进行下一个任务,当之前被挂起的任务更新了状态(如获得了网页响应),则被唤醒,程序继续从上次挂起的地方运行下去。极大的减少了中间不必要的等待时间。


2、aiohttp安装

  • 原因
    aiohttp库,用来实现异步网页请求等功能,相当于异步版的requests
  • 安装
pip3 install aiohttp

  
 
  • 1

3、ClientSession

在协程中使用ClientSession()get()request()方法来请求网页。(其中async with异步上下文管理器,其封装了异步实现等功能)

  • 例如:
import aiohttp
import asyncio


async def get_text():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get') as resp:
            print(resp.status)
            print(await resp.text())


def main():
    loop = asyncio.get_event_loop()
    task = get_text()
    loop.run_until_complete(task)
    loop.close()


if __name__ == '__main__':
     main()

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 输出结果为:
200
{
  "args": {}, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Host": "httpbin.org", 
    "User-Agent": "Python/3.8 aiohttp/3.8.1", 
    "X-Amzn-Trace-Id": "Root=1-6216f458-66a3ef0733ff064f527672f8"
  }, 
  "origin": "36.153.167.77", 
  "url": "http://httpbin.org/get"
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 还可以这样使用:
session.request(method='GET', url='http://httpbin.org/request')

  
 
  • 1

三、参考文章

Python 中异步协程的使用方法介绍

Python异步IO之协程(二):使用asyncio的不同方法实现协程

Python实战异步爬虫(协程)+分布式爬虫(多进程)

文章来源: luckystar.blog.csdn.net,作者:爱打瞌睡的CV君,版权归原作者所有,如需转载,请联系作者。

原文链接:luckystar.blog.csdn.net/article/details/123100078

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。