- 前言
- 目标读者
- 非目标读者
- 本书的结构
- 以实践为基础
- 硬件
- 杂谈:个人的一点看法
- Python 术语表
- Python 版本表
- 排版约定
- 使用代码示例
- 第一部分 序幕
- 第 1 章 Python 数据模型
- 第二部分 数据结构
- 第 2 章 序列构成的数组
- 第 3 章 字典和集合
- 第 4 章 文本和字节序列
- 第三部分 把函数视作对象
- 第 5 章 一等函数
- 第 6 章 使用一等函数实现设计模式
- 第 7 章 函数装饰器和闭包
- 第四部分 面向对象惯用法
- 第 8 章 对象引用、可变性和垃圾回收
- 第 9 章 符合 Python 风格的对象
- 第 10 章 序列的修改、散列和切片
- 第 11 章 接口:从协议到抽象基类
- 第 12 章 继承的优缺点
- 第 13 章 正确重载运算符
- 第五部分 控制流程
- 第 14 章 可迭代的对象、迭代器和生成器
- 14.1 Sentence 类第1版:单词序列
- 14.2 可迭代的对象与迭代器的对比
- 14.3 Sentence 类第2版:典型的迭代器
- 14.4 Sentence 类第3版:生成器函数
- 14.5 Sentence 类第4版:惰性实现
- 14.6 Sentence 类第5版:生成器表达式
- 14.7 何时使用生成器表达式
- 14.8 另一个示例:等差数列生成器
- 14.9 标准库中的生成器函数
- 14.10 Python 3.3 中新出现的句法:yield from
- 14.11 可迭代的归约函数
- 14.12 深入分析 iter 函数
- 14.13 案例分析:在数据库转换工具中使用生成器
- 14.14 把生成器当成协程
- 14.15 本章小结
- 14.16 延伸阅读
- 第 15 章 上下文管理器和 else 块
- 第 16 章 协程
- 第 17 章 使用期物处理并发
- 第 18 章 使用 asyncio 包处理并发
- 第六部分 元编程
- 第 19 章 动态属性和特性
- 第 20 章 属性描述符
- 第 21 章 类元编程
- 结语
- 延伸阅读
- 附录 A 辅助脚本
- Python 术语表
- 作者简介
- 关于封面
17.5 显示下载进度并处理错误
前面说过,17.1 节中的几个脚本没有处理错误,这样做是为了便于阅读和比较三种方案(依序、多线程和异步)的结构。
为了处理各种错误,我创建了 flags2 系列示例。
flags2_common.py
这个模块中包含所有 flags2 示例通用的函数和设置,例如 main 函数,负责解析命令行参数、计时和报告结果。这个脚本中的代码其实是提供支持的,与本章的话题没有直接关系,因此我把源码放在附录 A 里的示例 A-10 中。
flags2_sequential.py
能正确处理错误,以及显示进度条的 HTTP 依序下载客户端。flags2_threadpool.py 脚本会用到这个模块里的 download_one 函数。
flags2_threadpool.py
基于 futures.ThreadPoolExecutor 类实现的 HTTP 并发客户端,演示如何处理错误,以及集成进度条。
flags2_asyncio.py
与前一个脚本的作用相同,不过使用 asyncio 和 aiohttp 实现。这个脚本在第 18 章的 18.4 节中分析。
测试并发客户端时要小心
在公开的 HTTP 服务器上测试 HTTP 并发客户端时要小心,因为每秒可能会发起很多请求,这相当于是拒绝服务(DoS)攻击。我们不想攻击任何人,只是在学习如何开发高性能的客户端。访问公开的服务器时一定要管好自己的客户端。做高并发试验时,应该在本地架设 HTTP 服务器供测试。本书代码仓库中的 17-futures/countries/ 目录里有个 README.rst 文件,那里有架设说明。
flags2 系列示例最明显的特色是,有使用 TQDM 包实现的文本动画进度条。我在 YouTube 上发布了一个 108 秒的视频,展示了这个进度条,还对比了三个 flags 脚本的下载速度。在那个视频中,我先运行依序下载的脚本,不过 32 秒后中断了,因为那个脚本要用 5 分多钟访问 676 个 URL,下载 194 面国旗;然后,我分别运行多线程和 asyncio 版三次,每次都在 6 秒之内(即快了 60 多倍)完成任务。图 17-1 中有两个截图,分别是 flags2_threadpool.py 脚本运行中和运行结束后。
图 17-1:(左上)flags2_threadpool.py 运行中,显示着 tqdm 包生成的进度条;(右下)同一个终端窗口,脚本运行完毕后
TQDM 包特别易于使用,项目的 README.md 文件中有个 GIF 动画,演示了最简单的用法。安装 tqdm 包之后,8 在 Python 控制台中输入下述代码,会在注释那里看到进度条动画:
8可以使用 pip install tqdm 命令安装 tqdm 包。——编者注
>>> import time >>> from tqdm import tqdm >>> for i in tqdm(range(1000)): ... time.sleep(.01) ... >>> # -> 进度条会出现在这里 <-
除了这个灵巧的效果之外,tqdm 函数的实现方式也很有趣:能处理任何可迭代的对象,生成一个迭代器;使用这个迭代器时,显示进度条和完成全部迭代预计的剩余时间。为了计算预计剩余时间,tqdm 函数要获取一个能使用 len 函数确定大小的可迭代对象,或者在第二个参数中指定预期的元素数量。借助在 flags2 系列示例中集成 TQDM,我们可以深入了解这几个脚本的运作方式,因为我们必须使用 futures.as_completed 函数和 asyncio.as_completed 函数,这样 tqdm 函数才能在每个期物运行结束后更新进度。
flags2 系列示例的另一个特色是,提供了命令行接口。三个脚本接受的选项相同,运行任意一个脚本时指定 -h 选项就能看到所有选项。示例 17-8 显示的是帮助文本。
示例 17-8 flags2 系列脚本的帮助界面
$ python3 flags2_threadpool.py -h usage: flags2_threadpool.py [-h] [-a] [-e] [-l N] [-m CONCURRENT] [-s LABEL] [-v] [CC [CC ...]] Download flags for country codes. Default: top 20 countries by population. positional arguments: CC country code or 1st letter (eg. B for BA...BZ) optional arguments: -h, --help show this help message and exit -a, --all get all available flags (AD to ZW) -e, --every get flags for every possible code (AA...ZZ) -l N, --limit N limit to N first codes -m CONCURRENT, --max_req CONCURRENT maximum concurrent requests (default=30) -s LABEL, --server LABEL Server to hit; one of DELAY, ERROR, LOCAL, REMOTE (default=LOCAL) -v, --verbose output detailed progress info
所有选项都是可选的。下面说明最重要的选项。
不能忽略的选项是 -s/--server:用于选择测试时使用的 HTTP 服务器和基 URL。这个选项的值可以设为下述 4 个字符串(不区分大小写),用于确定脚本从哪里下载国旗。
LOCAL
使用 http://localhost:8001/flags;这是默认值。你应该配置一个本地 HTTP 服务器,响应 8001 端口的请求。我测试时使用 Nginx。本章示例代码中的 README.rst 文件说明了如何安装和配置 Nginx。
REMOTE
使用 http://flupy.org/data/flags;这是我搭建的公开网站,托管在一个共享服务器中。请不要使用太多并发请求访问这个网站。flupy.org 域名由 Cloudflare CDN 的一个免费账户管理,因此第一次下载时会发现很慢,不过一旦 CDN 有了缓存,速度就会变快。9
9测试这些脚本时,我向那个廉价的虚拟主机发起了一些并发请求,但是得到的响应是“HTTP 503 errors—Service Temporarily Unavailable”。后来我配置了 Cloudflare,现在没有这个错误了。
DELAY
使用 http://localhost:8002/flags;这是一个代理,会延迟 HTTP 响应,监听的端口是 8002。我在本地的 Nginx 服务器前加上了 Mozilla Vaurien,以此引入延迟。前面提到的那个 README.rst 文件中有运行 Vaurien 代理的说明。
ERROR
使用 http://localhost:8003/flags;这是一个代理,监听 8003 端口,引入了 HTTP 错误,并延迟响应。这个服务器使用的 Vaurien 配置与前面不同。
仅当在本地架设 HTTP 服务器,并且监听 8001 端口时,才能使用 LOCAL 选项。DELAY 和 ERROR 选项需要代理,分别监听 8002 和 8003 端口。在 GitHub 上本书的代码仓库中有个 17-futures/countries/README.rst 文件,说明了如何配置 Nginx 和 Mozilla Vaurien,以实现这些选项的要求。
默认情况下,各个 flags2 脚本会使用默认的并发连接数(各脚本有所不同)从 LOCAL 服务器中下载人口最多的 20 个国家的国旗。示例 17-9 是全部使用默认值运行 flags2_sequential.py 脚本得到的输出。
示例 17-9 全部使用默认值运行 flags2_sequential.py 脚本:LOCAL 服务器,人口最多的 20 国国旗,1 个并发连接
$ python3 flags2_sequential.py LOCAL site: http://localhost:8001/flags Searching for 20 flags: from BD to VN 1 concurrent connection will be used. -------------------- 20 flags downloaded. Elapsed time: 0.10s
我们可以使用多种不同的方式选择下载哪些国家的国旗。示例 17-10 展示如何下载国家代码以字母 A、B 或 C 开头的所有国旗。
示例 17-10 运行 flags2_threadpool.py 脚本,从 DELAY 服务器中下载国家代码以 A、B 或 C 开头的所有国旗
$ python3 flags2_threadpool.py -s DELAY a b c DELAY site: http://localhost:8002/flags Searching for 78 flags: from AA to CZ 30 concurrent connections will be used. -------------------- 43 flags downloaded. 35 not found. Elapsed time: 1.72s
不管使用什么方式选择国家代码,下载的国旗数量都可以使用 -l/--limit 选项限制。示例 17-11 演示如何发起 100 个请求,结合 -a 和 -l 选项下载 100 面国旗。
示例 17-11 运行 flags2_asyncio.py 脚本,使用 100 个并发请求(-m 100)从 ERROR 服务器中下载100 面国旗(-al 100)
$ python3 flags2_asyncio.py -s ERROR -al 100 -m 100 ERROR site: http://localhost:8003/flags Searching for 100 flags: from AD to LK 100 concurrent connections will be used. -------------------- 73 flags downloaded. 27 errors. Elapsed time: 0.64s
以上是 flags2 系列示例的用户界面。下面分析实现方式。
17.5.1 flags2系列示例处理错误的方式
这三个示例在负责下载一个文件的函数(download_one)中使用相同的策略处理 HTTP 404 错误(未找到)。其他异常则向上冒泡,交给 download_many 函数处理。
我们还是先分析依序下载的代码,因为这些代码更易于理解,而且使用线程池的脚本重用了这里的大部分代码。示例 17-12 列出的是 flags2_sequential.py 和 flags2_threadpool.py 脚本真正用于下载的函数。
示例 17-12 flags2_sequential.py:负责下载的基本函数;flags2_threadpool.py 脚本重用了这两个函数
def get_flag(base_url, cc): url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) resp = requests.get(url) if resp.status_code != 200: ➊ resp.raise_for_status() return resp.content def download_one(cc, base_url, verbose=False): try: image = get_flag(base_url, cc) except requests.exceptions.HTTPError as exc: ➋ res = exc.response if res.status_code == 404: status = HTTPStatus.not_found ➌ msg = 'not found' else: ➍ raise else: save_flag(image, cc.lower() + '.gif') status = HTTPStatus.ok msg = 'OK' if verbose: ➎ print(cc, msg) return Result(status, cc) ➏
❶ get_flag 函数没有处理错误,当 HTTP 代码不是 200 时,使用 requests.Response.raise_for_status 方法抛出异常。10
10HTTP 代码 200 表示成功完成 HTTP 请求。——编者注
❷ download_one 函数捕获 requests.exceptions.HTTPError 异常,特别处理 HTTP 404 错误……
❸ ……方法是,把局部变量 status 设为 HTTPStatus.not_found;HTTPStatus 是从 flags2_common 模块(见示例 A-10)中导入的 Enum 对象。
❹ 重新抛出其他 HTTPError 异常;这些异常会向上冒泡,传给调用方。
❺ 如果在命令行中设定了 -v/--verbose 选项,显示国家代码和状态消息;这就是详细模式中看到的进度信息。
➏ download_one 函数的返回值是一个 namedtuple——Result,其中有个 status 字段,其值是 HTTPStatus.not_found 或 HTTPStatus.ok。
示例 17-13 列出的是 download_many 函数的依序下载版。代码虽然简单,不过值得分析一下,以便后面与并发版对比。我们要关注的是报告进度、处理错误和统计下载数量的方式。
示例 17-13 flags2_sequential.py:实现依序下载的 download_many 函数
def download_many(cc_list, base_url, verbose, max_req): counter = collections.Counter() ➊ cc_iter = sorted(cc_list) ➋ if not verbose: cc_iter = tqdm.tqdm(cc_iter) ➌ for cc in cc_iter: ➍ try: res = download_one(cc, base_url, verbose) ➎ except requests.exceptions.HTTPError as exc: ➏ error_msg = 'HTTP error {res.status_code} - {res.reason}' error_msg = error_msg.format(res=exc.response) except requests.exceptions.ConnectionError as exc: ➐ error_msg = 'Connection error' else: ➑ error_msg = '' status = res.status if error_msg: status = HTTPStatus.error ➒ counter[status] += 1 ➓ if verbose and error_msg: ⓫ print('*** Error for {}: {}'.format(cc, error_msg)) return counter ⓬
❶ 这个 Counter 实例用于统计不同的下载状态:HTTPStatus.ok、HTTPStatus.not_found 或 HTTPStatus.error。
❷ 按字母顺序传入的国家代码列表,保存在 cc_iter 变量中。
❸ 如果不是详细模式,把 cc_iter 传给 tqdm 函数,返回一个迭代器,产出 cc_iter 中的元素,还会显示进度条动画。
❹ 这个 for 循环迭代 cc_iter……
❺ ……不断调用 download_one 函数,执行下载。
❻ 处理 get_flag 函数抛出的与 HTTP 有关的且 download_one 函数没有处理的异常。
❼ 处理其他与网络有关的异常。其他异常会中止这个脚本,因为调用 download_many 函数的 flags2_common.main 函数中没有 try/except 块。
❽ 如果没有异常从 download_one 函数中逃出,从 download_one 函数返回的 namedtuple(HTTPStatus)中获取 status。
❾ 如果有错误,把局部变量 status 设为相应的状态。
❿ 以 HTTPStatus(一个 Enum)中的值为键,增加计数器。
⓫ 如果是详细模式,而且有错误,显示带有当前国家代码的错误消息。
⓬ 返回 counter,以便 main 函数能在最终的报告中显示数量。
下面分析重构后的线程池示例——flags2_threadpool.py。
17.5.2 使用futures.as_completed函数
为了集成 TQDM 进度条,并处理各次请求中的错误,flags2_threadpool.py 脚本用到我们见过的 futures.ThreadPoolExecutor 类和 futures.as_completed 函数。示例 17-14 是 flags2_threadpool.py 脚本的完整代码清单。这个脚本只实现了 download_many 函数,其他函数都重用 flags2_common 和 flags2_sequential 模块里的。
示例 17-14 flags2_threadpool.py:完整的代码清单
import collections from concurrent import futures import requests import tqdm ➊ from flags2_common import main, HTTPStatus ➋ from flags2_sequential import download_one ➌ DEFAULT_CONCUR_REQ = 30 ➍ MAX_CONCUR_REQ = 1000 ➎ def download_many(cc_list, base_url, verbose, concur_req): counter = collections.Counter() with futures.ThreadPoolExecutor(max_workers=concur_req) as executor: ➏ to_do_map = {} ➐ for cc in sorted(cc_list): ➑ future = executor.submit(download_one, cc, base_url, verbose) ➒ to_do_map[future] = cc ➓ done_iter = futures.as_completed(to_do_map) ⓫ if not verbose: done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) ⓬ for future in done_iter: ⓭ try: res = future.result() ⓮ except requests.exceptions.HTTPError as exc: ⓯ error_msg = 'HTTP {res.status_code} - {res.reason}' error_msg = error_msg.format(res=exc.response) except requests.exceptions.ConnectionError as exc: error_msg = 'Connection error' else: error_msg = '' status = res.status if error_msg: status = HTTPStatus.error counter[status] += 1 if verbose and error_msg: cc = to_do_map[future] ⓰ print('*** Error for {}: {}'.format(cc, error_msg)) return counter if __name__ == '__main__': main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
❶ 导入显示进度条的库。
❷ 从 flags2_common 模块中导入一个函数和一个 Enum。
❸ 重用 flags2_sequential 模块(见示例 17-12)里的 download_one 函数。
❹ 如果没有在命令行中指定 -m/--max_req 选项,使用这个值作为并发请求数的最大值,也就是线程池的大小;真实的数量可能会比这少,例如下载的国旗数量较少。
❺ 不管要下载多少国旗,也不管 -m/--max_req 命令行选项的值是多少,MAX_CONCUR_REQ 会限制最大的并发请求数;这是一项安全预防措施。
❻ 把 max_workers 设为 concur_req,创建 ThreadPoolExecutor 实例;main 函数会把下面这三个值中最小的那个赋值给 concur_req:MAX_CONCUR_REQ、cc_list 的长度、-m/--max_req 命令行选项的值。这样能避免创建超过所需的线程。
❼ 这个字典把各个 Future 实例(表示一次下载)映射到相应的国家代码上,在处理错误时使用。
❽ 按字母顺序迭代国家代码列表。结果的顺序主要由 HTTP 响应的时间长短决定,不过,如果线程池的大小(由 concur_req 设定)比 len(cc_list) 小得多,可能会发现有按字母顺序批量下载的情况。
❾ 每次调用 executor.submit 方法排定一个可调用对象的执行时间,然后返回一个 Future 实例。第一个参数是可调用的对象,其余的参数是传给可调用对象的参数。
❿ 把返回的 future 和国家代码存储在字典中。
⓫ futures.as_completed 函数返回一个迭代器,在期物运行结束后产出期物。
⓬ 如果不是详细模式,把 as_completed 函数返回的结果传给 tqdm 函数,显示进度条;因为 done_iter 没有 len 函数,所以我们必须通过 total= 参数告诉 tqdm 函数预期的元素数量,这样 tqdm 才能预计剩余的工作量。
⓭ 迭代运行结束后的期物。
⓮ 在期物上调用 result 方法,要么返回可调用对象的返回值,要么抛出可调用的对象在执行过程中捕获的异常。这个方法可能会阻塞,等待确定结果;不过,在这个示例中不会阻塞,因为 as_completed 函数只返回已经运行结束的期物。
⓯ 处理可能出现的异常;这个函数余下的代码与依序下载版 download_many 函数一样(见示例 17-13),不过下一点除外。
⓰ 为了给错误消息提供上下文,以当前的 future 为键,从 to_do_map 中获取国家代码。在依序下载版中无须这么做,因为那一版迭代的是国家代码,所以知道当前国家的代码;而这里迭代的是期物。
示例 17-14 用到了一个对 futures.as_completed 函数特别有用的惯用法:构建一个字典,把各个期物映射到其他数据(期物运行结束后可能有用)上。这里,在 to_do_map 中,我们把各个期物映射到对应的国家代码上。这样,尽管期物生成的结果顺序已经乱了,依然便于使用结果做后续处理。
Python 线程特别适合 I/O 密集型应用,concurrent.futures 模块大大简化了某些使用场景下 Python 线程的用法。我们对 concurrent.futures 模块基本用法的介绍到此结束。下面讨论不适合使用 ThreadPoolExecutor 或 ProcessPoolExecutor 类时,有哪些替代方案。
17.5.3 线程和多进程的替代方案
Python 自 0.9.8 版(1993 年)就支持线程了,concurrent.futures 只不过是使用线程的最新方式。Python 3 废弃了原来的 thread 模块,换成了高级的 threading 模块。11 如果 futures.ThreadPoolExecutor 类对某个作业来说不够灵活,可能要使用 threading 模块中的组件(如 Thread、Lock、Semaphore 等)自行制定方案,比如说使用 queue 模块创建线程安全的队列,在线程之间传递数据。futures.ThreadPoolExecutor 类已经封装了这些组件。
11threading 模块自 Python 1.5.1(1998 年)就已存在,不过有些人仍然继续使用旧的 thread 模块。Python 3 把 thread 模块重命名为 _thread,以此强调这是低层实现,不应该在应用代码中使用。
对 CPU 密集型工作来说,要启动多个进程,规避 GIL。创建多个进程最简单的方式是,使用 futures.ProcessPoolExecutor 类。不过和前面一样,如果使用场景较复杂,需要更高级的工具。multiprocessing 模块的 API 与 threading 模块相仿,不过作业交给多个进程处理。对简单的程序来说,可以用 multiprocessing 模块代替 threading 模块,少量改动即可。不过,multiprocessing 模块还能解决协作进程遇到的最大挑战:在进程之间传递数据。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论