如何在 python ThreadPoolExecutor 中终止/取消/停止运行执行器 future? future.cancel() 返回 False

发布于 2025-01-14 09:05:48 字数 1342 浏览 11 评论 0原文

我想使用 python ThreadPoolExecutor (附加代码)同时调用两个 api。 如果这两个 api 调用中的任何一个有响应,我想停止调用另一个。因为对于我的用例,两个 api 之一将需要很长时间才能返回响应,我想避免调用。

def get_rest_api_response(url):
    return requets.get(url)
    
import requests, os
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=4) as executor:
    f1 = executor.submit(get_rest_api_response, url="REST_API_URL_1")
    f2 = executor.submit(get_rest_api_response, url="REST_API_URL_2")
    
    no_future_is_done = True
    while(no_future_is_done):
        if f1.done():
            no_future_is_done = False
            print("f1 is done")
            output = f1.result()
            print(f2.cancel())  ######------> Failing!
        if f2.done():
            no_future_is_done = False
            print("f2 is done")
            output = f2.result()
            print(f1.cancel()) ######-------> Failing!
    print(output)

我正在使用 future.cancel() 但它失败并返回 False。 https://pd .codechef.com/docs/py/3.4.2/library/concurrent.futures.html#concurrent.futures.Future.cancel

还有其他方法可以实现吗 这?

I want to call two api's at same time using python ThreadPoolExecutor (code attached).
If either of these two api call responds, i want to stop calling the other. Because for my use case one of the two apis will take long time to return response which i want to avoid calling.

def get_rest_api_response(url):
    return requets.get(url)
    
import requests, os
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=4) as executor:
    f1 = executor.submit(get_rest_api_response, url="REST_API_URL_1")
    f2 = executor.submit(get_rest_api_response, url="REST_API_URL_2")
    
    no_future_is_done = True
    while(no_future_is_done):
        if f1.done():
            no_future_is_done = False
            print("f1 is done")
            output = f1.result()
            print(f2.cancel())  ######------> Failing!
        if f2.done():
            no_future_is_done = False
            print("f2 is done")
            output = f2.result()
            print(f1.cancel()) ######-------> Failing!
    print(output)

I'm using future.cancel() but its failing and returning False.
https://pd.codechef.com/docs/py/3.4.2/library/concurrent.futures.html#concurrent.futures.Future.cancel

Is there any other way i can achieve this?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

夏至、离别 2025-01-21 09:05:48

尽管 ThreadPoolExecutor 不提供停止运行任务的工具,但我们可以更新目标任务函数以在设置线程安全标志时停止运行。这可以使用 threading.Event 来实现。

首先,它要求您首先创建一个 threading.Event 来控制正在运行的任务何时停止。

from threading import Event
...
# create an event to shut down all running tasks
event = Event()

然后可以将该事件作为参数传递给每个目标任务函数。

with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    # execute the task, passing the event
    future = executor.submit(work, event)

任务运行后,可以通过在Event上设置标志来从主线程停止它。

...
# stop running tasks via the event
event.set()

每个目标任务函数必须经常检查事件的状态,例如在任务内循环的每次迭代中。

如果设置了该事件,则任务可以停止运行,也许立即返回

...
# check the status of the flag
if event.is_set():
    return

如果您的目标任务函数打开了一些资源,则可能需要在返回之前进行清理。

这种停止运行任务的方法可能需要您更改任务的结构,以便拥有允许您检查标志值的循环结构。

例如,如果您的任务从文件或套接字读取数据,您可能需要更改在循环中的数据块中执行的读取操作,以便循环的每次迭代都可以检查标志的状态。

完整示例

# SuperFastPython.com
# example of stopping a running task in a thread pool
from time import sleep
from threading import Event
from concurrent.futures import ThreadPoolExecutor

# mock target task function
def work(event):
    # pretend read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if the task should stop
        if event.is_set():
            return

# create an event used to stop running tasks
event = Event()
# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute one task
    future = executor.submit(work, event)
    # wait a moment
    print('The task is running...')
    sleep(2)
    # cancel the task, just in case it is not yet running
    future.cancel()
    # stop the running task using the flag
    print('Stopping the task...')
    event.set()
    # waiting for all tasks to complete
    print('Waiting...')
print('All done.')

运行该示例首先创建一个线程池并调度一个任务。

创建一个事件对象并将其传递给任务,在该任务中每次迭代都会检查它是否已设置,如果已设置则退出任务。

该任务开始正常执行两秒钟。首先,我们取消池中的所有任务,以防万一它尚未开始执行。

然后我们设置事件来触发正在运行的任务停止。该任务每秒检查事件的状态,并在事件设置后的下一次迭代中停止执行。

The task is running...
Stopping the task...
Waiting...
All done.

原始答案可以在此处找到!

Although the ThreadPoolExecutor does not provide a facility to stop running tasks, we can update our target task functions to stop running when a thread-safe flag is set. This can be implemented using threading.Event.

First, it requires that you first create a threading.Event to control when running tasks should stop.

from threading import Event
...
# create an event to shut down all running tasks
event = Event()

This event can then be passed to each target task function as an argument.

