Java ExecutorService invokeAll() 中断
我有一个宽度为 10 的固定线程池 ExecutorService
,以及一个包含 100 个 Callable
的列表,每个线程池等待 20 秒并记录它们的中断。
我在一个单独的线程中对该列表调用 invokeAll
,并且几乎立即中断该线程。 ExecutorService
执行按预期中断,但 Callable 实际记录的中断次数远多于预期的 10 个 - 大约 20-40 个。如果 ExecutorService 可以同时执行不超过 10 个线程,为什么会这样呢?
完整源代码:(由于并发性,您可能需要多次运行)
@Test
public void interrupt3() throws Exception{
int callableNum = 100;
int executorThreadNum = 10;
final AtomicInteger interruptCounter = new AtomicInteger(0);
final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum);
final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>();
for (int i = 0; i < callableNum; ++i) {
executeds.add(new Waiter(interruptCounter));
}
Thread watcher = new Thread(new Runnable() {
@Override
public void run(){
try {
executorService.invokeAll(executeds);
} catch(InterruptedException ex) {
// NOOP
}
}
});
watcher.start();
Thread.sleep(200);
watcher.interrupt();
Thread.sleep(200);
assertEquals(10, interruptCounter.get());
}
// This class just waits for 20 seconds, recording it's interrupts
private class Waiter implements Callable <Object> {
private AtomicInteger interruptCounter;
public Waiter(AtomicInteger interruptCounter){
this.interruptCounter = interruptCounter;
}
@Override
public Object call() throws Exception{
try {
Thread.sleep(20000);
} catch(InterruptedException ex) {
interruptCounter.getAndIncrement();
}
return null;
}
}
使用 WinXP 32 位、Oracle JRE 1.6.0_27 和 JUnit4
I have a fixed thread pool ExecutorService
of width 10, and a list of 100 Callable
's, each waiting for 20 seconds and recording their interrupts.
I'm calling invokeAll
on that list in a separate thread, and almost immediately interrupting this thread. ExecutorService
execution is interrupted as expected, but the actual number of interrupts recorded by Callable
s is far more than expected 10 - around 20-40. Why is that so, if ExecutorService
can execute no more than 10 threads simultaneously?
Full source: (You may need to run it more that once due to concurrency)
@Test
public void interrupt3() throws Exception{
int callableNum = 100;
int executorThreadNum = 10;
final AtomicInteger interruptCounter = new AtomicInteger(0);
final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum);
final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>();
for (int i = 0; i < callableNum; ++i) {
executeds.add(new Waiter(interruptCounter));
}
Thread watcher = new Thread(new Runnable() {
@Override
public void run(){
try {
executorService.invokeAll(executeds);
} catch(InterruptedException ex) {
// NOOP
}
}
});
watcher.start();
Thread.sleep(200);
watcher.interrupt();
Thread.sleep(200);
assertEquals(10, interruptCounter.get());
}
// This class just waits for 20 seconds, recording it's interrupts
private class Waiter implements Callable <Object> {
private AtomicInteger interruptCounter;
public Waiter(AtomicInteger interruptCounter){
this.interruptCounter = interruptCounter;
}
@Override
public Object call() throws Exception{
try {
Thread.sleep(20000);
} catch(InterruptedException ex) {
interruptCounter.getAndIncrement();
}
return null;
}
}
Using WinXP 32-bit, Oracle JRE 1.6.0_27 and JUnit4
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
我不同意您应该只接收 10 次中断的假设。
本质上,我的论点是,不能保证 Watcher 线程在必须产生时间片并允许 ExecutorService 的工作线程启动更多 Waiter 任务之前被允许取消所有 100 个“Waiter”RunnableFuture 实例。
更新:显示来自
AbstractExecutorService
的代码包含
f.cancel(true)
的finally块是中断将传播到当前任务的时间。跑步。正如您所看到的,这是一个紧密循环,但不能保证执行循环的线程能够在一个时间片内迭代Future
的所有实例。I disagree with the hypothesis that you should only receive 10 interrupts.
Essentially, my argument is that there is no guarantee that the Watcher thread will be allowed to cancel all 100 "Waiter" RunnableFuture instances before it has to yield the time slice and allow the ExecutorService's worker threads to start more Waiter tasks.
Update: Showing code from
AbstractExecutorService
The finally block which contains
f.cancel(true)
is when the interrupt would be propagated to task which is currently running. As you can see, this is a tight loop, but there is no guarantee that the thread executing the loop would be able to iterate through all instances ofFuture
in one time slice.如果您想实现相同的行为,
请在中断线程池之前添加此块。
它将把所有等待队列排入新列表。
所以它只会中断正在运行的线程。
If you want to achieve same behaviour
Adding this block before interrupt the threadpool.
It will drain all the waiting queue into new list.
So it will interrupt only running threads.