什么情况下BlockingQueue.take会抛出中断异常?

发布于 2024-08-16 04:07:52 字数 410 浏览 2 评论 0原文

假设我有一个线程消耗另一个线程生成的项目。它的run方法如下,inQueue是一个BlockingQueue,

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        w.consume();
    } catch (InterruptedException e) { 
        shutdown = true;
    }
}

此外,另一个线程会通过中断这个正在运行的线程来发出没有​​更多工作项的信号。如果 take() 不需要阻塞来检索下一个工作项,则会抛出中断异常。即,如果生产者发出信号表示已完成填充工作队列,是否有可能意外地将某些项目留在 inQueue 中或错过中断?

Let us suppose that I have a thread that consumes items produced by another thread. Its run method is as follows, with inQueue being a BlockingQueue

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        w.consume();
    } catch (InterruptedException e) { 
        shutdown = true;
    }
}

Furthermore, a different thread will signal that there are no more work items by interrupting this running thread. Will take() throw an interrupted exception if it does not need to block to retrieve the next work item. i.e. if the producer signals that it is done filling the work queue, is it possible to accidentally leave some items in inQueue or miss the interrupt?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

酒浓于脸红 2024-08-23 04:07:52

发出阻塞队列终止信号的一个好方法是向队列提交一个“poison”值,表明已发生关闭。这确保了队列的预期行为得到遵守。如果您关心清除队列,调用 Thread.interupt() 可能不是一个好主意。

提供一些代码:

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        if (w == QUEUE_IS_DEAD)
          shutdown = true;
        else
          w.consume();
    } catch (InterruptedException e) { 
        // possibly submit QUEUE_IS_DEAD to the queue
    }
}

A good way to signal termination of a blocking queue is to submit a 'poison' value into the queue that indicates a shutdown has occurred. This ensures that the expected behavior of the queue is honored. Calling Thread.interupt() is probably not a good idea if you care about clearing the queue.

To provide some code:

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        if (w == QUEUE_IS_DEAD)
          shutdown = true;
        else
          w.consume();
    } catch (InterruptedException e) { 
        // possibly submit QUEUE_IS_DEAD to the queue
    }
}
乖乖公主 2024-08-23 04:07:52

我想知道同样的事情并阅读 take() 我相信只有在取出队列中的所有项目后才会抛出中断异常,因为如果队列有项目,它就不会必须“等待”。
但我做了一个小测试:

package se.fkykko.slask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class BlockingQueueTakeTest {

public static void main(String[] args) throws Exception {
    Runner t = new Runner();
    Thread t1 = new Thread(t);
    for (int i = 0; i < 50; i++) {
        t.queue.add(i);
    }
    System.out.println(("Number of items in queue: " + t.queue.size()));
    t1.start();
    Thread.sleep(1000);
    t1.interrupt();
    t1.join();
    System.out.println(("Number of items in queue: " + t.queue.size()));
    System.out.println(("Joined t1. Finished"));

}

private static final class Runner implements Runnable {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(100);
    AtomicLong m_count = new AtomicLong(0);

    @Override
    public void run() {
        try {
            while (true) {
                queue.take();
                System.out.println("Took item " + m_count.incrementAndGet());
                final long start = System.currentTimeMillis();
                while ((System.currentTimeMillis() - start) < 100) {
                    Thread.yield(); //Spin wait
                }
            }
        }
        catch (InterruptedException ex) {
            System.out.println("Interrupted. Count: " + m_count.get());
        }
    }
}

}

跑步者将获取 10-11 个项目,然后完成,即 take() 将抛出 InterruptedException,即使队列中仍然有项目。

摘要: 使用毒丸方法,然后您就可以完全控制队列中剩余的数量。

I wondered about the same thing and reading the javadoc for take() I believed that it would throw an interrupted exception only after having taken all the items in the queue, since if the queue had items, it would not have to "wait".
But I made a small test:

package se.fkykko.slask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class BlockingQueueTakeTest {

public static void main(String[] args) throws Exception {
    Runner t = new Runner();
    Thread t1 = new Thread(t);
    for (int i = 0; i < 50; i++) {
        t.queue.add(i);
    }
    System.out.println(("Number of items in queue: " + t.queue.size()));
    t1.start();
    Thread.sleep(1000);
    t1.interrupt();
    t1.join();
    System.out.println(("Number of items in queue: " + t.queue.size()));
    System.out.println(("Joined t1. Finished"));

}

private static final class Runner implements Runnable {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(100);
    AtomicLong m_count = new AtomicLong(0);

    @Override
    public void run() {
        try {
            while (true) {
                queue.take();
                System.out.println("Took item " + m_count.incrementAndGet());
                final long start = System.currentTimeMillis();
                while ((System.currentTimeMillis() - start) < 100) {
                    Thread.yield(); //Spin wait
                }
            }
        }
        catch (InterruptedException ex) {
            System.out.println("Interrupted. Count: " + m_count.get());
        }
    }
}

}