with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    # execute the task, passing the event
    future = executor.submit(work, event)

Once the task is running, it can be stopped from the main thread by setting the flag on the Event.

...
# stop running tasks via the event
event.set()

Each target task function must check the status of the event frequently, such as within each iteration of a loop within the task.

If the event is set, the task can then stop running, perhaps by returning immediately.

...
# check the status of the flag
if event.is_set():
    return

If your target task function has some resources open, it may need to clean up before returning.

This approach to stop running tasks may require that you change the structure of your task so that you have a loop structure allowing you to check the value of the flag.

For example, if your task reads data from a file or a socket, you may need to change the read operation to be performed in blocks of data in a loop so that each iteration of the loop you can check the status of the flag.

Full example

# SuperFastPython.com
# example of stopping a running task in a thread pool
from time import sleep
from threading import Event
from concurrent.futures import ThreadPoolExecutor

# mock target task function
def work(event):
    # pretend read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if the task should stop
        if event.is_set():
            return

# create an event used to stop running tasks
event = Event()
# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute one task
    future = executor.submit(work, event)
    # wait a moment
    print('The task is running...')
    sleep(2)
    # cancel the task, just in case it is not yet running
    future.cancel()
    # stop the running task using the flag
    print('Stopping the task...')
    event.set()
    # waiting for all tasks to complete
    print('Waiting...')
print('All done.')

Running the example first creates a thread pool and schedules a task.

An event object is created and passed to the task where it is checked each iteration to see if it has been set and if so to bail out of the task.

The task starts executing as per normal for two seconds. First, we cancel all tasks in the pool, just in case it has not yet started executing.

We then set the event to trigger the running task to stop. The task checks the status of the event each second, and stops executing on the next iteration after the event has been set.

The task is running...
Stopping the task...
Waiting...
All done.

The original answer can be found here!

尾戒 2025-01-21 09:05:48

对于所提到的具体情况,如果您的 get_rest_api_response 函数可以永远阻塞,您应该首先为其设置超时,否则您必须终止线程。
与@Shahzod1011类似,但在类中管理任务和信号,如果每个任务都很复杂,则更适合。

import concurrent.futures
import time

class MyTask:
    def __init__(self, task_name):
        self.task_name = task_name
        self.if_run = True

    def run(self):
        while self.if_run:
            print(f"Task '{self.task_name}' is running. '{self.if_run}'")
            time.sleep(1)


class ThreadPoolManager:
    def __init__(self):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
        self.task = {}  # task_name - MyTask[task_name]
        self.task_execution_pool = {}    # task_name - future

    def add_task(self, task_name):
        self.task[task_name] = MyTask(task_name)
        future = self.executor.submit(self.task[task_name].run)
        self.task_execution_pool[task_name] = future

    def stop_task(self, task_name):
        if task_name in self.task_execution_pool:
            self.task[task_name].if_run = False
            self.task.pop(task_name)
            print(f"stopping task'{task_name}'")
            future = self.task_execution_pool.pop(task_name)
            future.cancel()


if __name__ == "__main__":
    thread_pool = ThreadPoolManager()

    # Add tasks
    thread_pool.add_task("Task1")
    thread_pool.add_task("Task2")

    time.sleep(5)
    print("end of sleep")
    # Stop tasks
    thread_pool.stop_task("Task1")
    thread_pool.stop_task("Task2")

    thread_pool.executor.shutdown(wait=True)

您可以在 ThreadPoolManager 中修改 MyTaskadd_task :一个 MyTask,两个 run 函数对于每个 API。

For the specific situation mentioned in question, if your get_rest_api_response function can block forever, you should set it a timeout first, or you have to kill the thread.
Similar to @Shahzod1011, but manage tasks and signals in class, more suitable if each task is complicated.

import concurrent.futures
import time

class MyTask:
    def __init__(self, task_name):
        self.task_name = task_name
        self.if_run = True

    def run(self):
        while self.if_run:
            print(f"Task '{self.task_name}' is running. '{self.if_run}'")
            time.sleep(1)


class ThreadPoolManager:
    def __init__(self):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
        self.task = {}  # task_name - MyTask[task_name]
        self.task_execution_pool = {}    # task_name - future

    def add_task(self, task_name):
        self.task[task_name] = MyTask(task_name)
        future = self.executor.submit(self.task[task_name].run)
        self.task_execution_pool[task_name] = future

    def stop_task(self, task_name):
        if task_name in self.task_execution_pool:
            self.task[task_name].if_run = False
            self.task.pop(task_name)
            print(f"stopping task'{task_name}'")
            future = self.task_execution_pool.pop(task_name)
            future.cancel()


if __name__ == "__main__":
    thread_pool = ThreadPoolManager()

    # Add tasks
    thread_pool.add_task("Task1")
    thread_pool.add_task("Task2")

    time.sleep(5)
    print("end of sleep")
    # Stop tasks
    thread_pool.stop_task("Task1")
    thread_pool.stop_task("Task2")

    thread_pool.executor.shutdown(wait=True)

You can modify MyTask and add_task in ThreadPoolManager : one MyTask, two run function for each api.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文