java BlockingQueue没有阻塞查看?

发布于 2024-08-11 05:58:51 字数 851 浏览 16 评论 0 原文

我有一个对象阻塞队列。

我想编写一个线程,该线程会阻塞直到队列中有一个对象。与BlockingQueue.take()提供的功能类似。

但是,由于我不知道是否能够成功处理该对象,因此我只想 peek() 而不是删除该对象。仅当我能够成功处理该对象时,我才想删除该对象。

所以,我想要一个阻塞 peek() 函数。目前,根据 javadoc,peek() 仅在队列为空时返回。

我错过了什么吗?还有其他方法可以实现此功能吗?

编辑:

如果我只是使用线程安全队列并偷看并睡觉,有什么想法吗?

public void run() {
    while (!exit) {
        while (queue.size() != 0) {
            Object o =  queue.peek();
            if (o != null) {
                if (consume(o) == true) {
                    queue.remove();
                } else {
                    Thread.sleep(10000); //need to backoff (60s) and try again
                }
            }
        }
        Thread.sleep(1000); //wait 1s for object on queue
    }
}

请注意,我只有一个消费者线程和一个(单独的)生产者线程。我想这不如使用 BlockingQueue 有效......任何评论表示赞赏。

I have a blocking queue of objects.

I want to write a thread that blocks till there is a object on the queue. Similar to the functionality provided by BlockingQueue.take().

However, since I do not know if I will be able to process the object successfully, I want to just peek() and not remove the object. I want to remove the object only if I am able to process it successfully.

So, I would like a blocking peek() function. Currently, peek() just returns if the queue is empty as per the javadocs.

Am I missing something? Is there another way to achieve this functionality?

EDIT:

Any thoughts on if I just used a thread safe queue and peeked and slept instead?

public void run() {
    while (!exit) {
        while (queue.size() != 0) {
            Object o =  queue.peek();
            if (o != null) {
                if (consume(o) == true) {
                    queue.remove();
                } else {
                    Thread.sleep(10000); //need to backoff (60s) and try again
                }
            }
        }
        Thread.sleep(1000); //wait 1s for object on queue
    }
}

Note that I only have one consumer thread and one (separate) producer thread. I guess this isn't as efficient as using a BlockingQueue... Any comments appreciated.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(8

凉栀 2024-08-18 05:58:51

您可以使用 LinkedBlockingDeque 并在物理上从队列中删除该项目(使用 takeLast()),但如果使用 putLast(E e) 处理失败,则在队列末尾再次替换该项目代码>.同时,您的“生产者”将使用 putFirst(E e) 将元素添加到队列的前面

您始终可以将此行为封装在您自己的 Queue 实现中,并提供一个 blockingPeek() 方法来执行 takeLast() ,然后执行 putLast () 在底层 LinkedBlockingDeque 的幕后。因此,从调用客户端的角度来看,该元素永远不会从队列中删除。

You could use a LinkedBlockingDeque and physically remove the item from the queue (using takeLast()) but replace it again at the end of the queue if processing fails using putLast(E e). Meanwhile your "producers" would add elements to the front of the queue using putFirst(E e).

You could always encapsulate this behaviour within your own Queue implementation and provide a blockingPeek() method that performs takeLast() followed by putLast() behind the scenes on the underlying LinkedBlockingDeque. Hence from the calling client's perspective the element is never removed from your queue.

难如初 2024-08-18 05:58:51

但是,由于我不知道是否能够成功处理该对象,因此我只想 peek() 而不是删除该对象。仅当我能够成功处理该对象时,我才想删除该对象。

一般来说,它不是线程安全的。如果在您 peek() 并确定可以成功处理该对象之后,但在 take() 删除并处理该对象之前,另一个线程获取该对象,该怎么办?

However, since I do not know if I will be able to process the object successfully, I want to just peek() and not remove the object. I want to remove the object only if I am able to process it successfully.

In general, it is not thread-safe. What if, after you peek() and determine that the object can be processed successfully, but before you take() it to remove and process, another thread takes that object?

◇流星雨 2024-08-18 05:58:51

您是否还可以将事件监听器队列添加到阻塞队列,然后当将某些内容添加到(阻塞)队列时,将事件发送给您的监听器?您可以让线程阻塞,直到调用它的 actionPerformed 方法。

Could you also just add an event listener queue to your blocking queue, then when something is added to the (blocking)queue, send an event off to your listeners? You could have your thread block until it's actionPerformed method was called.

翻身的咸鱼 2024-08-18 05:58:51

我唯一知道的是 Apache Commons Collections 中的 nofollow noreferrer">BlockingBuffer:

如果调用 get 或 remove
一个空的 Buffer,调用线程
等待添加或添加的通知
addAll操作已完成。

get() 相当于 peek(),并且可以通过修饰使 Buffer 表现得像 BlockingQueue一个 UnboundedFifoBuffer 阻塞缓冲区

