Python3 asyncio 在线程中使用

发布于 2022-09-07 12:33:02 字数 1291 浏览 33 评论 0

下面代码中模拟了使用 2 个线程管理 event loop 的情况:

import time
import asyncio
import threading

async def task(c, i):
    for _ in range(i):
        print(c)
        await asyncio.sleep(1)
    return i

def thread(loop):  # 异步程序
    asyncio.set_event_loop(loop)
    asyncio.ensure_future(task('sub thread', 999))
    loop.run_forever()

def main():
    threading.Thread(target=thread, args=(asyncio.get_event_loop(), )).start()

    # 同步代码开始
    future = asyncio.ensure_future(task('main thread', 5))
    while not future.done():
        time.sleep(1)
    print('main done: %s' % future.result())


if __name__ == '__main__':
    main()

其中子线程(thread 函数)设定并启用了一个 999 次的 task 任务。

而在主线程中,添加了一个 5 次的 task 任务。我设置了一个 While 循环来检查这项任务是否已经运行完毕,如若完毕则打印出 main done: 5


提出的问题:

  1. 像这样的多线程 event loop 是否有其他方案可以实现?
  2. 在 main 函数的 5 个 task 的任务,最初尝试使用 run_until_complete 来等待执行结束,但是与 run_forever 冲突导致抛出 RuntimeError: This event loop is already running 的错误,那么除了使用 While 循环外,还有其他方法阻塞后面代码执行吗?

实际情况描述:

我有一个使用了 同步+异步 的程序,其中同步程序是运行在主线程上的,异步是运行在一个子线程中的。
同步与异步独立运行,各司其职。但是有时候需要在同步中使用异步的函数或方法并取得结果。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

笑红尘 2022-09-14 12:33:02

你不需要循环调用 future.done(),用 future.result() 便可。

我建议把 eventloop 放在主线程,其它工作视类型可以放入

  1. 同(主)线程
    非阻塞(非CPU运算型)动作,例如 asyncio.sleep
  2. 从线程(池)
    阻塞(非CPU运算型)动作,例如 time.sleep
  3. 单独进程
    CPU运算型动作,例如计算质数

参考

https://docs.python.org/3/lib...
https://wiki.python.org/moin/...

例子

# -*- coding: utf-8 -*-
import asyncio
from datetime import datetime


async def add(a, b):
    await asyncio.sleep(1)
    return a + b


async def master_thread(loop):
    print("{} master: 1+2={}".format(datetime.now(), await add(1, 2)))


def slave_thread(loop):
    # 注意:这不是 coroutine 函数
    import time
    time.sleep(2)

    f = asyncio.run_coroutine_threadsafe(add(1, 2), loop)
    print("{} slave: 1+2={}".format(datetime.now(), f.result()))


async def main(loop):
    await asyncio.gather(
        master_thread(loop),
        # 线程池内执行
        loop.run_in_executor(None, slave_thread, loop),
    )


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文