ThreadPoolExecutor与单独的Asyncio循环通信

发布于 2025-01-28 07:33:56 字数 1230 浏览 6 评论 0原文

我的任务是IO绑定在循环中运行的任务。这项任务可以做很多工作,并且经常会陷入困境(这是正确的词吗?)。我的计划是使用run_in_executor使用ProcessPoolExecutorthreadpoolexecutor单独运行它,并允许主循环执行此操作,并允许主循环执行此操作,我的计划是在单独的过程或线程中运行它它的工作。目前,用于任务之间的通信,我使用asyncio.priorityqueue() and asyncio.event()进行通信,并希望重新使用这些或具有相同接口的其他内容。

当前代码:

# Getter for events and queues so communication can happen
send, receive, send_event, receive_event = await process_obj.get_queues()

# Creates task based off the process object
future = asyncio.create_task(process_obj.main())

当前的过程代码:

async def main():

    while True:
        #does things that hogs loop

我想做什么:

# Getter for events and queues so communication can happen
send, receive, send_event, receive_event = await process_obj.get_queues()

# I assume I could use Thread or Process executors
pool = concurrent.futures.ThreadPoolExecutor()
result = await loop.run_in_executor(pool, process_obj.run())

新的过程代码:

def run():
    asyncio.create_task(main())

async def main():

    while True:
        #does things that hogs loop

我如何在最初可以在这个新线程和原始循环之间进行通信?

I have a task that is IO bound running in a loop. This task does a lot of work and is often times hogging the loop (Is that the right word for it?). My plan is to run it in a separate process or thread using run_in_executor with ProcessPoolExecutor or ThreadPoolExecutor to run it separately and allow the main loop to do its work. Currently for communication between tasks I use asyncio.PriorityQueue() and asyncio.Event() for communication and would like to reuse these, or something with the same interface, if possible.

Current code:

# Getter for events and queues so communication can happen
send, receive, send_event, receive_event = await process_obj.get_queues()

# Creates task based off the process object
future = asyncio.create_task(process_obj.main())

Current process code:

async def main():

    while True:
        #does things that hogs loop

What I want to do:

# Getter for events and queues so communication can happen
send, receive, send_event, receive_event = await process_obj.get_queues()

# I assume I could use Thread or Process executors
pool = concurrent.futures.ThreadPoolExecutor()
result = await loop.run_in_executor(pool, process_obj.run())

New process code:

def run():
    asyncio.create_task(main())

async def main():

    while True:
        #does things that hogs loop

How do I communicate between this new thread and the original loop like I could originally?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

梦里兽 2025-02-04 07:33:56

我没有什么可以复制您的代码。因此,请从YouTube下载器中考虑此代码为例,我希望它能帮助您了解如何从线程函数获得结果:

示例代码:

def on_download(self, is_mp3: bool, is_mp4: bool, url: str) -> None:
    if is_mp3 == False and is_mp4 == False:
        self.ids.info_lbl.text = 'Please select a type of file to download.'
    else:
        self.ids.info_lbl.text = 'Downloading...'
        
        self.is_mp3 = is_mp3
        self.is_mp4 = is_mp4
        self.url = url
        
        Clock.schedule_once(self.schedule_download, 2)
        Clock.schedule_interval(self.start_progress_bar, 0.1)
        
def schedule_download(self, dt: float) -> None:
    '''
    Callback method for the download.
    '''
    
    pool = ThreadPool(processes=1)
    _downloader = Downloader(self.d_path)
    self.async_result = pool.apply_async(_downloader.download,
                                         (self.is_mp3, self.is_mp4, self.url))
    Clock.schedule_interval(self.check_process, 0.1)
    
def check_process(self, dt: float) -> None:
    '''
    Check if download is complete.
    '''
    if self.async_result.ready():
        resp = self.async_result.get()

        if resp[0] == 'Error. Download failed.':
            self.ids.info_lbl.text = resp[0]
            # progress bar gray if error
            self.stop_progress_bar(value=0)
        else:
            # progress bar blue if success
            self.stop_progress_bar(value=100)
            self.ids.file_name.text = resp[0]
            self.ids.info_lbl.text = 'Finished downloading.'
            self.ids.url_input.text = ''
        
        Clock.unschedule(self.check_process)

我个人更喜欢yourprocessing.pool import import threadpool,现在看起来看起来很像就像您的代码“ hogs”一样,因为您正在等待结果。因此,很明显,直到有结果程序将等待(这可能很长)。如果您在我的示例代码中查看:

on_download将安排和事件安排下载,则将安排另一个事件检查过程。我无法判断您的应用是GUI应用程序还是终端,因为您的问题中几乎没有代码,但是您必须要做什么,在循环中,您必须安排检查过程的事件。
如果您查看我的检查过程如果self.async_result.ready():,只有在我的结果准备就绪时才会返回。
现在您正在等待结果,这里的一切都在后台发生,时不时地会检查结果(它不会像没有结果那样,主循环将继续执行它必须而不是等待它)。

因此,基本上,您必须在循环中安排一些事件(尤其是结果的事件),而不是排队等待一个事件。这是否有意义,我的示例代码有帮助吗?对不起,我真的很糟糕地解释我的脑海;)

-> mainloop
  -> new Thread if there is any
  -> check for result if there is any Threads
    -> if there is a result
      -> do something
  -> mainloop keeps running
  -> back to top

