- 前言
- 目标读者
- 非目标读者
- 本书的结构
- 以实践为基础
- 硬件
- 杂谈:个人的一点看法
- Python 术语表
- Python 版本表
- 排版约定
- 使用代码示例
- 第一部分 序幕
- 第 1 章 Python 数据模型
- 第二部分 数据结构
- 第 2 章 序列构成的数组
- 第 3 章 字典和集合
- 第 4 章 文本和字节序列
- 第三部分 把函数视作对象
- 第 5 章 一等函数
- 第 6 章 使用一等函数实现设计模式
- 第 7 章 函数装饰器和闭包
- 第四部分 面向对象惯用法
- 第 8 章 对象引用、可变性和垃圾回收
- 第 9 章 符合 Python 风格的对象
- 第 10 章 序列的修改、散列和切片
- 第 11 章 接口:从协议到抽象基类
- 第 12 章 继承的优缺点
- 第 13 章 正确重载运算符
- 第五部分 控制流程
- 第 14 章 可迭代的对象、迭代器和生成器
- 14.1 Sentence 类第1版:单词序列
- 14.2 可迭代的对象与迭代器的对比
- 14.3 Sentence 类第2版:典型的迭代器
- 14.4 Sentence 类第3版:生成器函数
- 14.5 Sentence 类第4版:惰性实现
- 14.6 Sentence 类第5版:生成器表达式
- 14.7 何时使用生成器表达式
- 14.8 另一个示例:等差数列生成器
- 14.9 标准库中的生成器函数
- 14.10 Python 3.3 中新出现的句法:yield from
- 14.11 可迭代的归约函数
- 14.12 深入分析 iter 函数
- 14.13 案例分析:在数据库转换工具中使用生成器
- 14.14 把生成器当成协程
- 14.15 本章小结
- 14.16 延伸阅读
- 第 15 章 上下文管理器和 else 块
- 第 16 章 协程
- 第 17 章 使用期物处理并发
- 第 18 章 使用 asyncio 包处理并发
- 第六部分 元编程
- 第 19 章 动态属性和特性
- 第 20 章 属性描述符
- 第 21 章 类元编程
- 结语
- 延伸阅读
- 附录 A 辅助脚本
- Python 术语表
- 作者简介
- 关于封面
18.6 使用 asyncio 包编写服务器
演示 TCP 服务器时通常使用回显服务器。我们要构建更好玩一点的示例服务器,用于查找 Unicode 字符,分别使用简单的 TCP 协议和 HTTP 协议实现。这两个服务器的作用是,让客户端使用 4.8 节讨论过的 unicodedata 模块,通过规范名称查找 Unicode 字符。图 18-2 展示了在一个 Telnet 会话中访问 TCP 版字符查找服务器所做的两次查询,一次查询国际象棋棋子字符,一次查询名称中包含“sun”的字符。
图 18-2:在一个 Telnet 会话中访问 tcp_charfinder.py 服务器——查询“chess black”和“sun”
接下来讨论实现方式。
18.6.1 使用asyncio包编写TCP服务器
下面几个示例的大多数逻辑在 charfinder.py 模块中,这个模块没有任何并发。你可以在命令行中使用 charfinder.py 脚本查找字符,不过这个脚本更为重要的作用是为使用 asyncio 包编写的服务器提供支持。charfinder.py 脚本的代码在本书的代码仓库中。
charfinder 模块读取 Python 内建的 Unicode 数据库,为每个字符名称中的每个单词建立索引,然后倒排索引,存进一个字典。例如,在倒排索引中,'SUN' 键对应的条目是一个集合(set),里面是名称中包含 'SUN' 这个词的 10 个 Unicode 字符。9 倒排索引保存在本地一个名为 charfinder_index.pickle 的文件中。如果查询多个单词,charfinder 会计算从索引中所得集合的交集。
9在 Python 3.5 中,新增了 4 个名称中包含 'SUN' 的 Unicode 字符:U+1F323(WHITE SUN)、U+1F324(WHITE SUN WITH SMALL CLOUD)、U+1F325(WHITE SUN BEHIND CLOUD)和 U+1F326(WHITE SUN BEHIND CLOUD WITH RAIN)。——编者注
下面我们把注意力集中在响应图 18-2 中那两个查询的 tcp_charfinder.py 脚本上。我要对这个脚本中的代码做大量说明,因此把它分为两部分,分别在示例 18-14 和示例 18-15 中列出。
示例 18-14 tcp_charfinder.py:使用 asyncio.start_server 函数实现的简易 TCP 服务器;这个模块余下的代码在示例 18-15 中
import sys import asyncio from charfinder import UnicodeNameIndex ➊ CRLF = b'\r\n' PROMPT = b'?> ' index = UnicodeNameIndex() ➋ @asyncio.coroutine def handle_queries(reader, writer): ➌ while True: ➍ writer.write(PROMPT) # 不能使用yield from! ➎ yield from writer.drain() # 必须使用yield from! ➏ data = yield from reader.readline() ➐ try: query = data.decode().strip() except UnicodeDecodeError: ➑ query = '\x00' client = writer.get_extra_info('peername') ➒ print('Received from {}: {!r}'.format(client, query)) ➓ if query: if ord(query[:1]) < 32: ⓫ break lines = list(index.find_description_strs(query)) ⓬ if lines: writer.writelines(line.encode() + CRLF for line in lines) ⓭ writer.write(index.status(query, len(lines)).encode() + CRLF) ⓮ yield from writer.drain() ⓯ print('Sent {} results'.format(len(lines))) ⓰ print('Close the client socket') ⓱ writer.close() ⓲
❶ UnicodeNameIndex 类用于构建名称索引,提供查询方法。
❷ 实例化 UnicodeNameIndex 类时,它会使用 charfinder_index.pickle 文件(如果有的话),或者构建这个文件,因此第一次运行时可能要等几秒钟服务器才能启动。10
10Leonardo Rochael 指出,可以在示例 18-15 中的 main 函数里使用 loop.run_with_executor() 方法,在另一个线程中构建 Unicode 名称索引,这样索引构建好之后,服务器能立即开始接收请求。他说得对,不过这个应用的唯一用途是查询索引,因此那样做没有多大好处。不过,Leo 建议的做法是个不错的练习,有兴趣的话你可以去做。
❸ 这个协程要传给 asyncio.start_server 函数,接收的两个参数是 asyncio.StreamReader 对象和 asyncio.StreamWriter 对象。
❹ 这个循环处理会话,直到从客户端收到控制字符后退出。
❺ StreamWriter.write 方法不是协程,只是普通的函数;这一行代码发送 ?> 提示符。
❻ StreamWriter.drain 方法刷新 writer 缓冲;因为它是协程,所以必须使用 yield from 调用。
❼ StreamReader.readline 方法是协程,返回一个 bytes 对象。
❽ Telnet 客户端发送控制字符时,可能会抛出 UnicodeDecodeError 异常;遇到这种情况时,为了简单起见,假装发送的是空字符。
❾ 返回与套接字连接的远程地址。
❿ 在服务器的控制台中记录查询。
⓫ 如果收到控制字符或者空字符,退出循环。
⓬ 返回一个生成器,产出包含 Unicode 码位、真正的字符和字符名称的字符串(例如, U+0039\t9\tDIGIT NINE);为了简单起见,我从中构建了一个列表。
⓭ 使用默认的 UTF-8 编码把 lines 转换成 bytes 对象,并在每一行末尾添加回车符和换行符;注意,参数是一个生成器表达式。
⓮ 输出状态,例如 627 matches for 'digit'。11
11在 Python 3.5 中,是 755 matches for 'digit'。 ——编者注
⓯ 刷新输出缓冲。
⓰ 在服务器的控制台中记录响应。
⓱ 在服务器的控制台中记录会话结束。
⓲ 关闭 StreamWriter 流。
handle_queries 协程的名称是复数,因为它启动交互式会话后能处理各个客户端发来的多次请求。
注意,示例 18-14 中所有的 I/O 操作都使用 bytes 格式。因此,我们要解码从网络中收到的字符串,还要编码发出的字符串。Python 3 默认使用的编码是 UTF-8,这里就隐式使用了这个编码。
注意一点,有些 I/O 方法是协程,必须由 yield from 驱动,而另一些则是普通的函数。例如,StreamWriter.write 是普通的函数,我们假定它大多数时候都不会阻塞,因为它把数据写入缓冲;而刷新缓冲并真正执行 I/O 操作的 StreamWriter.drain 是协程,StreamReader.readline 也是协程。写作本书时,asyncio 包的 API 文档有重大的改进,明确标识出了哪些方法是协程。
示例 18-15 接续示例 18-14,列出这个模块的 main 函数。
示例 18-15 tcp_charfinder.py(接续示例 18-14):main 函数创建并销毁事件循环和套接字服务器
def main(address='127.0.0.1', port=2323): ➊ port = int(port) loop = asyncio.get_event_loop() server_coro = asyncio.start_server(handle_queries, address, port, loop=loop) ➋ server = loop.run_until_complete(server_coro) ➌ host = server.sockets[0].getsockname() ➍ print('Serving on {}. Hit CTRL-C to stop.'.format(host)) ➎ try: loop.run_forever() ➏ except KeyboardInterrupt: # 按CTRL-C键 pass print('Server shutting down.') server.close() ➐ loop.run_until_complete(server.wait_closed()) ➑ loop.close() ➒ if __name__ == '__main__': main(*sys.argv[1:]) ➓
❶ 调用 main 函数时可以不传入参数。
❷ asyncio.start_server 协程运行结束后,返回的协程对象返回一个 asyncio.Server 实例,即一个 TCP 套接字服务器。
❸ 驱动 server_coro 协程,启动服务器(server)。
❹ 获取这个服务器的第一个套接字的地址和端口,然后……
❺ ……在服务器的控制台中显示出来。这是这个脚本在服务器的控制台中显示的第一个输出。
❻ 运行事件循环;main 函数在这里阻塞,直到在服务器的控制台中按 CTRL-C 键才会关闭。
❼ 关闭服务器。
❽ server.wait_closed() 方法返回一个期物;调用 loop.run_until_complete 方法,运行期物。
❾ 终止事件循环。
❿ 这是处理可选的命令行参数的简便方式:展开 sys.argv[1:],传给 main 函数,未指定的参数使用相应的默认值。
注意,run_until_complete 方法的参数是一个协程(start_server 方法返回的结果)或一个 Future 对象(server.wait_closed 方法返回的结果)。如果传给 run_until_complete 方法的参数是协程,会把协程包装在 Task 对象中。
仔细查看 tcp_charfinder.py 脚本在服务器控制台中生成的输出(如示例 18-16),更易于理解脚本中控制权的流动。
示例 18-16 tcp_charfinder.py:这是图 18-2 所示会话在服务器端的输出
$ python3 tcp_charfinder.py Serving on ('127.0.0.1', 2323). Hit CTRL-C to stop. ➊ Received from ('127.0.0.1', 62910): 'chess black' ➋ Sent 6 results Received from ('127.0.0.1', 62910): 'sun' ➌ Sent 10 results Received from ('127.0.0.1', 62910): '\x00' ➍ Close the client socket ➎
❶ 这是 main 函数的输出。
❷ handle_queries 协程中那个 while 循环第一次迭代的输出。
❸ 那个 while 循环第二次迭代的输出。12
12在 Python 3.5 中是 Sent 14 results。参见本小节开头的编者注。——编者注
❹ 用户按下 CTRL-C 键;服务器收到控制字符,关闭会话。
❺ 客户端套接字关闭了,但是服务器仍在运行,准备为其他客户端提供服务。
注意,main 函数几乎会立即显示 Serving on... 消息,然后在调用 loop.run_forever() 方法时阻塞。在那一点,控制权流动到事件循环中,而且一直待在那里,不过偶尔会回到 handle_queries 协程,这个协程需要等待网络发送或接收数据时,控制权又交还事件循环。在事件循环运行期间,只要有新客户端连接服务器就会启动一个 handle_queries 协程实例。因此,这个简单的服务器可以并发处理多个客户端。出现 KeyboardInterrupt 异常,或者操作系统把进程杀死,服务器会关闭。
tcp_charfinder.py 脚本利用 asyncio 包提供的高层流 API,有现成的服务器可用,所以我们只需实现一个处理程序(普通的回调或协程)。此外,asyncio 包受 Twisted 框架中抽象的传送和协议启发,还提供了低层传送和协议 API。详情请参见 asyncio 包的文档,里面有一个使用低层 API 实现的 TCP 回显服务器。
下一节实现 HTTP 版字符查找服务器。
18.6.2 使用aiohttp包编写Web服务器
asyncio 版国旗下载示例使用的 aiohttp 库也支持服务器端 HTTP,我就使用这个库实现了 http_charfinder.py 脚本。图 18-3 是这个简易服务器的 Web 界面,显示搜索“cat face”表情符号得到的结果。
图 18-3:浏览器窗口中显示在 http_charfinder.py 服务器中搜索“cat face”得到的结果
有些浏览器显示 Unicode 字符的效果比其他浏览器好。图 18-3 中的截图在 OS X 版 Firefox 浏览器中截取,我在 Safari 中也得到了相同的结果。但是,运行在同一台设备中的最新版 Chrome 和 Opera 却不能显示猫脸等表情符号。不过其他搜索结果(例如“chess”)正常,因此这可能是 OS X 版 Chrome 和 Opera 的字体问题。
我们先分析 http_charfinder.py 脚本中最重要的后半部分:启动和关闭事件循环与 HTTP 服务器。参见示例 18-17。
示例 18-17 http_charfinder.py:main 和 init 函数
@asyncio.coroutine def init(loop, address, port): ➊ app = web.Application(loop=loop) ➋ app.router.add_route('GET', '/', home) ➌ handler = app.make_handler() ➍ server = yield from loop.create_server(handler, address, port) ➎ return server.sockets[0].getsockname() ➏ def main(address="127.0.0.1", port=8888): port = int(port) loop = asyncio.get_event_loop() host = loop.run_until_complete(init(loop, address, port)) ➐ print('Serving on {}. Hit CTRL-C to stop.'.format(host)) try: loop.run_forever() ➑ except KeyboardInterrupt: # 按CTRL-C键 pass print('Server shutting down.') loop.close() ➒ if __name__ == '__main__': main(*sys.argv[1:])
❶ init 协程产出一个服务器,交给事件循环驱动。
❷ aiohttp.web.Application 类表示 Web 应用……
❸ ……通过路由把 URL 模式映射到处理函数上;这里,把 GET / 路由映射到 home 函数上(参见示例 18-18)。
❹ app.make_handler 方法返回一个 aiohttp.web.RequestHandler 实例,根据 app 对象设置的路由处理 HTTP 请求。
❺ create_server 方法创建服务器,以 handler 为协议处理程序,并把服务器绑定在指定的地址(address)和端口(port)上。
❻ 返回第一个服务器套接字的地址和端口。
❼ 运行 init 函数,启动服务器,获取服务器的地址和端口。
❽ 运行事件循环;控制权在事件循环手上时,main 函数会在这里阻塞。
❾ 关闭事件循环。
我们已经熟悉了 asyncio 包的 API,现在可以对比一下示例 18-17 与前面的 TCP 示例(见示例 18-15),看它们创建服务器的方式有何不同。
在前面的 TCP 示例中,服务器通过 main 函数中的下面两行代码创建并排定运行时间:
server_coro = asyncio.start_server(handle_queries, address, port, loop=loop) server = loop.run_until_complete(server_coro)
在这个 HTTP 示例中,init 函数通过下述方式创建服务器:
server = yield from loop.create_server(handler, address, port)
但是 init 是协程,驱动它运行的是 main 函数中的这一行:
host = loop.run_until_complete(init(loop, address, port))
asyncio.start_server 函数和 loop.create_server 方法都是协程,返回的结果都是 asyncio.Server 对象。为了启动服务器并返回服务器的引用,这两个协程都要由他人驱动,完成运行。在 TCP 示例中,做法是调用 loop.run_until_complete(server_coro),其中 server_coro 是 asyncio.start_server 函数返回的结果。在 HTTP 示例中,create_server 方法在 init 协程中的一个 yield from 表达式里调用,而 init 协程则由 main 函数中的 loop.run_until_complete(init(...)) 调用驱动。
我提到这一点是为了强调之前讨论过的一个基本事实:只有驱动协程,协程才能做事,而驱动 asyncio.coroutine 装饰的协程有两种方法,要么使用 yield from,要么传给 asyncio 包中某个参数为协程或期物的函数,例如 run_until_complete。
示例 18-18 列出 home 函数。根据这个 HTTP 服务器的配置,home 函数用于处理 /(根)URL。
示例 18-18 http_charfinder.py:home 函数
def home(request): ➊ query = request.GET.get('query', '').strip() ➋ print('Query: {!r}'.format(query)) ➌ if query: ➍ descriptions = list(index.find_descriptions(query)) res = '\n'.join(ROW_TPL.format(**vars(descr)) for descr in descriptions) msg = index.status(query, len(descriptions)) else: descriptions = [] res = '' msg = 'Enter words describing characters.' html = template.format(query=query, result=res, ➎ message=msg) print('Sending {} results'.format(len(descriptions))) ➏ return web.Response(content_type=CONTENT_TYPE, text=html) ➐
❶ 一个路由处理函数,参数是一个 aiohttp.web.Request 实例。
❷ 获取查询字符串,去掉首尾的空白。
❸ 在服务器的控制台中记录查询。
❹ 如果有查询字符串,从索引(index)中找到结果,使用 HTML 表格中的行渲染结果,把结果赋值给 res 变量,再把状态消息赋值给 msg 变量。
❺ 渲染 HTML 页面。
❻ 在服务器的控制台中记录响应。
❼ 构建 Response 对象,将其返回。
注意,home 不是协程,既然定义体中没有 yield from 表达式,也没必要是协程。在 aiohttp 包的文档中,add_route 方法的条目下面说道,“如果处理程序是普通的函数,在内部会将其转换成协程”。
示例 18-18 中的 home 函数虽然简单,却有一个缺点。home 是普通的函数,而不是协程,这一事实预示着一个更大的问题:我们需要重新思考如何实现 Web 应用,以获得高并发。下面来分析这个问题。
18.6.3 更好地支持并发的智能客户端
示例 18-18 中的 home 函数很像是 Django 或 Flask 中的视图函数,实现方式完全没有考虑异步:获取请求,从数据库中读取数据,然后构建响应,渲染完整的 HTML 页面。在这个示例中,存储在内存中的 UnicodeNameIndex 对象是“数据库”。但是,对真正的数据库来说,应该异步访问,否则在等待数据库查询结果的过程中,事件循环会阻塞。例如,aiopg 包提供了一个异步 PostgreSQL 驱动,与 asyncio 包兼容;这个包支持使用 yield from 发送查询和获取结果,因此视图函数的表现与真正的协程一样。
除了防止阻塞调用之外,高并发的系统还必须把复杂的工作分成多步,以保持敏捷。http_charfinder.py 服务器表明了这一点:如果搜索“cjk”,得到的结果是 75 821 个中文、日文和韩文象形文字。13 此时,home 函数会返回一个 5.3MB 的 HTML 文档,显示一个有 75 821 行的表格。
13这正是 CJK 表示的意思:不断增加的中文、日文和韩文字符。以后的 Python 版本支持的 CJK 象形文字数量可能会比 Python 3.4 多。
我在自己的设备中使用命令行 HTTP 客户端 curl 访问架设在本地的 http_charfinder.py 服务器,查询“cjk”,2 秒钟后获得响应。浏览器要布局包含这么大一个表格的页面,用的时间会更长。当然,大多数查询返回的响应要小得多:查询“braille”返回 256 行结果,页面大小为 19KB,在我的设备中用时 0.017 秒。可是,如果服务器要用 2 秒钟处理“cjk”查询,那么其他所有客户端都至少要等 2 秒——这是不可接受的。
避免响应时间太长的方法是实现分页:首次至多返回(比如说)200 行,用户点击链接或滚动页面时再获取更多结果。如果查看本书代码仓库中的 charfinder.py 模块,你会发现 UnicodeNameIndex.find_descriptions 方法有两个可选的参数——start 和 stop,这是偏移值,用于支持分页。因此,我们可以返回前 200 个结果,当用户想查看更多结果时,再使用 AJAX 或 WebSockets 发送下一批结果。
实现分批发送结果所需的大多数代码都在浏览器这一端,因此 Google 和所有大型互联网公司都大量依赖客户端代码构建服务:智能的异步客户端能更好地使用服务器资源。
虽然智能的客户端甚至对老式 Django 应用也有帮助,但是要想真正为这种客户端服务,我们需要全方位支持异步编程的框架,从处理 HTTP 请求和响应到访问数据库,全都支持异步。如果想实现实时服务,例如游戏和以 WebSockets 支持的媒体流,那就尤其应该这么做。14
14在“杂谈”中我会进一步说明这个趋势。
这里留一个练习给读者:改进 http_charfinder.py 脚本,添加下载进度条。此外还有一个附加题:实现 Twitter 那样的“无限滚动”。做完这个练习后,我们对如何使用 asyncio 包做异步编程的讨论就结束了。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论