(如何)我可能会“错过信号”吗?使用这个 ConcurrentLinkedQueue 和 sleep()?

发布于 2024-11-05 14:40:34 字数 1169 浏览 1 评论 0原文

在我的 Java 应用程序中,多个线程将数据放入队列中,另一个线程(只有一个)从中获取对象并分派它们。

有时,使用线程似乎没有注意到新项目已添加到队列中,因为指示轮询的日志消息不再出现。来自生产线程的日志消息表明这些项目确实到达。谷歌搜索告诉我,这似乎被称为“丢失信号”。由于我既不等待也不使用锁,因此我不确定这是否适用于我。

真正让我困惑的是,当我中断消费者线程时,它会处理队列中的所有项目,然后再次保持沉默,而不会退出。

这是消费者线程的主循环,生产者线程执行诸如从套接字读取之类的操作,它们唯一的共同点是它们在消费者轮询的队列上使用 add()

public class FieldFrontService
{
    // ...

    private ConcurrentLinkedQueue<Transmission> _qRec;

    // ...

    private Runnable _createReceptionist()
    {
        return new Runnable() {
            @Override
            public void run()
            {
                while (!Thread.currentThread().isInterrupted()) {
                    Transmission tx = FieldFrontService.this._qRec.poll();
                    if (null == tx) {
                        try {
                            Thread.sleep(250);
                        } catch (InterruptedException e) {
                            break;
                        }
                    } else {
                        FieldFrontService.this._receiveTransmission(tx);
                    }
                }
            }
        }

    // ...
}

In my Java application, several threads put data in a queue from which another thread (just one) takes objects and dispatches them.

Occasionally, the consuming thread seems to not notice that new items have been added to the queue, as the log messages indicating a poll cease to appear. Log messages from the producing threads indicate that these items do indeed arrive. Googling some taught me that this seems to be known as "missed signal". Since I'm neither waiting nor using locks, I'm not sure if this applies to me.

What really puzzles me is that when I interrupt the consumer thread, it processes all items in the queue and then stays silent again, without ever exiting.

Here's the main loop of the consumer thread, the producer threads do stuff like reading from sockets and their only common thing is that they use add() on the queue from which the consumer is polling.

public class FieldFrontService
{
    // ...

    private ConcurrentLinkedQueue<Transmission> _qRec;

    // ...

    private Runnable _createReceptionist()
    {
        return new Runnable() {
            @Override
            public void run()
            {
                while (!Thread.currentThread().isInterrupted()) {
                    Transmission tx = FieldFrontService.this._qRec.poll();
                    if (null == tx) {
                        try {
                            Thread.sleep(250);
                        } catch (InterruptedException e) {
                            break;
                        }
                    } else {
                        FieldFrontService.this._receiveTransmission(tx);
                    }
                }
            }
        }

    // ...
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

忆沫 2024-11-12 14:40:34

也许您最好使用结合了线程池和队列的 ExecutorService。它也有效。 ;)

public class FieldFrontService {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void receiveTransmission(final Transmission tx) {
        executor.execute(new Runnable() {
            public void run() {
                FieldFrontService.this._receiveTransmission(tx);
            }
        });
    }
}

Perhaps you are better off using an ExecutorService which combines a Thread pool and a Queue. Also it works. ;)

public class FieldFrontService {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void receiveTransmission(final Transmission tx) {
        executor.execute(new Runnable() {
            public void run() {
                FieldFrontService.this._receiveTransmission(tx);
            }
        });
    }
}
旧瑾黎汐 2024-11-12 14:40:34

我认为你的 InterruptedException 正在你的调用堆栈中的某个地方被吃掉。

这种模式是我所关心的:

try {                             
    Thread.sleep(250);
} catch (InterruptedException e) {
    break;
} 

如果您在代码中使用类似的模式,则线程中断状态会丢失。例如,如果在调用堆栈中使用了 Thread.interrupted(),则将 重置线程的中断状态。因此,您的 while 循环将永远不会退出:状态被重置为 false。

这两行是需要关注的地方:

Transmission tx = FieldFrontService.this._qRec.poll();
// ....
FieldFrontService.this._receiveTransmission(tx); 

如果其中任何一个调用调用了 Thread.currentThread().interrupted(),您将丢失 isInterrupted 布尔值。我会检查这两种方法的实现,以防异常/状态被消耗。

我对 InterruptedExceptions 的处理是不同的。我更喜欢重新抛出中断:

try {                             
    Thread.sleep(250);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} 

这样,我可以确保我的中断不可能丢失。基本上,我想说的是,不,说真的,停止线程停止!

I think your InterruptedException is getting eaten somewhere in your call stack.

This pattern is what concerns me:

try {                             
    Thread.sleep(250);
} catch (InterruptedException e) {
    break;
} 

If you are using a similar pattern in your code, that the thread interrupted state was lost. For example, if Thread.interrupted() were used in your call stack, that will reset the interrupted status of your thread. As a result, your while loop will never exit: the status was reset to false.

These two lines are the points of concern:

Transmission tx = FieldFrontService.this._qRec.poll();
// ....
FieldFrontService.this._receiveTransmission(tx); 

If either of those calls makes a call to Thread.currentThread().interrupted(), you will lose the isInterrupted boolean. I would look over the implementation of both of those methods in case the Exception / state is being consumed.

My handling of InterruptedExceptions is different. I prefer to rethrow the interrupt:

try {                             
    Thread.sleep(250);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} 

That way, I ensure that there is no possibility that my interrupt was lost. Basically, I'm trying to say, no, seriously, STOP Thread STOP!

苏璃陌 2024-11-12 14:40:34

难道不是因为break语句退出了while循环,导致线程停止了?如果删除break语句会发生什么?或者这就是你所期望的?

我建议线程看不到新项目,因为线程已经由于您没有预料到的异常而停止。

Is it not because the break statement exits the while loop, causing the thread to stop? What happens if you remove the break statement? Or is that what you expect?

I'm suggesting the thread doesn't see the new items, because the thread has already stopped due to an exception you didn't expect.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文