BlockingQueue implementations: what is the difference between SynchronousQueue and LinkedBlockingQueue?

Posted 2024-10-19 03:48:25


I see these implementations of BlockingQueue and can't understand the differences between them. My conclusions so far:

  1. I won't ever need SynchronousQueue
  2. LinkedBlockingQueue ensures FIFO; an ArrayBlockingQueue must be created with the fairness parameter set to true to make it FIFO
  3. SynchronousQueue breaks most Collection methods (contains, size, etc.)

So when do I ever need SynchronousQueue? Is the performance of this implementation better than LinkedBlockingQueue?

To make it more complicated... why does Executors.newCachedThreadPool use SynchronousQueue when the others (Executors.newSingleThreadExecutor and Executors.newFixedThreadPool) use LinkedBlockingQueue?

EDIT

The first question is solved. But I still don't understand why Executors.newCachedThreadPool uses SynchronousQueue when the others (Executors.newSingleThreadExecutor and Executors.newFixedThreadPool) use LinkedBlockingQueue.

What I gather is that with SynchronousQueue, the producer will be blocked if there is no free thread. But since the number of threads is practically unlimited (new threads will be created if needed), this will never happen. So why does it use SynchronousQueue?


Comments (3)

水晶透心 2024-10-26 03:48:25


SynchronousQueue is a very special kind of queue - it implements a rendezvous approach (the producer waits until a consumer is ready, the consumer waits until a producer is ready) behind the Queue interface.

Therefore you may need it only in the special cases where you need those particular semantics, for example, Single threading a task without queuing further requests.
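That rendezvous behaviour can be seen in a minimal sketch (the class name RendezvousDemo and the string values are mine, for illustration only): offer() fails instantly when no consumer is waiting, while put() blocks until the hand-off happens, and the queue itself never holds anything.

```java
import java.util.concurrent.SynchronousQueue;

public class RendezvousDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> q = new SynchronousQueue<>();

        // offer() fails immediately when no consumer is waiting:
        // the queue has no capacity to hold the element.
        System.out.println("offer with no consumer: " + q.offer("task")); // false

        // put() blocks until a consumer arrives, so start one first.
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("took: " + q.take());
            } catch (InterruptedException ignored) {}
        });
        consumer.start();

        q.put("task"); // hands the element directly to the consumer
        consumer.join();
        System.out.println("size after handoff: " + q.size()); // always 0
    }
}
```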

Another reason for using SynchronousQueue is performance. The implementation of SynchronousQueue seems to be heavily optimized, so if you need nothing more than a rendezvous point (as in the case of Executors.newCachedThreadPool(), where consumers are created "on demand" so that queue items do not accumulate), you can get a performance gain by using SynchronousQueue.

A simple synthetic test shows that in a simple single-producer, single-consumer scenario on a dual-core machine, the throughput of SynchronousQueue is ~20 times higher than the throughput of LinkedBlockingQueue and ArrayBlockingQueue with queue length = 1. When the queue length is increased, their throughput rises and almost reaches that of SynchronousQueue. This means that SynchronousQueue has low synchronization overhead on multi-core machines compared to the other queues. But again, it matters only in the specific circumstances where you need a rendezvous point disguised as a Queue.

EDIT:

Here is a test:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class Test {
    static ExecutorService e = Executors.newFixedThreadPool(2);
    static int N = 1000000;

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            int length = (i == 0) ? 1 : i * 5;
            System.out.print(length + "\t");
            System.out.print(doTest(new LinkedBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new ArrayBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new SynchronousQueue<Integer>(), N));
            System.out.println();
        }

        e.shutdown();
    }

    private static long doTest(final BlockingQueue<Integer> q, final int n) throws Exception {
        long t = System.nanoTime();

        // Producer: puts n items into the queue.
        e.submit(new Runnable() {
            public void run() {
                for (int i = 0; i < n; i++)
                    try { q.put(i); } catch (InterruptedException ex) {}
            }
        });

        // Consumer: takes n items; get() blocks until it has consumed them all.
        e.submit(new Callable<Long>() {
            public Long call() {
                long sum = 0;
                for (int i = 0; i < n; i++)
                    try { sum += q.take(); } catch (InterruptedException ex) {}
                return sum;
            }
        }).get();
        t = System.nanoTime() - t;

        return (long)(1000000000.0 * N / t); // Throughput, items/sec
    }
}

And here is the result on my machine (originally posted as an image, not reproduced here).

深海少女心 2024-10-26 03:48:25


Currently the default Executors (ThreadPoolExecutor based) can either use a fixed-size set of pre-created threads together with a BlockingQueue of some size for any overflow, or create threads up to a maximum size if (and only if) that queue is full.

This leads to some surprising properties. For instance, as additional threads are only created once the queue's capacity is reached, using a LinkedBlockingQueue (which is unbounded) means that new threads will never get created, even if the current pool size is zero. If you use an ArrayBlockingQueue then the new threads are created only if it is full, and there is a reasonable likelihood that subsequent jobs will be rejected if the pool hasn't cleared space by then.

A SynchronousQueue has zero capacity, so a producer blocks until a consumer is available or a thread is created. This means that despite the impressive-looking figures produced by @axtavt, a cached thread pool generally has the worst performance from the producer's point of view.

Unfortunately there isn't currently a nice library version of a compromise implementation that will create threads during bursts of activity, up to some maximum, from a low minimum. You either have a growable pool or a fixed one. We have one internally, but it isn't ready for public consumption yet.
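One commonly suggested approximation of such a min-to-max pool (not the internal version the answer refers to; the sizes and class name here are my own illustrative choices) combines a SynchronousQueue with CallerRunsPolicy: tasks are never queued, so the pool grows from its core size toward the maximum while busy, and once saturated the submitting thread runs the task itself, throttling the producer instead of failing.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BurstPool {
    // Grows from 2 threads up to 8 during bursts; idle extras die after 60s.
    // With a SynchronousQueue nothing is ever queued, so a busy pool adds
    // threads immediately; when all 8 are busy, CallerRunsPolicy makes the
    // caller execute the task itself rather than rejecting it.
    static ThreadPoolExecutor newBurstPool() {
        return new ThreadPoolExecutor(
                2, 8, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBurstPool();
        pool.execute(() ->
                System.out.println("ran in " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

Note the trade-off: unlike a true "queue first, then grow" pool, this variant never buffers work, so short spikes immediately cost thread creation.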

生生不灭 2024-10-26 03:48:25


The cached thread pool creates threads on demand. It needs a queue that either passes the task to a waiting consumer or fails. If there is no waiting consumer, the pool creates a new thread. A SynchronousQueue does not hold elements; it either hands an element off or fails.
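This "hand off or spawn" behaviour can be observed directly (the class name and latch setup are mine, for illustration): each submission finds no idle worker waiting on the SynchronousQueue, so each one causes a new thread to be created.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class CachedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool =
                (ThreadPoolExecutor) Executors.newCachedThreadPool();

        CountDownLatch started = new CountDownLatch(3);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                started.countDown();
                try { release.await(); } catch (InterruptedException ignored) {}
            });
        }
        started.await();

        // Each execute() found every existing worker busy, so the offer to
        // the SynchronousQueue failed and a new thread was created instead.
        System.out.println("pool size: " + pool.getPoolSize()); // 3

        release.countDown();
        pool.shutdown();
    }
}
```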
