- 前言
- 目标读者
- 非目标读者
- 本书的结构
- 以实践为基础
- 硬件
- 杂谈:个人的一点看法
- Python 术语表
- Python 版本表
- 排版约定
- 使用代码示例
- 第一部分 序幕
- 第 1 章 Python 数据模型
- 第二部分 数据结构
- 第 2 章 序列构成的数组
- 第 3 章 字典和集合
- 第 4 章 文本和字节序列
- 第三部分 把函数视作对象
- 第 5 章 一等函数
- 第 6 章 使用一等函数实现设计模式
- 第 7 章 函数装饰器和闭包
- 第四部分 面向对象惯用法
- 第 8 章 对象引用、可变性和垃圾回收
- 第 9 章 符合 Python 风格的对象
- 第 10 章 序列的修改、散列和切片
- 第 11 章 接口:从协议到抽象基类
- 第 12 章 继承的优缺点
- 第 13 章 正确重载运算符
- 第五部分 控制流程
- 第 14 章 可迭代的对象、迭代器和生成器
- 14.1 Sentence 类第1版:单词序列
- 14.2 可迭代的对象与迭代器的对比
- 14.3 Sentence 类第2版:典型的迭代器
- 14.4 Sentence 类第3版:生成器函数
- 14.5 Sentence 类第4版:惰性实现
- 14.6 Sentence 类第5版:生成器表达式
- 14.7 何时使用生成器表达式
- 14.8 另一个示例:等差数列生成器
- 14.9 标准库中的生成器函数
- 14.10 Python 3.3 中新出现的句法:yield from
- 14.11 可迭代的归约函数
- 14.12 深入分析 iter 函数
- 14.13 案例分析:在数据库转换工具中使用生成器
- 14.14 把生成器当成协程
- 14.15 本章小结
- 14.16 延伸阅读
- 第 15 章 上下文管理器和 else 块
- 第 16 章 协程
- 第 17 章 使用期物处理并发
- 第 18 章 使用 asyncio 包处理并发
- 第六部分 元编程
- 第 19 章 动态属性和特性
- 第 20 章 属性描述符
- 第 21 章 类元编程
- 结语
- 延伸阅读
- 附录 A 辅助脚本
- Python 术语表
- 作者简介
- 关于封面
18.4 改进 asyncio 下载脚本
17.5 节说过,flags2 系列示例的命令行接口相同。本节要分析这个系列中的 flags2_asyncio.py 脚本。例如,示例 18-6 展示如何使用 100 个并发请求(-m 100)从 ERROR 服务器中下载 100 面国旗(-al 100)。
示例 18-6 运行 flags2_asyncio.py 脚本
$ python3 flags2_asyncio.py -s ERROR -al 100 -m 100 ERROR site: http://localhost:8003/flags Searching for 100 flags: from AD to LK 100 concurrent connections will be used. -------------------- 73 flags downloaded. 27 errors. Elapsed time: 0.64s
测试并发客户端要谨慎
尽管线程版和 asyncio 版 HTTP 客户端的下载总时间相差无几,但是 asyncio 版发送请求的速度更快,因此很有可能对服务器发起 DoS 攻击。为了全速测试这些并发客户端,应该在本地搭建 HTTP 服务器,详情参见本书代码仓库中 17-futures/countries/ 目录里的 README.rst 文件。
下面分析 flags2_asyncio.py 脚本的实现方式。
18.4.1 使用asyncio.as_completed函数
在示例 18-5 中,我把一个协程列表传给 asyncio.wait 函数,经由 loop.run_until_complete 方法驱动,全部协程运行完毕后,这个函数会返回所有下载结果。可是,为了更新进度条,各个协程运行结束后就要立即获取结果。在线程池版示例中(见示例 17-14),为了集成进度条,我们使用的是 as_completed 生成器函数;幸好,asyncio 包提供了这个生成器函数的相应版本。
为了使用 asyncio 包实现 flags2 示例,我们要重写几个函数;重写后的函数可以供 concurrent.future 版重用。之所以要重写,是因为在使用 asyncio 包的程序中只有一个主线程,而在这个线程中不能有阻塞型调用,因为事件循环也在这个线程中运行。所以,我要重写 get_flag 函数,使用 yield from 访问网络。现在,由于 get_flag 是协程, download_one 函数必须使用 yield from 驱动它,因此 download_one 自己也要变成协程。之前,在示例 18-5 中,download_one 由 download_many 驱动:download_one 函数由 asyncio. wait 函数调用,然后传给 loop.run_until_complete 方法。现在,为了报告进度并处理错误,我们要更精确地控制,所以我把 download_many 函数中的大多数逻辑移到一个新的协程 downloader_coro 中,只在 download_many 函数中设置事件循环,以及调度 downloader_coro 协程。
示例 18-7 展示的是 flags2_asyncio.py 脚本的前半部分,定义 get_flag 和 download_one 协程。示例 18-8 列出余下的源码,定义 downloader_coro 协程和 download_many 函数。
示例 18-7 flags2_asyncio.py:脚本的前半部分;余下的代码在示例 18-8 中
import asyncio import collections import aiohttp from aiohttp import web import tqdm from flags2_common import main, HTTPStatus, Result, save_flag # 默认设为较小的值,防止远程网站出错 # 例如503 - Service Temporarily Unavailable DEFAULT_CONCUR_REQ = 5 MAX_CONCUR_REQ = 1000 class FetchError(Exception): ➊ def __init__(self, country_code): self.country_code = country_code @asyncio.coroutine def get_flag(base_url, cc): ➋ url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) resp = yield from aiohttp.request('GET', url) if resp.status == 200: image = yield from resp.read() return image elif resp.status == 404: raise web.HTTPNotFound() else: raise aiohttp.HttpProcessingError( code=resp.status, message=resp.reason, headers=resp.headers) @asyncio.coroutine def download_one(cc, base_url, semaphore, verbose): ➌ try: with (yield from semaphore): ➍ image = yield from get_flag(base_url, cc) ➎ except web.HTTPNotFound: ➏ status = HTTPStatus.not_found msg = 'not found' except Exception as exc: raise FetchError(cc) from exc ➐ else: save_flag(image, cc.lower() + '.gif') ➑ status = HTTPStatus.ok msg = 'OK' if verbose and msg: print(cc, msg) return Result(status, cc)
❶ 这个自定义的异常用于包装其他 HTTP 或网络异常,并获取 country_code,以便报告错误。
❷ get_flag 协程有三种返回结果:返回下载得到的图像;HTTP 响应码为 404 时,抛出 web.HTTPNotFound 异常;返回其他 HTTP 状态码时,抛出 aiohttp.HttpProcessingError 异常。
❸ semaphore 参数是 asyncio.Semaphore 类的实例。Semaphore 类是同步装置,用于限制并发请求数量。
❹ 在 yield from 表达式中把 semaphore 当成上下文管理器使用,防止阻塞整个系统:如果 semaphore 计数器的值是所允许的最大值,只有这个协程会阻塞。
❺ 退出这个 with 语句后,semaphore 计数器的值会递减,解除阻塞可能在等待同一个 semaphore 对象的其他协程实例。
❻ 如果没找到国旗,相应地设置 Result 的状态。
❼ 其他异常当作 FetchError 抛出,传入国家代码,并使用“PEP 3134 — Exception Chaining and Embedded Tracebacks”引入的 raise X from Y 句法链接原来的异常。
❽ 这个函数的作用是把国旗文件保存到硬盘中。
可以看出,与依序下载版相比,示例 18-7 中的 get_flag 和 download_one 函数改动幅度很大,因为现在这两个函数是协程,要使用 yield from 做异步调用。
对于我们分析的这种网络客户端代码来说,一定要使用某种限流机制,防止向服务器发起太多并发请求,因为如果服务器过载,那么系统的整体性能可能会下降。flags2_threadpool.py 脚本(见示例 17-14)限流的方法是,在 download_many 函数中实例化 ThreadPoolExecutor 类时把 max_workers 参数的值设为 concur_req,只在线程池中启动 concur_req 个线程。在 flags2_asyncio.py 脚本中我的做法是,在 downloader_coro 函数中创建一个 asyncio.Semaphore 实例(在后面的示例 18-8 中),然后把它传给示例 18-7 中 download_one 函数的 semaphore 参数。7
7感谢 Guto Maia 指出本书的草稿没有说明 Semaphore 类。
Semaphore 对象维护着一个内部计数器,若在对象上调用 .acquire() 协程方法,计数器则递减;若在对象上调用 .release() 协程方法,计数器则递增。计数器的初始值在实例化 Semaphore 时设定,如 downloader_coro 函数中的这一行所示:
semaphore = asyncio.Semaphore(concur_req)
如果计数器大于零,那么调用 .acquire() 方法不会阻塞;可是,如果计数器为零,那么 .acquire() 方法会阻塞调用这个方法的协程,直到其他协程在同一个 Semaphore 对象上调用 .release() 方法,让计数器递增。在示例 18-7 中,我没有调用 .acquire() 或 .release() 方法,而是在 download_one 函数中的下述代码块中把 semaphore 当作上下文管理器使用:
with (yield from semaphore): image = yield from get_flag(base_url, cc)
这段代码保证,任何时候都不会有超过 concur_req 个 get_flag 协程启动。
现在来分析示例 18-8 中这个脚本余下的代码。注意,download_many 函数中以前的大多数功能现在都在 downloader_coro 协程中。我们必须这么做,因为必须使用 yield from 获取 asyncio.as_completed 函数产出的期物的结果,所以 as_completed 函数必须在协程中调用。可是,我不能直接把 download_many 函数改成协程,因为必须在脚本的最后一行把 download_many 函数传给 flags2_common 模块中定义的 main 函数,可 main 函数的参数不是协程,而是一个普通的函数。因此,我定义了 downloader_coro 协程,让它运行 as_completed 循环。现在,download_many 函数只用于设置事件循环,并把 downloader_coro 协程传给 loop.run_until_complete 方法,调度 downloader_coro。
示例 18-8 flags2_asyncio.py:接续示例 18-7
@asyncio.coroutine def downloader_coro(cc_list, base_url, verbose, concur_req): ➊ counter = collections.Counter() semaphore = asyncio.Semaphore(concur_req) ➋ to_do = [download_one(cc, base_url, semaphore, verbose) for cc in sorted(cc_list)] ➌ to_do_iter = asyncio.as_completed(to_do) ➍ if not verbose: to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) ➎ for future in to_do_iter: ➏ try: res = yield from future ➐ except FetchError as exc: ➑ country_code = exc.country_code ➒ try: error_msg = exc.__cause__.args[0] ➓ except IndexError: error_msg = exc.__cause__.__class__.__name__ ⓫ if verbose and error_msg: msg = '*** Error for {}: {}' print(msg.format(country_code, error_msg)) status = HTTPStatus.error else: status = res.status counter[status] += 1 ⓬ return counter ⓭ def download_many(cc_list, base_url, verbose, concur_req): loop = asyncio.get_event_loop() coro = downloader_coro(cc_list, base_url, verbose, concur_req) counts = loop.run_until_complete(coro) ⓮ loop.close() ⓯ return counts if __name__ == '__main__': main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
❶ 这个协程的参数与 download_many 函数一样,但是不能直接调用,因为它是协程函数,而不是像 download_many 那样的普通函数。
❷ 创建一个 asyncio.Semaphore 实例,最多允许激活 concur_req 个使用这个计数器的协程。
❸ 多次调用 download_one 协程,创建一个协程对象列表。
❹ 获取一个迭代器,这个迭代器会在期物运行结束后返回期物。
❺ 把迭代器传给 tqdm 函数,显示进度。
❻ 迭代运行结束的期物;这个循环与示例 17-14 中 download_many 函数里的那个十分相似;不同的部分主要是异常处理,因为两个 HTTP 库(requests 和 aiohttp)之间有差异。
❼ 获取 asyncio.Future 对象的结果,最简单的方法是使用 yield from,而不是调用 future.result() 方法。
❽ download_one 函数抛出的各个异常都包装在 FetchError 对象里,并且链接原来的异常。
❾ 从 FetchError 异常中获取错误发生时的国家代码。
❿ 尝试从原来的异常(__cause__)中获取错误消息。
⓫ 如果在原来的异常中找不到错误消息,使用所链接异常的类名作为错误消息。
⓬ 记录结果。
⓭ 与其他脚本一样,返回计数器。
⓮ download_many 函数只是实例化 downloader_coro 协程,然后通过 run_until_complete 方法把它传给事件循环。
⓯ 所有工作做完后,关闭事件循环,返回 counts。
在示例 18-8 中不能像示例 17-14 那样把期物映射到国家代码上,因为 asyncio.as_completed 函数返回的期物与传给 as_completed 函数的期物可能不同。在 asyncio 包内部,我们提供的期物会被替换成生成相同结果的期物。8
8关于这一点的详细讨论,可以阅读我在 python-tulip 讨论组中发起的话题,题为“Which other futures my come out of asyncio.as_completed?”。Guido 回复了,而且深入分析了 as_completed 函数的实现,还说明了 asyncio 包中期物与协程之间的紧密关系。
因为失败时不能以期物为键从字典中获取国家代码,所以我实现了自定义的 FetchError 异常(如示例 18-7 所示)。FetchError 包装网络异常,并关联相应的国家代码,因此在详细模式中报告错误时能显示国家代码。如果没有错误,那么国家代码是 for 循环顶部那个 yield from future 表达式的结果。
我们使用 asyncio 包实现的这个示例与前面的 flags2_threadpool.py 脚本具有相同的功能,这一话题到此结束。接下来,我们要改进 flags2_asyncio.py 脚本,进一步探索 asyncio 包。
在分析示例 18-7 的过程中,我发现 save_flag 函数会执行硬盘 I/O 操作,而这应该异步执行。下一节说明做法。
18.4.2 使用Executor对象,防止阻塞事件循环
Python 社区往往会忽略一个事实——访问本地文件系统会阻塞,想当然地认为这种操作不会受网络访问的高延迟影响(这也极难预料)。与之相比,Node.js 程序员则始终谨记,所有文件系统函数都会阻塞,因为这些函数的签名中指明了要有回调。表 18-1 已经指出,硬盘 I/O 阻塞会浪费几百万个 CPU 周期,而这可能会对应用程序的性能产生重大影响。
在示例 18-7 中,阻塞型函数是 save_flag。在这个脚本的线程版中(见示例 17-14),save_flag 函数会阻塞运行 download_one 函数的线程,但是阻塞的只是众多工作线程中的一个。阻塞型 I/O 调用在背后会释放 GIL,因此另一个线程可以继续。但是在 flags2_asyncio.py 脚本中,save_flag 函数阻塞了客户代码与 asyncio 事件循环共用的唯一线程,因此保存文件时,整个应用程序都会冻结。这个问题的解决方法是,使用事件循环对象的 run_in_executor 方法。
asyncio 的事件循环在背后维护着一个 ThreadPoolExecutor 对象,我们可以调用 run_in_executor 方法,把可调用的对象发给它执行。若想在这个示例中使用这个功能,download_one 协程只有几行代码需要改动,如示例 18-9 所示。
示例 18-9 flags2_asyncio_executor.py:使用默认的 ThreadPoolExecutor 对象运行 save_flag 函数
@asyncio.coroutine def download_one(cc, base_url, semaphore, verbose): try: with (yield from semaphore): image = yield from get_flag(base_url, cc) except web.HTTPNotFound: status = HTTPStatus.not_found msg = 'not found' except Exception as exc: raise FetchError(cc) from exc else: loop = asyncio.get_event_loop() ➊ loop.run_in_executor(None, ➋ save_flag, image, cc.lower() + '.gif') ➌ status = HTTPStatus.ok msg = 'OK' if verbose and msg: print(cc, msg) return Result(status, cc)
❶ 获取事件循环对象的引用。
❷ run_in_executor 方法的第一个参数是 Executor 实例;如果设为 None,使用事件循环的默认 ThreadPoolExecutor 实例。
❸ 余下的参数是可调用的对象,以及可调用对象的位置参数。
我测试示例 18-9 时,没有发现改用 run_in_executor 方法保存图像文件后性能有明显变化,因为图像都不大(平均 13KB)。不过,如果编辑 flags2_common.py 脚本中的 save_flag 函数,把各个文件保存的字节数变成原来的 10 倍(只需把 fp.write(img) 改成 fp.write(img*10)),此时便会看到效果。下载的平均字节数变成 130KB 后,使用 run_in_executor 方法的好处就体现出来了。如果下载包含百万像素的图像,速度提升更明显。
如果需要协调异步请求,而不只是发起完全独立的请求,协程较之回调的好处会变得显而易见。下一节说明回调的问题,并给出解决方法。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论