如何在 python ThreadPoolExecutor 中终止/取消/停止运行执行器 future? future.cancel() 返回 False
我想使用 python ThreadPoolExecutor (附加代码)同时调用两个 api。 如果这两个 api 调用中的任何一个有响应,我想停止调用另一个。因为对于我的用例,两个 api 之一将需要很长时间才能返回响应,我想避免调用。
def get_rest_api_response(url):
return requets.get(url)
import requests, os
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=4) as executor:
f1 = executor.submit(get_rest_api_response, url="REST_API_URL_1")
f2 = executor.submit(get_rest_api_response, url="REST_API_URL_2")
no_future_is_done = True
while(no_future_is_done):
if f1.done():
no_future_is_done = False
print("f1 is done")
output = f1.result()
print(f2.cancel()) ######------> Failing!
if f2.done():
no_future_is_done = False
print("f2 is done")
output = f2.result()
print(f1.cancel()) ######-------> Failing!
print(output)
我正在使用 future.cancel() 但它失败并返回 False。 https://pd .codechef.com/docs/py/3.4.2/library/concurrent.futures.html#concurrent.futures.Future.cancel
还有其他方法可以实现吗 这?
I want to call two api's at same time using python ThreadPoolExecutor (code attached).
If either of these two api call responds, i want to stop calling the other. Because for my use case one of the two apis will take long time to return response which i want to avoid calling.
def get_rest_api_response(url):
return requets.get(url)
import requests, os
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=4) as executor:
f1 = executor.submit(get_rest_api_response, url="REST_API_URL_1")
f2 = executor.submit(get_rest_api_response, url="REST_API_URL_2")
no_future_is_done = True
while(no_future_is_done):
if f1.done():
no_future_is_done = False
print("f1 is done")
output = f1.result()
print(f2.cancel()) ######------> Failing!
if f2.done():
no_future_is_done = False
print("f2 is done")
output = f2.result()
print(f1.cancel()) ######-------> Failing!
print(output)
I'm using future.cancel() but its failing and returning False.
https://pd.codechef.com/docs/py/3.4.2/library/concurrent.futures.html#concurrent.futures.Future.cancel
Is there any other way i can achieve this?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
尽管 ThreadPoolExecutor 不提供停止运行任务的工具,但我们可以更新目标任务函数以在设置线程安全标志时停止运行。这可以使用 threading.Event 来实现。
首先,它要求您首先创建一个
threading.Event
来控制正在运行的任务何时停止。然后可以将该事件作为参数传递给每个目标任务函数。
任务运行后,可以通过在
Event
上设置标志来从主线程停止它。每个目标任务函数必须经常检查事件的状态,例如在任务内循环的每次迭代中。
如果设置了该事件,则任务可以停止运行,也许立即返回。
如果您的目标任务函数打开了一些资源,则可能需要在返回之前进行清理。
这种停止运行任务的方法可能需要您更改任务的结构,以便拥有允许您检查标志值的循环结构。
例如,如果您的任务从文件或套接字读取数据,您可能需要更改在循环中的数据块中执行的读取操作,以便循环的每次迭代都可以检查标志的状态。
完整示例
运行该示例首先创建一个线程池并调度一个任务。
创建一个事件对象并将其传递给任务,在该任务中每次迭代都会检查它是否已设置,如果已设置则退出任务。
该任务开始正常执行两秒钟。首先,我们取消池中的所有任务,以防万一它尚未开始执行。
然后我们设置事件来触发正在运行的任务停止。该任务每秒检查事件的状态,并在事件设置后的下一次迭代中停止执行。
原始答案可以在此处找到!
Although the
ThreadPoolExecutor
does not provide a facility to stop running tasks, we can update our target task functions to stop running when a thread-safe flag is set. This can be implemented using threading.Event.First, it requires that you first create a
threading.Event
to control when running tasks should stop.This event can then be passed to each target task function as an argument.
Once the task is running, it can be stopped from the main thread by setting the flag on the
Event
.Each target task function must check the status of the event frequently, such as within each iteration of a loop within the task.
If the event is set, the task can then stop running, perhaps by returning immediately.
If your target task function has some resources open, it may need to clean up before returning.
This approach to stop running tasks may require that you change the structure of your task so that you have a loop structure allowing you to check the value of the flag.
For example, if your task reads data from a file or a socket, you may need to change the read operation to be performed in blocks of data in a loop so that each iteration of the loop you can check the status of the flag.
Full example
Running the example first creates a thread pool and schedules a task.
An event object is created and passed to the task where it is checked each iteration to see if it has been set and if so to bail out of the task.
The task starts executing as per normal for two seconds. First, we cancel all tasks in the pool, just in case it has not yet started executing.
We then set the event to trigger the running task to stop. The task checks the status of the event each second, and stops executing on the next iteration after the event has been set.
The original answer can be found here!
对于所提到的具体情况,如果您的 get_rest_api_response 函数可以永远阻塞,您应该首先为其设置超时,否则您必须终止线程。
与@Shahzod1011类似,但在类中管理任务和信号,如果每个任务都很复杂,则更适合。
您可以在 ThreadPoolManager 中修改
MyTask
和add_task
:一个MyTask
,两个run
函数对于每个 API。For the specific situation mentioned in question, if your
get_rest_api_response
function can block forever, you should set it a timeout first, or you have to kill the thread.Similar to @Shahzod1011, but manage tasks and signals in class, more suitable if each task is complicated.
You can modify
MyTask
andadd_task
inThreadPoolManager
: oneMyTask
, tworun
function for each api.