There is not much I could reproduce your code. So please consider this code from YouTube Downloader as example and I hope that will help you to understand how to get result from thread function:

example code:

def on_download(self, is_mp3: bool, is_mp4: bool, url: str) -> None:
    if is_mp3 == False and is_mp4 == False:
        self.ids.info_lbl.text = 'Please select a type of file to download.'
    else:
        self.ids.info_lbl.text = 'Downloading...'
        
        self.is_mp3 = is_mp3
        self.is_mp4 = is_mp4
        self.url = url
        
        Clock.schedule_once(self.schedule_download, 2)
        Clock.schedule_interval(self.start_progress_bar, 0.1)
        
def schedule_download(self, dt: float) -> None:
    '''
    Callback method for the download.
    '''
    
    pool = ThreadPool(processes=1)
    _downloader = Downloader(self.d_path)
    self.async_result = pool.apply_async(_downloader.download,
                                         (self.is_mp3, self.is_mp4, self.url))
    Clock.schedule_interval(self.check_process, 0.1)
    
def check_process(self, dt: float) -> None:
    '''
    Check if download is complete.
    '''
    if self.async_result.ready():
        resp = self.async_result.get()

        if resp[0] == 'Error. Download failed.':
            self.ids.info_lbl.text = resp[0]
            # progress bar gray if error
            self.stop_progress_bar(value=0)
        else:
            # progress bar blue if success
            self.stop_progress_bar(value=100)
            self.ids.file_name.text = resp[0]
            self.ids.info_lbl.text = 'Finished downloading.'
            self.ids.url_input.text = ''
        
        Clock.unschedule(self.check_process)

Personally I prefer from multiprocessing.pool import ThreadPool and now it looks like your code 'hogs up' because you are awaiting for result. So obviously until there is result program will wait (and that may be long). If you look in my example code:

on_download will schedule and event schedule download and this one will schedule another event check process. I can't tell if you app is GUI app or terminal as there is pretty much no code in your question but what you have to do, in your loop you have to schedule an event of check process.
If you look on my check process: if self.async_result.ready(): that will only return when my result is ready.
Now you are waiting for the result, here everything is happening in the background and every now and then the main loop will check for the result (it won't hog up as if there is no result the main loop will carry on doing what it have to rather than wait for it).

So basically you have to schedule some events (especially the one for the result) in your loop rather than going line by line and waiting for one. Does that make sense and does my example code is helpful? Sorry I am really bad at explaining what is in my head ;)

-> mainloop
  -> new Thread if there is any
  -> check for result if there is any Threads
    -> if there is a result
      -> do something
  -> mainloop keeps running
  -> back to top
战皆罪 2025-02-04 07:33:56

当您在main coroutine中执行时执行时,它不会使循环缠绕,但会阻止循环不接受休息任务来完成其作业。在基于事件的应用程序中运行一个过程并不是最佳解决方案,因为该过程在数据共享中不太友好。

不使用并行性也可以同时完成所有操作。您需要的只是在的末尾执行等待Asyncio.sleep(0),而true。它可以回到循环,并允许执行其余任务。因此,我们不从Coroutine退出。

在下面的示例中,我有一个侦听器,该在true 时使用,并处理由emitter添加到队列中的数据。

import asyncio
from queue import Empty
from queue import Queue
from random import choice

queue = Queue()


async def listener():
    while True:
        try:
            # data polling from the queue
            data = queue.get_nowait()
            print(data)  # {"type": "event", "data": {...}}
        except (Empty, Exception):
            pass
        finally:
            # the magic action
            await asyncio.sleep(0)


async def emitter():
    # add a data to the queue
    queue.put({"type": "event", "data": {...}})


async def main():
    # first create a task for listener
    running_loop = asyncio.get_running_loop()
    running_loop.create_task(listener())
    for _ in range(5):
        # create tasks for emitter with random intervals to
        # demonstrate that the listener is still running in
        # the loop and handling the data put into the queue
        running_loop.create_task(emitter())
        await asyncio.sleep(choice(range(2)))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

When you execute the while True in your main coroutine, it doesn't hog the loop but blocks the loop not accepting the rest task to do their jobs. Running a process in your event-based application is not the best solution as the processes are not much friendly in data sharing.

It is possible to do all concurrently without using parallelism. All you need is to execute a await asyncio.sleep(0) at the end of while True. It yields back to the loop and allows the rest tasks to be executed. So we do not exit from the coroutine.

In the following example, I have a listener that uses while True and handles the data added by emitter to the queue.

import asyncio
from queue import Empty
from queue import Queue
from random import choice

queue = Queue()


async def listener():
    while True:
        try:
            # data polling from the queue
            data = queue.get_nowait()
            print(data)  # {"type": "event", "data": {...}}
        except (Empty, Exception):
            pass
        finally:
            # the magic action
            await asyncio.sleep(0)


async def emitter():
    # add a data to the queue
    queue.put({"type": "event", "data": {...}})


async def main():
    # first create a task for listener
    running_loop = asyncio.get_running_loop()
    running_loop.create_task(listener())
    for _ in range(5):
        # create tasks for emitter with random intervals to
        # demonstrate that the listener is still running in
        # the loop and handling the data put into the queue
        running_loop.create_task(emitter())
        await asyncio.sleep(choice(range(2)))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文