Python 协程库 Asyncio
Python 在 3.5 版本的时候就加入了 asyncio,但是用了一下貌似不太好用,在版本 3.7,asyncio 有了极大的改善,使用了一下还算不错,asyncio 引入了 async await 语法,有点类似于 nodejs。
最简单的例子:
import asyncio async def main(): print('Hello ...') await asyncio.sleep(1) print('... World!') # Python 3.7+ asyncio.run(main())
比如 PHP 的 swoole 扩展,它也是基于协程的,但是使用起来就没有 python 那么方便了,swoole 没有提供 async await 语法。asyncio 提供了 create_task 来创建协程任务,它可以类比为 swoole 的 go 函数:
async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") await task1 await task2 print(f"finished at {time.strftime('%X')}")
asyncio 还提供了许多辅助函数,比如 asyncio.gather 用来同时运行多个协程任务,相当于之前使用 swoole 实现过的 GroupWait。相应的 swoole 的 channel 也有 asyncio 的对应版本 asyncio.Queue。而且还提供了多种 queue,比如 PriorityQueue 和 LifoQueue,因此 asyncio 也是完全适用于 CSP 编程模型。
使用 asyncio 创建一个 TCP 服务:
import asyncio async def handle_echo(reader, writer): while True: # 判断 EOF 结束符关闭链接 if reader.at_eof(): break data = await reader.readline() message = data.decode() addr = writer.get_extra_info('peername') print(f"Received {message!r} from {addr!r}") print(f"Send: {message!r}") writer.write(data) await writer.drain() print("Close the connection") writer.close() async def tcp_server_task(): server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888) addr = server.sockets[0].getsockname() print(f'Serving on {addr}') async with server: await server.serve_forever() async def main(): task = asyncio.create_task(tcp_server_task()) await task asyncio.run(main())
可以看到代码并不是很多,创建一个简单的 TCP 服务并不能显示 asyncio 的强大,现在我们假设需要建立一个集中式的 TCP 日志收集服务,将 TCP 端口收到的日志保存在 postgresql 或者 mysql 中,日志的请求量会比较大,所以需要有较好的性能,使用如下代码简单实现:
import asyncio from psycopg2.pool import SimpleConnectionPool from datetime import datetime class AsyncTask: def __init__(self): self.task_queue = asyncio.Queue() self.db_pool = SimpleConnectionPool( 2, 20, dbname='echoes', host='192.168.2.10', user='twn39', password='tangweinan' ) async def handle_echo(self, reader, writer): while True: # 判断 EOF 结束符关闭链接 if reader.at_eof(): break data = await reader.readline() message = data.decode() self.task_queue.put_nowait(message.strip()) writer.write("OK\r\n".encode()) await writer.drain() print("Close the connection") writer.close() async def tcp_server_task(self): server = await asyncio.start_server(self.handle_echo, '127.0.0.1', 8888) addr = server.sockets[0].getsockname() print(f'Serving on {addr}') async with server: await server.serve_forever() async def consume_task(self, name: str): while True: data = await self.task_queue.get() conn = self.db_pool.getconn() print(conn) cur = conn.cursor() cur.execute("insert into logs (level, message, created_at) values (%s, %s, %s)", ( 200, 'log test', datetime.now() )) conn.commit() self.db_pool.putconn(conn) print(f'work: {name} consume data: {data}') self.task_queue.task_done() if __name__ == '__main__': async def main(): async_task = AsyncTask() task = asyncio.create_task(async_task.tcp_server_task()) task1 = asyncio.create_task(async_task.consume_task('worker-1')) task2 = asyncio.create_task(async_task.consume_task('worker-2')) await asyncio.gather(task, task1, task2, return_exceptions=True) asyncio.run(main())
在程序中引入了 Queue,这样可以异步写入数据库,在请求高峰时提高稳定性,我们创建了三个主要的任务,一个是监听 TCP 端口,获取日志数据,将日志放入 queue 中,另外两个是消费 queue,当 queue 中有数据时写入数据库,没有时等待数据。
代码中有两个无限循环(其实是三个),在程序运行的时候,相当于是并行的,但是实际上它是单线程的,协程可以中断,当多个协程运行时,实际上是函数之间相互切换运行,但是切换的时间很短,所以体现出来是并行的。多线程编程它的线程切换是由操作系统来实现的,所以相互切换的成本比较高,而协程是由用户来决定什么时候切换的,也称为用户端线程。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论