Java中的并发和阻塞队列

发布于 2024-07-30 17:53:14 字数 755 浏览 2 评论 0原文

我遇到一个线程将事件推送到第二个线程的传入队列的经典问题。 只是这一次,我对表演很感兴趣。 我想要实现的是:

  • 我想要并发访问队列,生产者推送,接收者弹出。
  • 当队列为空时,我希望消费者阻塞到队列,等待生产者。

我的第一个想法是使用 LinkedBlockingQueue,但我很快意识到它不是并发的并且性能受到影响。 另一方面,我现在使用 ConcurrentLinkedQueue,但我仍然要为每个发布支付 wait() / notify() 的成本。 由于消费者在发现空队列时不会阻塞,因此我必须同步并在锁上 wait() 。 另一方面,生产者必须在每次发布时获得该锁和 notify()。 总的结果是我付出的成本 同步(锁定){lock.notify()} 在每个发布中,即使不需要。

我想这里需要的是一个既阻塞又并发的队列。 我想象 push() 操作就像在 ConcurrentLinkedQueue 中一样工作,当推送的元素是第一个元素时,向对象添加一个额外的 notify()在列表中。 我认为 ConcurrentLinkedQueue 中已经存在这样的检查,因为推送需要与下一个元素连接。 因此,这比每次在外部锁上同步要快得多。

这样的东西可用/合理吗?

I have the classic problem of a thread pushing events to the incoming queue of a second thread. Only this time, I am very interested about performance. What I want to achieve is:

  • I want concurrent access to the queue, the producer pushing, the receiver poping.
  • When the queue is empty, I want the consumer to block to the queue, waiting for the producer.

My first idea was to use a LinkedBlockingQueue, but I soon realized that it is not concurrent and the performance suffered. On the other hand, I now use a ConcurrentLinkedQueue, but still I am paying the cost of wait() / notify() on each publication. Since the consumer, upon finding an empty queue, does not block, I have to synchronize and wait() on a lock. On the other part, the producer has to get that lock and notify() upon every single publication. The overall result is that I am paying the cost of
sycnhronized (lock) {lock.notify()} in every single publication, even when not needed.

What I guess is needed here, is a queue that is both blocking and concurrent. I imagine a push() operation to work as in ConcurrentLinkedQueue, with an extra notify() to the object when the pushed element is the first in the list. Such a check I consider to already exist in the ConcurrentLinkedQueue, as pushing requires connecting with the next element. Thus, this would be much faster than synchronizing every time on the external lock.

Is something like this available/reasonable?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

浅暮の光 2024-08-06 17:53:14

我认为无论您有什么疑问,您都可以坚持使用 java.util.concurrent.LinkedBlockingQueue。 它是并发的。 不过,我不知道它的性能。 也许,BlockingQueue 的其他实现会更适合您。 它们并不太多,因此请进行性能测试和测量。

I think you can stick to java.util.concurrent.LinkedBlockingQueue regardless of your doubts. It is concurrent. Though, I have no idea about its performance. Probably, other implementation of BlockingQueue will suit you better. There's not too many of them, so make performance tests and measure.

猫七 2024-08-06 17:53:14

与此答案类似 https://stackoverflow.com/a/1212515/1102730 但有点不同..我最终使用ExecutorService。 您可以使用 Executors.newSingleThreadExecutor() 实例化一个。 我需要一个并发队列来读取/写入 BufferedImages 到文件,以及读取和写入的原子性。 我只需要一个线程,因为文件 IO 比源网络 IO 快几个数量级。 另外,我更关心操作的原子性和正确性而不是性能,但这种方法也可以通过池中的多个线程来完成,以加快速度。

获取图像(Try-Catch-Finally 省略):

Future<BufferedImage> futureImage = executorService.submit(new Callable<BufferedImage>() {
    @Override
        public BufferedImage call() throws Exception {
            ImageInputStream is = new FileImageInputStream(file);
            return  ImageIO.read(is);
        }
    })