The runner will take 10-11 items and then finish i.e. take() will throw InterruptedException even if there still is items in the queue.

Summary: Use the Poison pill approach instead, then you have full control over how much is left in the queue.

风情万种。 2024-08-23 04:07:52

According to javadoc, the take() method will throw InterruptedException if interrupted while waiting.

如果没有你 2024-08-23 04:07:52

如果您使用 ExecutorService::execute(Runnable) 启动线程,则通常无法从外部代码中断 ExecutorService 的线程,因为外部代码没有对每个正在运行的线程的 Thread 对象的引用(如果您需要 ExecutorService::execute ,请参阅此答案的末尾以获取解决方案)。但是,如果您改为使用 ExecutorService::submit(Callable) 提交作业,则会返回一个 Future,它在内部保留对一旦Callable::call()开始执行,正在运行的线程。该线程可以通过调用 Future::cancel(true) 来中断。因此,Callable 内(或由其调用)检查当前线程中断状态的任何代码都可以通过 Future 引用来中断。这包括 BlockingQueue::take(),即使被阻塞,它也会响应线程中断。 (如果在阻塞时被中断,JRE 阻塞方法通常会醒来,意识到它们已被中断,并抛出 InterruptedException。)

总结一下:Future::cancel() 和 < code>Future::cancel(true) 都取消未来的工作,而 Future::cancel(true)中断正在进行的工作 >(只要正在进行的工作响应线程中断)。两个 cancel 调用都不会影响已成功完成的工作。

请注意,一旦线程因取消而中断,就会在线程内引发 InterruptException(例如,在本例中通过 BlockingQueue::take())。但是,下次您在成功取消的 Future 上调用 Future::get() 时,将在主线程中抛出 CancellationException 异常。 (即在完成之前被取消的 Future)。这与您通常期望的不同:如果未取消的 Callable 抛出 InterruptedException,则下一次调用 Future::get()将抛出 InterruptedException,但如果取消的 Callable 抛出 InterruptedException,则下一次调用 Future::get()将通过CancellationException

下面是一个说明这一点的示例:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class Test {
    public static void main(String[] args) throws Exception {
        // Start Executor with 4 threads
        int numThreads = 4;
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
        try {
            // Set up BlockingQueue for inputs, and List<Future> for outputs
            BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
            List<Future<String>> futures = new ArrayList<>(numThreads);
            for (int i = 0; i < numThreads; i++) {
                int threadIdx = i;
                futures.add(executor.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        try {
                            // Get an input from the queue (blocking)
                            int val = queue.take();
                            return "Thread " + threadIdx + " got value " + val;
                        } catch (InterruptedException e) {
                            // Thrown once Future::cancel(true) is called
                            System.out.println("Thread " + threadIdx + " got interrupted");
                            // This value is returned to the Future, but can never
                            // be read, since the caller will get a CancellationException
                            return "Thread " + threadIdx + " got no value";
                        }
                    }
                }));
            }

            // Enqueue (numThreads - 1) values into the queue, so that one thread blocks
            for (int i = 0; i < numThreads - 1; i++) {
                queue.add(100 + i);
            }

            // Cancel all futures
            for (int i = 0; i < futures.size(); i++) {
                Future<String> future = futures.get(i);
                // Cancel the Future -- this doesn't throw an exception until
                // the get() method is called
                future.cancel(/* mayInterruptIfRunning = */ true);
                try {
                    System.out.println(future.get());
                } catch (CancellationException e) {
                    System.out.println("Future " + i + " was cancelled");
                }
            }
        } finally {
            // Terminate main after all threads have shut down (this call does not block,
            // so main will exit before the threads stop running)
            executor.shutdown();
        }
    }
}

每次运行此命令时,输出都会不同,但这是一次运行:

Future 1 was cancelled
Future 0 was cancelled
Thread 2 got value 100
Thread 3 got value 101
Thread 1 got interrupted

这表明线程 2 和线程 3 在调用 Future::cancel() 之前完成。线程 1 被取消,因此在内部抛出了 InterruptedException,在外部抛出了 CancellationException。线程 0 在开始运行之前被取消。 (请注意,线程索引通常不会与 Future 索引相关,因此 Future 0 被取消 可能对应于线程 0 或线程 1 被取消,并且Future 1 被取消也是如此。)

高级:使用 Executor::execute 实现相同效果的一种方法(不返回 < code>Future 参考)而不是 Executor::submit 将使用自定义 ThreadFactory 创建一个 ThreadPoolExecutor,并让您的ThreadFactory 在并发集合(例如并发队列)中为创建的每个线程记录一个引用。然后,要取消所有线程,您只需对所有先前创建的线程调用 Thread::interrupt() 即可。但是,您需要处理竞争条件,即在中断现有线程时可能会创建新线程。要处理此问题,请设置一个对 ThreadFactory 可见的 AtomicBoolean 标志,告诉它不要创建更多线程,然后在设置后取消现有线程。

