Python 协程库 Asyncio

发布于 2023-10-03 13:29:42 字数 4247 浏览 39 评论 0

Python 在 3.5 版本的时候就加入了 asyncio,但是用了一下貌似不太好用,在版本 3.7,asyncio 有了极大的改善,使用了一下还算不错,asyncio 引入了 async await 语法,有点类似于 nodejs。

最简单的例子:

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())

比如 PHP 的 swoole 扩展,它也是基于协程的,但是使用起来就没有 python 那么方便了,swoole 没有提供 async await 语法。asyncio 提供了 create_task 来创建协程任务,它可以类比为 swoole 的 go 函数:

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio 还提供了许多辅助函数,比如 asyncio.gather 用来同时运行多个协程任务,相当于之前使用 swoole 实现过的 GroupWait。相应的 swoole 的 channel 也有 asyncio 的对应版本 asyncio.Queue。而且还提供了多种 queue,比如 PriorityQueue 和 LifoQueue,因此 asyncio 也是完全适用于 CSP 编程模型。

使用 asyncio 创建一个 TCP 服务:

import asyncio

async def handle_echo(reader, writer):
    while True:
        # 判断 EOF 结束符关闭链接
        if reader.at_eof():
            break
        data = await reader.readline()
        message = data.decode()
        addr = writer.get_extra_info('peername')

        print(f"Received {message!r} from {addr!r}")

        print(f"Send: {message!r}")
        writer.write(data)
        await writer.drain()

    print("Close the connection")
    writer.close()


async def tcp_server_task():
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()


async def main():
    task = asyncio.create_task(tcp_server_task())
    await task

asyncio.run(main())

可以看到代码并不是很多,创建一个简单的 TCP 服务并不能显示 asyncio 的强大,现在我们假设需要建立一个集中式的 TCP 日志收集服务,将 TCP 端口收到的日志保存在 postgresql 或者 mysql 中,日志的请求量会比较大,所以需要有较好的性能,使用如下代码简单实现:

import asyncio
from psycopg2.pool import SimpleConnectionPool
from datetime import datetime


class AsyncTask:

    def __init__(self):
        self.task_queue = asyncio.Queue()
        self.db_pool = SimpleConnectionPool(
            2, 20,
            dbname='echoes',
            host='192.168.2.10',
            user='twn39',
            password='tangweinan'
        )

    async def handle_echo(self, reader, writer):
        while True:
            # 判断 EOF 结束符关闭链接
            if reader.at_eof():
                break
            data = await reader.readline()
            message = data.decode()
            self.task_queue.put_nowait(message.strip())
            writer.write("OK\r\n".encode())
            await writer.drain()

        print("Close the connection")
        writer.close()

    async def tcp_server_task(self):
        server = await asyncio.start_server(self.handle_echo, '127.0.0.1', 8888)

        addr = server.sockets[0].getsockname()
        print(f'Serving on {addr}')

        async with server:
            await server.serve_forever()

    async def consume_task(self, name: str):
        while True:
            data = await self.task_queue.get()
            conn = self.db_pool.getconn()
            print(conn)
            cur = conn.cursor()
            cur.execute("insert into logs (level, message, created_at) values (%s, %s, %s)", (
                200, 'log test', datetime.now()
            ))
            conn.commit()
            self.db_pool.putconn(conn)
            print(f'work: {name} consume data: {data}')
            self.task_queue.task_done()


if __name__ == '__main__':
    async def main():
        async_task = AsyncTask()
        task = asyncio.create_task(async_task.tcp_server_task())
        task1 = asyncio.create_task(async_task.consume_task('worker-1'))
        task2 = asyncio.create_task(async_task.consume_task('worker-2'))
        await asyncio.gather(task, task1, task2, return_exceptions=True)

    asyncio.run(main())

在程序中引入了 Queue,这样可以异步写入数据库,在请求高峰时提高稳定性,我们创建了三个主要的任务,一个是监听 TCP 端口,获取日志数据,将日志放入 queue 中,另外两个是消费 queue,当 queue 中有数据时写入数据库,没有时等待数据。

代码中有两个无限循环(其实是三个),在程序运行的时候,相当于是并行的,但是实际上它是单线程的,协程可以中断,当多个协程运行时,实际上是函数之间相互切换运行,但是切换的时间很短,所以体现出来是并行的。多线程编程它的线程切换是由操作系统来实现的,所以相互切换的成本比较高,而协程是由用户来决定什么时候切换的,也称为用户端线程。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

夏花。依旧

暂无简介

文章
评论
28 人气
更多

推荐作者

櫻之舞

文章 0 评论 0

弥枳

文章 0 评论 0

m2429

文章 0 评论 0

野却迷人

文章 0 评论 0

我怀念的。

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文