ThreadPoolExecutor与单独的Asyncio循环通信
我的任务是IO绑定在循环中运行的任务。这项任务可以做很多工作,并且经常会陷入困境(这是正确的词吗?)。我的计划是使用run_in_executor
使用ProcessPoolExecutor
或threadpoolexecutor
单独运行它,并允许主循环执行此操作,并允许主循环执行此操作,我的计划是在单独的过程或线程中运行它它的工作。目前,用于任务之间的通信,我使用asyncio.priorityqueue()
and asyncio.event()
进行通信,并希望重新使用这些或具有相同接口的其他内容。
当前代码:
# Getter for events and queues so communication can happen
send, receive, send_event, receive_event = await process_obj.get_queues()
# Creates task based off the process object
future = asyncio.create_task(process_obj.main())
当前的过程代码:
async def main():
while True:
#does things that hogs loop
我想做什么:
# Getter for events and queues so communication can happen
send, receive, send_event, receive_event = await process_obj.get_queues()
# I assume I could use Thread or Process executors
pool = concurrent.futures.ThreadPoolExecutor()
result = await loop.run_in_executor(pool, process_obj.run())
新的过程代码:
def run():
asyncio.create_task(main())
async def main():
while True:
#does things that hogs loop
我如何在最初可以在这个新线程和原始循环之间进行通信?
I have a task that is IO bound running in a loop. This task does a lot of work and is often times hogging the loop (Is that the right word for it?). My plan is to run it in a separate process or thread using run_in_executor
with ProcessPoolExecutor
or ThreadPoolExecutor
to run it separately and allow the main loop to do its work. Currently for communication between tasks I use asyncio.PriorityQueue()
and asyncio.Event()
for communication and would like to reuse these, or something with the same interface, if possible.
Current code:
# Getter for events and queues so communication can happen
send, receive, send_event, receive_event = await process_obj.get_queues()
# Creates task based off the process object
future = asyncio.create_task(process_obj.main())
Current process code:
async def main():
while True:
#does things that hogs loop
What I want to do:
# Getter for events and queues so communication can happen
send, receive, send_event, receive_event = await process_obj.get_queues()
# I assume I could use Thread or Process executors
pool = concurrent.futures.ThreadPoolExecutor()
result = await loop.run_in_executor(pool, process_obj.run())
New process code:
def run():
asyncio.create_task(main())
async def main():
while True:
#does things that hogs loop
How do I communicate between this new thread and the original loop like I could originally?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
我没有什么可以复制您的代码。因此,请从YouTube下载器中考虑此代码为例,我希望它能帮助您了解如何从线程函数获得结果:
示例代码:
我个人更喜欢
yourprocessing.pool import import threadpool
,现在看起来看起来很像就像您的代码“ hogs”一样,因为您正在等待结果。因此,很明显,直到有结果程序将等待(这可能很长)。如果您在我的示例代码中查看:on_download
将安排和事件安排下载
,则将安排另一个事件检查过程
。我无法判断您的应用是GUI应用程序还是终端,因为您的问题中几乎没有代码,但是您必须要做什么,在循环中,您必须安排检查过程
的事件。如果您查看我的
检查过程
:如果self.async_result.ready():
,只有在我的结果准备就绪时才会返回。现在您正在等待结果,这里的一切都在后台发生,时不时地会检查结果(它不会像没有结果那样,主循环将继续执行它必须而不是等待它)。
因此,基本上,您必须在循环中安排一些事件(尤其是结果的事件),而不是排队等待一个事件。这是否有意义,我的示例代码有帮助吗?对不起,我真的很糟糕地解释我的脑海;)
There is not much I could reproduce your code. So please consider this code from YouTube Downloader as example and I hope that will help you to understand how to get result from thread function:
example code:
Personally I prefer
from multiprocessing.pool import ThreadPool
and now it looks like your code 'hogs up' because you are awaiting for result. So obviously until there is result program will wait (and that may be long). If you look in my example code:on_download
will schedule and eventschedule download
and this one will schedule another eventcheck process
. I can't tell if you app is GUI app or terminal as there is pretty much no code in your question but what you have to do, in your loop you have to schedule an event ofcheck process
.If you look on my
check process
:if self.async_result.ready():
that will only return when my result is ready.Now you are waiting for the result, here everything is happening in the background and every now and then the main loop will check for the result (it won't hog up as if there is no result the main loop will carry on doing what it have to rather than wait for it).
So basically you have to schedule some events (especially the one for the result) in your loop rather than going line by line and waiting for one. Does that make sense and does my example code is helpful? Sorry I am really bad at explaining what is in my head ;)
当您在
main
coroutine中执行时执行
时,它不会使循环缠绕,但会阻止循环不接受休息任务来完成其作业。在基于事件的应用程序中运行一个过程并不是最佳解决方案,因为该过程在数据共享中不太友好。
不使用并行性也可以同时完成所有操作。您需要的只是在
的末尾执行
。它可以回到循环,并允许执行其余任务。因此,我们不从Coroutine退出。等待Asyncio.sleep(0)
,而true在下面的示例中,我有一个
侦听器
,该在true 时使用,并处理由emitter
添加到队列中的数据。When you execute the
while True
in yourmain
coroutine, it doesn't hog the loop but blocks the loop not accepting the rest task to do their jobs. Running a process in your event-based application is not the best solution as the processes are not much friendly in data sharing.It is possible to do all concurrently without using parallelism. All you need is to execute a
await asyncio.sleep(0)
at the end ofwhile True
. It yields back to the loop and allows the rest tasks to be executed. So we do not exit from the coroutine.In the following example, I have a
listener
that useswhile True
and handles the data added byemitter
to the queue.