BlockingQueue 的rainTo() 方法的线程安全性

发布于 2024-11-18 21:01:19 字数 861 浏览 4 评论 0原文

BlockingQueue 的文档说批量操作不是线程安全的,尽管它没有明确提及该方法drainTo()。

BlockingQueue 实现是 线程安全。所有排队方式 使用原子方式实现其效果 内部锁或其他形式的 并发控制。然而,大部分 集合操作addAll, containsAll、retainAll 和 removeAll 不一定执行 除非另有说明,否则以原子方式 在一个实现中。原来如此 可能,例如,对于 addAll(c) 之后失败(抛出异常) 仅添加 c 中的部分元素。

DrainTo() 方法的文档指定不能以线程安全的方式修改 BlockingQueue 元素排出到的集合。但是,它没有提到有关 danceTo() 操作是线程安全的任何内容。

删除所有可用元素 这个队列并将它们添加到给定的 收藏。这个操作可能会比较多 比重复轮询更有效 队列。期间遇到故障 尝试添加元素到 集合 c 可能会产生元素 两者都不存在、其中之一或两者都存在 当关联时集合 抛出异常。尝试排水 队列本身导致 IllegalArgumentException。此外, 此操作的行为是 如果指定集合则未定义 操作进行时被修改 进展。

那么,drainTo()方法是线程安全的吗?换句话说,如果一个线程在阻塞队列上调用了rainTo()方法,而另一个线程在同一队列上调用了add()或put(),那么在这两个操作结束时队列的状态是否一致?

Documentation of BlockingQueue says bulk operations are not thread-safe, though it doesn't explicitly mention the method drainTo().

BlockingQueue implementations are
thread-safe. All queuing methods
achieve their effects atomically using
internal locks or other forms of
concurrency control. However, the bulk
Collection operations addAll,
containsAll, retainAll and removeAll
are not necessarily performed
atomically unless specified otherwise
in an implementation. So it is
possible, for example, for addAll(c)
to fail (throwing an exception) after
adding only some of the elements in c.

Documentation of drainTo() method specifies that the collection, to which the elements of BlockingQueue are drained to, cannot be modified in thread-safe fashion. But, it doesn't mention anything about drainTo() operation being thread-safe.

Removes all available elements from
this queue and adds them to the given
collection. This operation may be more
efficient than repeatedly polling this
queue. A failure encountered while
attempting to add elements to
collection c may result in elements
being in neither, either or both
collections when the associated
exception is thrown. Attempts to drain
a queue to itself result in
IllegalArgumentException. Further, the
behavior of this operation is
undefined if the specified collection
is modified while the operation is in
progress.

So, is drainTo() method thread-safe? In other words, if one thread has invoked drainTo() method on a blocking queue and other one is calling add() or put() on the same queue, is the queue's state consistent at the end of both the operations?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

倥絔 2024-11-25 21:01:19

我认为您混淆了术语“线程安全”和“原子”。它们的意思不同。方法可以是线程安全的但不是原子的,也可以是原子的(对于单个线程)而不是线程安全的。

线程安全是一个橡皮术语,很难在不循环的情况下进行定义。根据 Goetz 的说法,一个好的工作模型是,如果一个方法在多线程上下文中使用时与在单线程上下文中运行时“一样正确”,那么该方法就是线程安全的。弹性在于,正确性是主观的,除非有正式的规范来衡量。

相比之下,原子很容易定义。它只是意味着该操作要么完全发生,要么根本不发生。

所以你的问题的答案是,drainTo()是线程安全的,但不是原子的。它不是原子的,因为它可能在耗尽过程中抛出异常。但是,以此为模,无论其他线程是否同时对队列执行操作,队列仍将处于一致状态。


(上面的讨论隐含地表明,BlockingQueue 接口的具体实现正确地实现了该接口。如果没有正确实现,则一切都失败了。)

I think you are confusing the terms "thread-safe" and "atomic". They do not mean the same thing. A method can be thread-safe without being atomic, and can be atomic (for a single thread) without being thread-safe.

Thread-safe is a rubbery term that is hard to define without being circular. According to Goetz, a good working model is that a method is thread-safe if it is "as correct" when used in a multi-threaded context as it is run in a single-threaded context. The rubberyness is in the fact that correctness is subjective unless you have a formal specification to measure against.

By contrast, atomic is easy to define. It simply means that the operation either happens completely or it doesn't happen at all.

So the answer to your question is that drainTo() is thread-safe, but not atomic. It is not atomic because it could throw an exception half way through draining. However, modulo that, the queue will still be in a consistent state, whether or not other threads were doing things to the queue at the same time.


(It is implicit in the above discussion that the specific implementation of the BlockingQueue interface implements the interface correctly. If it doesn't, all bets are off.)

暮色兮凉城 2024-11-25 21:01:19

DrainTo() 是线程安全的,因为同时发生的队列上的任何操作都不会更改结果,也不会破坏队列的状态。否则,该方法将毫无意义。

如果目标集合(添加结果的集合)做了一些“聪明”的事情,您可能会遇到问题。但由于您通常将队列排空到只有单个线程可以访问的集合,因此这更多是一个理论上的问题。

drainTo() is thread safe in the sense that any operation on the queue that happens at the same time will not change the result nor will it corrupt the state of the queue. Otherwise, the method would be pretty pointless.

You could run into problems if the target collection (the one to which the results are added) does something "clever". But since you usually drain the queue to a collection to which only a single thread has access, it's more of a theoretical problem.

安静 2024-11-25 21:01:19

偶然发现这个问题,想添加一个实施信息。

来自 Java 8 source of PriorityBlockingQueue :

 /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = Math.min(size, maxElements);
            for (int i = 0; i < n; i++) {
                c.add((E) queue[0]); // In this order, in case add() throws.
                dequeue();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

您可以看到一个 ReentrantLock 用于锁定关键部分。 poll()offer() 方法也使用相同的锁。因此,在本例中 PriorityBlockingQueue 的 BlockingQueue 实现确实是阻塞

stumbled upon this question and felt like adding an implementation info.

From Java 8 source of PriorityBlockingQueue :

 /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = Math.min(size, maxElements);
            for (int i = 0; i < n; i++) {
                c.add((E) queue[0]); // In this order, in case add() throws.
                dequeue();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

You can see that a ReentrantLock is used to lock the critical section. The methods poll() and offer() are also using the same lock. So the BlockingQueue implementation in this case of PriorityBlockingQueue is indeed Blocking!

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文