Java ExecutorService invokeAll() interruption

Posted on 2024-12-13 00:34:19

I have a fixed thread pool ExecutorService of width 10 and a list of 100 Callables, each of which waits for 20 seconds and records whether it was interrupted.

I call invokeAll on that list in a separate thread and interrupt that thread almost immediately. The ExecutorService execution is interrupted as expected, but the number of interrupts recorded by the Callables is far more than the expected 10: around 20-40. Why is that, if the ExecutorService can run no more than 10 threads simultaneously?

Full source (you may need to run it more than once because of concurrency):

@Test
public void interrupt3() throws Exception{
    int callableNum = 100;
    int executorThreadNum = 10;
    final AtomicInteger interruptCounter = new AtomicInteger(0);
    final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum);
    final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>();
    for (int i = 0; i < callableNum; ++i) {
        executeds.add(new Waiter(interruptCounter));
    }
    Thread watcher = new Thread(new Runnable() {

        @Override
        public void run(){
            try {
                executorService.invokeAll(executeds);
            } catch(InterruptedException ex) {
                // NOOP
            }
        }
    });
    watcher.start();
    Thread.sleep(200);
    watcher.interrupt();
    Thread.sleep(200);
    assertEquals(10, interruptCounter.get());
}

// This class just waits for 20 seconds, recording its interrupts
private class Waiter implements Callable <Object> {
    private AtomicInteger    interruptCounter;

    public Waiter(AtomicInteger interruptCounter){
        this.interruptCounter = interruptCounter;
    }

    @Override
    public Object call() throws Exception{
        try {
            Thread.sleep(20000);
        } catch(InterruptedException ex) {
            interruptCounter.getAndIncrement();
        }
        return null;
    }
}

Using WinXP 32-bit, Oracle JRE 1.6.0_27 and JUnit4

Comments (3)

∝单色的世界 2024-12-20 00:34:19

I disagree with the hypothesis that you should only receive 10 interrupts.

Assume the CPU has 1 core.
1. Main thread starts Watcher and sleeps
2. Watcher starts, submits 100 Waiters, then blocks
3. Waiters 1-10 start and sleep in sequence
4. Main wakes, interrupts Watcher, then sleeps
5. Watcher cancels Waiters 1-5, then is preempted by the OS   (now we have 5 interrupts)
6. Waiters 11-13 start and sleep
7. Watcher cancels Waiters 6-20, then is preempted by the OS   (now we have 13 interrupts)
8. Waiters 14-20 are "started", resulting in a no-op
9. Waiters 21-24 start and sleep
....

Essentially, my argument is that there is no guarantee that the Watcher thread will be allowed to cancel all 100 "Waiter" RunnableFuture instances before it has to yield the time slice and allow the ExecutorService's worker threads to start more Waiter tasks.
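
To make that argument checkable, here is a minimal sketch (not the original test) that also counts how many Waiter tasks actually began running. The class name StartedVersusInterrupted, the method measure and the latch are my own additions for illustration. Under the assumption that the watcher is interrupted only after all pool threads are busy, the number of interrupts should match the number of tasks that started, and that number is at least the pool size but can be larger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, introduced only for this illustration.
class StartedVersusInterrupted {

    /** Returns {number of tasks whose call() began, number of tasks that caught an interrupt}. */
    static int[] measure(final int poolSize, int taskCount) throws InterruptedException {
        final AtomicInteger started = new AtomicInteger();
        final AtomicInteger interrupted = new AtomicInteger();
        final CountDownLatch poolBusy = new CountDownLatch(poolSize);
        final ExecutorService pool = Executors.newFixedThreadPool(poolSize);

        final List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(new Callable<Object>() {
                @Override
                public Object call() {
                    started.incrementAndGet();
                    poolBusy.countDown();
                    try {
                        Thread.sleep(20000);
                    } catch (InterruptedException ex) {
                        interrupted.incrementAndGet();
                    }
                    return null;
                }
            });
        }

        Thread watcher = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    pool.invokeAll(tasks);
                } catch (InterruptedException ex) {
                    // expected: the main thread interrupts us below
                }
            }
        });
        watcher.start();

        poolBusy.await();      // every pool thread is now inside a Waiter
        watcher.interrupt();   // triggers the cancel loop inside invokeAll
        watcher.join();        // the cancel loop has finished
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { started.get(), interrupted.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = measure(10, 100);
        System.out.println("started=" + r[0] + ", interrupted=" + r[1]);
    }
}
```

How far the count rises above the pool size depends on scheduling, so the exact figure will vary from run to run and from machine to machine.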

Update: Showing code from AbstractExecutorService

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (Future<T> f : futures) {
            if (!f.isDone()) {
                try {
                    f.get(); //If interrupted, this is where the InterruptedException will be thrown from
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (Future<T> f : futures)
                f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads
    }
}

The finally block containing f.cancel(true) is where the interrupt is propagated to the tasks that are currently running. As you can see, this is a tight loop, but there is no guarantee that the thread executing it can iterate over all the Future instances within one time slice.
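
The two outcomes of cancel(true) can be seen in isolation with a plain FutureTask, which is what newTaskFor creates by default. This is only a sketch; the class name CancelSemanticsDemo and the counters are made up for the illustration. A task cancelled before it starts never runs its Callable, so it records no interrupt, while a task cancelled while running has its thread interrupted.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original test.
class CancelSemanticsDemo {

    /** Returns {how often call() ran, how often it was interrupted}. */
    static int[] demo() throws InterruptedException {
        final AtomicInteger runs = new AtomicInteger();
        final AtomicInteger interrupts = new AtomicInteger();
        final CountDownLatch started = new CountDownLatch(1);

        Callable<Object> waiter = new Callable<Object>() {
            @Override
            public Object call() {
                runs.incrementAndGet();
                started.countDown();
                try {
                    Thread.sleep(20000);
                } catch (InterruptedException ex) {
                    interrupts.incrementAndGet();
                }
                return null;
            }
        };

        // Case 1: cancelled while still "queued". A worker that later picks
        // it up calls run(), which returns without invoking call().
        FutureTask<Object> queued = new FutureTask<Object>(waiter);
        queued.cancel(true);
        queued.run();

        // Case 2: cancelled while running. cancel(true) interrupts the
        // thread that is executing call().
        FutureTask<Object> running = new FutureTask<Object>(waiter);
        Thread worker = new Thread(running);
        worker.start();
        started.await();
        running.cancel(true);
        worker.join();

        return new int[] { runs.get(), interrupts.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = demo();
        System.out.println("runs=" + r[0] + ", interrupts=" + r[1]); // runs=1, interrupts=1
    }
}
```

So each task contributes at most one interrupt, and whether it contributes one depends only on whether a worker had already started it when the loop reached its Future.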

熊抱啵儿 2024-12-20 00:34:19

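If you want to get exactly that behaviour (only 10 interrupts), add this block before interrupting the thread that called invokeAll:

    ArrayList<Runnable> runnables = new ArrayList<Runnable>();
    // getQueue() is declared on ThreadPoolExecutor, not on ExecutorService, hence the cast;
    // it assumes the pool is a ThreadPoolExecutor, as Executors.newFixedThreadPool creates in the standard JDK
    ((ThreadPoolExecutor) executorService).getQueue().drainTo(runnables);

It drains the whole waiting queue into a new list, so only the tasks that are already running are interrupted.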
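
A self-contained sketch of this approach, under some assumptions of mine: the pool is built directly as a ThreadPoolExecutor (with the same settings newFixedThreadPool uses) so that getQueue() is available without a cast, and the main thread waits until every worker is busy and all remaining tasks are queued before draining. The names DrainBeforeInterruptDemo and countInterrupts are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; assumes taskCount >= poolSize.
class DrainBeforeInterruptDemo {

    static int countInterrupts(int poolSize, int taskCount) throws InterruptedException {
        final AtomicInteger interrupts = new AtomicInteger();
        final CountDownLatch allWorkersBusy = new CountDownLatch(poolSize);
        // Same configuration as Executors.newFixedThreadPool(poolSize).
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

        final List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(new Callable<Object>() {
                @Override
                public Object call() {
                    allWorkersBusy.countDown();
                    try {
                        Thread.sleep(20000);
                    } catch (InterruptedException ex) {
                        interrupts.incrementAndGet();
                    }
                    return null;
                }
            });
        }

        Thread watcher = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    pool.invokeAll(tasks);
                } catch (InterruptedException ex) {
                    // expected
                }
            }
        });
        watcher.start();

        // Wait until every worker is inside a task and the rest are queued.
        allWorkersBusy.await();
        while (pool.getQueue().size() < taskCount - poolSize) {
            Thread.sleep(10);
        }

        // Remove the waiting tasks so no worker can pick one up during cancellation.
        List<Runnable> drained = new ArrayList<Runnable>();
        pool.getQueue().drainTo(drained);

        watcher.interrupt();
        watcher.join();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return interrupts.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countInterrupts(10, 100)); // prints 10
    }
}
```

Note that draining only helps if submission has already finished; tasks that invokeAll adds to the queue after the drain could still be started by a freed worker, which is why the sketch waits for the queue to fill first.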

抱猫软卧 2024-12-20 00:34:19
PowerMock.mockStatic ( Executors.class );
EasyMock.expect ( Executors.newFixedThreadPool ( 9 ) ).andReturn ( executorService );

Future<MyObject> callableMock = (Future<MyObject>) EasyMock.createMock ( Future.class );
EasyMock.expect ( callableMock.get ( EasyMock.anyLong (), EasyMock.isA ( TimeUnit.class ) ) ).andReturn ( ccs ).anyTimes ();

List<Future<MyObject>> futures = new ArrayList<Future<MyObject>> ();
futures.add ( callableMock );
EasyMock.expect ( executorService.invokeAll ( EasyMock.isA ( List.class ) ) ).andReturn ( futures ).anyTimes ();

executorService.shutdown ();
EasyMock.expectLastCall ().anyTimes ();

EasyMock.expect ( mock.getMethodCall ( ) ).andReturn ( result ).anyTimes ();

PowerMock.replayAll ();
EasyMock.replay ( callableMock, executorService, mock );

Assert.assertEquals ( " ", answer.get ( 0 ) );
PowerMock.verifyAll ();