如何使用 CompletionService 取消耗时过长的任务
我使用包裹在 2 线程 FixThreadPool ExecutorService 周围的 CompletionService 提交一些 Future 任务,然后设置一个等于提交的任务数量的循环,并使用completionservice.take() 等待它们全部完成或失败。问题是偶尔它永远不会完成(但我不知道为什么),所以我将 take() 方法更改为 poll(300,Timeout.SECONDS),其想法是如果一项任务需要超过 5 分钟才能完成poll 将失败,然后最终将退出循环,我可以遍历所有 future 并调用 future.cancel(true) 来强制取消有问题的任务。
但是当我运行代码并且它挂起时,我看到轮询每 5 分钟连续失败一次,并且不再运行任何任务,因此我假设这两个工作人员以某种方式陷入僵局并且永远不会完成,并且永远不允许启动其他任务。由于超时时间为 5 分钟,并且仍有 1000 个任务要运行,因此打破循环所需的时间太长,因此取消了作业。
所以我想做的是,如果 5 分钟内没有完成,则中断/强制取消当前任务,但我看不到任何方法可以做到这一点。
此代码示例显示了我所说的
import com.jthink.jaikoz.exception.JaikozException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceTest
{
public static void main(final String[] args)
{
CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2));
Collection<Worker> tasks = new ArrayList<Worker>(10);
tasks.add(new Worker(1));
tasks.add(new Worker(2));
tasks.add(new Worker(3));
tasks.add(new Worker(4));
tasks.add(new Worker(5));
tasks.add(new Worker(6));
List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
try
{
for (Callable task : tasks)
{
futures.add(cs.submit(task));
}
for (int t = 0; t < futures.size(); t++)
{
Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
if(result==null)
{
System.out.println("Worker TimedOut:");
continue;
}
else
{
try
{
if(result.isDone() && result.get())
{
System.out.println("Worker Completed:");
}
else
{
System.out.println("Worker Failed");
}
}
catch (ExecutionException ee)
{
ee.printStackTrace();
}
}
}
}
catch (InterruptedException ie)
{
}
finally
{
//Cancel by interrupting any existing tasks currently running in Executor Service
for (Future<Boolean> f : futures)
{
f.cancel(true);
}
}
System.out.println("Done");
}
}
class Worker implements Callable<Boolean>
{
private int number;
public Worker(int number)
{
this.number=number;
}
public Boolean call()
{
if(number==3)
{
try
{
Thread.sleep(50000);
}
catch(InterruptedException tie)
{
}
}
return true;
}
}
输出的简化版本
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker TimedOut:
Done
I submit some some Future tasks using a CompletionService wrapped round a 2 thread FixedThreadPool ExecutorService, I set then setup a loop equal to the number of tasks submitted and use completionservice.take() waiting for them all to complete or fail. Trouble is very occasionally it never finishes (but I don't know why) so I changed the take() method to a poll(300,Timeout.SECONDS), the idea being if one task takes a longer than 5 minutes to complete that poll will fail and then eventually will get out of the loop and I can go through all the futures and call future.cancel(true) to force cancellation of the offending task.
But when I run the code and it hangs, I see the poll fails continually once every 5 minutes and no more tasks run so I assume that the two workers are deadlocked in some way and never finish, and never allow additional tasks to start. Because the timeout is 5 minutes and there were still 1000 tasks to run the time taken to break out the loop was too long so cancelled the job.
So what I want to do is interupt/force cancellation the current task if hasnt completed in 5 minutes but I cant see any way to do it.
This code sample shows a simplified version of what Im talking about
import com.jthink.jaikoz.exception.JaikozException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceTest
{
public static void main(final String[] args)
{
CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2));
Collection<Worker> tasks = new ArrayList<Worker>(10);
tasks.add(new Worker(1));
tasks.add(new Worker(2));
tasks.add(new Worker(3));
tasks.add(new Worker(4));
tasks.add(new Worker(5));
tasks.add(new Worker(6));
List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
try
{
for (Callable task : tasks)
{
futures.add(cs.submit(task));
}
for (int t = 0; t < futures.size(); t++)
{
Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
if(result==null)
{
System.out.println("Worker TimedOut:");
continue;
}
else
{
try
{
if(result.isDone() && result.get())
{
System.out.println("Worker Completed:");
}
else
{
System.out.println("Worker Failed");
}
}
catch (ExecutionException ee)
{
ee.printStackTrace();
}
}
}
}
catch (InterruptedException ie)
{
}
finally
{
//Cancel by interrupting any existing tasks currently running in Executor Service
for (Future<Boolean> f : futures)
{
f.cancel(true);
}
}
System.out.println("Done");
}
}
class Worker implements Callable<Boolean>
{
private int number;
public Worker(int number)
{
this.number=number;
}
public Boolean call()
{
if(number==3)
{
try
{
Thread.sleep(50000);
}
catch(InterruptedException tie)
{
}
}
return true;
}
}
Output
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker TimedOut:
Done
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
我想我已经解决了这个问题,基本上,如果发生超时,我会迭代未来对象的列表,找到第一个尚未完成的对象,并强制取消。看起来不太优雅,但似乎很有效。
我更改了池的大小,只是为了显示更好地演示解决方案的输出,但也适用于 2 线程池。
输出是
I think Ive solved it, basically if a timeout occurs I then iterate through my list of future objects and find the first one that has not completed, and force cancellation. Doesn't seem that elegant but seems to work.
Ive changed size of pool just to show output that better demonstrates the solution but works with 2 threaded pool as well.
Output is
在您的工作示例中,您的 Callable 正在阻止支持中断的调用。如果您的真实代码在内部锁(
synchronized
块)上发生死锁,您将无法通过中断取消它。相反,您可以使用显式锁 (java.util.concurrent.Lock
),它允许您指定要等待锁获取的时间。如果线程等待锁超时(可能是因为遇到了死锁情况),它可能会中止并显示错误消息。顺便说一句,在您的示例中,您的
Callable
不应吞下InterruptedException
。您应该将其传递(重新抛出,或将 InterruptedException 添加到方法声明的 throws 行),或者在 catch 块中重置线程的中断状态(使用 Thread.currentThread ().interrupt())。In your worker example, your Callable is blocking on a call that supports interruption. If your real code is deadlocking on an intrinsic lock (a
synchronized
block), you won't be able to cancel it via interruption. Instead, you can use an explicit lock (java.util.concurrent.Lock
), which allows you to specify how long you want to wait for lock acquisition. If a thread times out waiting for a lock, possibly because it has encountered a deadlock situation, it can abort with an error message.By the way, in your example, your
Callable
shouldn't swallowInterruptedException
. You should either pass it up (rethrow, or addInterruptedException
to the throws line of your method declaration), or in the catch block, reset the interrupted state of the thread (withThread.currentThread().interrupt()
).您随时可以调用
future.get(timeout...)
如果尚未完成,它将返回超时异常...然后您可以调用
future.cancel()
。You can always call
future.get(timeout...)
It will return timeout exception if it did not finish yet... then you can call
future.cancel()
.