image = futureImage.get();

保存图像(Try-Catch-Finally 省略):

Future<Boolean> futureWrite = executorService.submit(new Callable<Boolean>() {
    @Override
    public Boolean call() {
        FileOutputStream os = new FileOutputStream(file); 
        return ImageIO.write(image, getFileFormat(), os);  
    }
});

boolean wasWritten = futureWrite.get();

需要注意的是,您应该在finally 块中刷新并关闭流。 我不知道它与其他解决方案相比表现如何,但它非常通用。

Similar to this answer https://stackoverflow.com/a/1212515/1102730 but a bit different.. I ended up using an ExecutorService. You can instantiate one by using Executors.newSingleThreadExecutor(). I needed a concurrent queue for reading/writing BufferedImages to files, as well as atomicity with reads and writes. I only need a single thread because the file IO is orders of magnitude faster than the source, net IO. Also, I was more concerned about atomicity of actions and correctness than performance, but this approach can also be done with multiple threads in the pool to speed things up.

To get an image (Try-Catch-Finally omitted):

Future<BufferedImage> futureImage = executorService.submit(new Callable<BufferedImage>() {
    @Override
        public BufferedImage call() throws Exception {
            ImageInputStream is = new FileImageInputStream(file);
            return  ImageIO.read(is);
        }
    })

image = futureImage.get();

To save an image (Try-Catch-Finally omitted):

Future<Boolean> futureWrite = executorService.submit(new Callable<Boolean>() {
    @Override
    public Boolean call() {
        FileOutputStream os = new FileOutputStream(file); 
        return ImageIO.write(image, getFileFormat(), os);  
    }
});

boolean wasWritten = futureWrite.get();

It's important to note that you should flush and close your streams in a finally block. I don't know about how it performs compared to other solutions, but it is pretty versatile.

一百个冬季 2024-08-06 17:53:14

我建议您查看 ThreadPoolExecutor 新的SingleThreadExecutor。 它将处理为您安排任务的顺序,如果您提交 Callables 给你的执行者,你也将能够获得你正在寻找的阻塞行为。

I would suggest you look at ThreadPoolExecutor newSingleThreadExecutor. It will handle keeping your tasks ordered for you, and if you submit Callables to your executor, you will be able to get the blocking behavior you are looking for as well.

南街女流氓 2024-08-06 17:53:14

您可以尝试来自 jsr166 的 LinkedTransferQueue: http:// gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/

它满足您的要求,并且提供/投票操作的开销更少。
从代码中可以看出,当队列不为空时,它使用原子操作来轮询元素。 当队列为空时,它会旋转一段时间,如果不成功则停放线程。
我认为这对你的情况有帮助。

You can try LinkedTransferQueue from jsr166: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/

It fulfills your requirements and have less overhead for offer/poll operations.
As I can see from the code, when the queue is not empty, it uses atomic operations for polling elements. And when the queue is empty, it spins for some time and park the thread if unsuccessful.
I think it can help in your case.

夏雨凉 2024-08-06 17:53:14

每当我需要将数据从一个线程传递到另一个线程时,我都会使用 ArrayBlockingQueue。 使用 put 和 take 方法(如果满/空将会阻塞)。

I use the ArrayBlockingQueue whenever I need to pass data from one thread to another. Using the put and take methods (which will block if full/empty).

红尘作伴 2024-08-06 17:53:14

这是一个 列表实现 BlockingQueue 的类。

我建议查看 SynchronousQueue

就像 @Rorick 在他的评论中提到的那样,我相信所有这些实现都是并发的。 我认为您对 LinkedBlockingQueue 的担忧可能是不恰当的。

Here is a list of classes implementing BlockingQueue.

I would recommend checking out SynchronousQueue.

Like @Rorick mentioned in his comment, I believe all of those implementations are concurrent. I think your concerns with LinkedBlockingQueue may be out of place.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文