You can't in general interrupt the threads of an ExecutorService from external code if you used ExecutorService::execute(Runnable) to start the threads, because external code does not have a reference to the Thread objects of each of the running threads (see the end of this answer for a solution though, if you need ExecutorService::execute). However, if you instead use ExecutorService::submit(Callable<T>) to submit the jobs, you get back a Future<T>, which internally keeps a reference to the running thread once Callable::call() begins execution. This thread can be interrupted by calling Future::cancel(true). Any code within (or called by) the Callable that checks the current thread's interrupt status can therefore be interrupted via the Future reference. This includes BlockingQueue::take(), which, even when blocked, will respond to thread interruption. (JRE blocking methods will typically wake up if interrupted while blocked, realize they have been interrupted, and throw an InterruptedException.)

To summarize: Future::cancel() and Future::cancel(true) both cancel future work, while Future::cancel(true) also interrupts ongoing work (as long as the ongoing work responds to thread interrupt). Neither of the two cancel invocations affects work that has already successfully completed.

Note that once a thread is interrupted by cancellation, an InterruptException will be thrown within the thread (e.g. by BlockingQueue::take() in this case). However, you a CancellationException will be thrown back in the main thread the next time you call Future::get() on a successfully cancelled Future (i.e. a Future that was cancelled before it completed). This is different from what you would normally expect: if a non-cancelled Callable throws InterruptedException, the next call to Future::get() will throw InterruptedException, but if a cancelled Callable throws InterruptedException, the next call to Future::get() will through CancellationException.

Here's an example that illustrates this:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class Test {
    public static void main(String[] args) throws Exception {
        // Start Executor with 4 threads
        int numThreads = 4;
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
        try {
            // Set up BlockingQueue for inputs, and List<Future> for outputs
            BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
            List<Future<String>> futures = new ArrayList<>(numThreads);
            for (int i = 0; i < numThreads; i++) {
                int threadIdx = i;
                futures.add(executor.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        try {
                            // Get an input from the queue (blocking)
                            int val = queue.take();
                            return "Thread " + threadIdx + " got value " + val;
                        } catch (InterruptedException e) {
                            // Thrown once Future::cancel(true) is called
                            System.out.println("Thread " + threadIdx + " got interrupted");
                            // This value is returned to the Future, but can never
                            // be read, since the caller will get a CancellationException
                            return "Thread " + threadIdx + " got no value";
                        }
                    }
                }));
            }

            // Enqueue (numThreads - 1) values into the queue, so that one thread blocks
            for (int i = 0; i < numThreads - 1; i++) {
                queue.add(100 + i);
            }

            // Cancel all futures
            for (int i = 0; i < futures.size(); i++) {
                Future<String> future = futures.get(i);
                // Cancel the Future -- this doesn't throw an exception until
                // the get() method is called
                future.cancel(/* mayInterruptIfRunning = */ true);
                try {
                    System.out.println(future.get());
                } catch (CancellationException e) {
                    System.out.println("Future " + i + " was cancelled");
                }
            }
        } finally {
            // Terminate main after all threads have shut down (this call does not block,
            // so main will exit before the threads stop running)
            executor.shutdown();
        }
    }
}

Each time you run this, the output will be different, but here's one run:

Future 1 was cancelled
Future 0 was cancelled
Thread 2 got value 100
Thread 3 got value 101
Thread 1 got interrupted

This shows that Thread 2 and Thread 3 completed before Future::cancel() was called. Thread 1 was cancelled, so internally InterruptedException was thrown, and externally CancellationException was thrown. Thread 0 was cancelled before it started running. (Note that the thread indices won't in general correlate with the Future indices, so Future 0 was cancelled could correspond to either thread 0 or thread 1 being cancelled, and the same for Future 1 was cancelled.)

Advanced: one way to achieve the same effect with Executor::execute (which does not return a Future reference) rather than Executor::submit would be to create a ThreadPoolExecutor with a custom ThreadFactory, and have your ThreadFactory record a reference in a concurrent collection (e.g. a concurrent queue) for every thread created. Then to cancel all threads, you can simply call Thread::interrupt() on all previously-created threads. However, you will need to deal with the race condition that new threads may be created while you are interrupting existing threads. To handle this, set an AtomicBoolean flag, visible to the ThreadFactory, that tells it not to create any more threads, then once that is set, cancel the existing threads.

梦初启 2024-08-23 04:07:52

java.concurrency.utils 包是由并发编程领域的一些最优秀的人才设计和实现的。此外,他们的书“Java Concurrency in Practice”明确认可将中断线程作为终止线程的方法。因此,如果由于中断而导致任何项目留在队列中,我会感到非常惊讶。

The java.concurrency.utils package was designed and implemented by some of the finest minds in concurrent programming. Also, interrupting threads as a means to terminate them is explicitly endorsed by their book "Java Concurrency in Practice". Therefore, I would be extremely surprised if any items were left in the queue due to an interrupt.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文