The only thing I'm aware of that does this is BlockingBuffer in Apache Commons Collections:

If either get or remove is called on
an empty Buffer, the calling thread
waits for notification that an add or
addAll operation has completed.

get() is equivalent to peek(), and a Buffer can be made to act like BlockingQueue by decorating a UnboundedFifoBuffer with a BlockingBuffer

随风而去 2024-08-18 05:58:51

本身不是答案,但是: JDK-6653412 声称这不是有效的用例。

Not an answer per se, but: JDK-6653412 claims this is not a valid use case.

征﹌骨岁月お 2024-08-18 05:58:51

简单的回答是,实际上没有办法进行阻塞查看,除非您自己使用阻塞查看()来实现阻塞队列。

我错过了什么吗?

peek() 可能会带来并发问题 -

  • 如果您无法处理您的 peek() 消息 - 它将被留在队列中,除非您有多个消费者。
  • 如果您无法处理该对象,谁将把该对象从队列中取出?
  • 如果您有多个消费者,您会在 peek() 和另一个也在处理项目的线程之间出现竞争条件,从而导致重复处理或更糟。

听起来你最好实际删除该项目并使用
责任链模式

编辑:回复:你的最后一个例子:如果你只有 1 个消费者,你永远不会删除队列中的对象 - 除非它同时更新 - 在这种情况下,你最好非常小心线程安全,并且可能不应该将项目放入无论如何都要排队。

The quick answer is, not there's not really a way have a blocking peek, bar implementing a blocking queue with a blocking peek() yourself.

Am I missing something?

peek() can be troublesome with concurrency -

  • If you can't process your peek()'d message - it'll be left in the queue, unless you have multiple consumers.
  • Who is going to get that object out of the queue if you can't process it ?
  • If you have multiple consumers, you get a race condition between you peek()'ing and another thread also processing items, resulting in duplicate processing or worse.

Sounds like you might be better off actually removing the item and process it using a
Chain-of-responsibility pattern

Edit: re: your last example: If you have only 1 consumer, you will never get rid of the object on the queue - unless it's updated in the mean time - in which case you'd better be very very careful about thread safety and probably shouldn't have put the item in the queue anyway.

香橙ぽ 2024-08-18 05:58:51

看起来 BlockingQueue 本身没有您指定的功能。

不过,我可能会尝试重新思考一下问题:对于无法“正确处理”的对象,您会怎么做?如果您只是将它们留在队列中,则必须在某个时候将它们拉出来并处理它们。我建议要么弄清楚如何处理它们(通常,如果 queue.get() 给出任何类型的无效或错误值,您可能可以将其放在地板上)或选择不同的数据结构先进先出。

Looks like BlockingQueue itself doesn't have the functionality you're specifying.

I might try to re-frame the problem a little though: what would you do with objects you can't "process correctly"? If you're just leaving them in the queue, you'll have to pull them out at some point and deal with them. I'd reccommend either figuring out how to process them (commonly, if a queue.get() gives any sort of invalid or bad value, you're probably OK to just drop it on the floor) or choosing a different data structure than a FIFO.

自我难过 2024-08-18 05:58:51

“最简单”的解决方案

在成功处理上一个元素之前,不要处理下一个元素。

public void run() {

Object lastSuccessfullyProcessedElement = null;

    while (!exit) {
        Object obj =  lastSuccessfullyProcessedElement == null ? queue.take() : lastSuccessfullyProcessedElement; // blocking
        
        boolean successful = process(obj);
        
        if(!successful) {
            lastSuccessfullyProcessedElement = obj;
        } else {
            lastSuccessfullyProcessedElement = null;
        }
    }
}
  1. 调用peek()并检查是否值为 null 时 CPU 效率不高。

当以下程序的队列为空时,我发现系统上的 CPU 使用率达到 10%。

while (true) {
   Object o = queue.peek();
   if(o == null) continue;
   // omitted for the sake of brevity
}
  1. 添加 sleep() 会增加速度。

  2. 使用putLast将其添加回队列会扰乱顺序。而且,这是一个阻塞操作,需要锁。

The 'simplest' solution

Do not process the next element until the previous element is processed succesfully.

public void run() {

Object lastSuccessfullyProcessedElement = null;

    while (!exit) {
        Object obj =  lastSuccessfullyProcessedElement == null ? queue.take() : lastSuccessfullyProcessedElement; // blocking
        
        boolean successful = process(obj);
        
        if(!successful) {
            lastSuccessfullyProcessedElement = obj;
        } else {
            lastSuccessfullyProcessedElement = null;
        }
    }
}
  1. Calling peek() and checking if the value is null is not CPU efficient.

I have seen CPU usage going to 10% on my system when the queue is empty for the following program.

while (true) {
   Object o = queue.peek();
   if(o == null) continue;
   // omitted for the sake of brevity
}
  1. Adding sleep() adds slowness.

  2. Adding it back to the queue using putLast will disturb the order. Moreover, it is a blocking operation which requires locks.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文