使用 ConcurrentLinkedQueue 的 Java 线程问题

发布于 2024-11-01 20:07:12 字数 1562 浏览 1 评论 0 原文

我对以下代码片段有疑问。它旨在处理添加到事件队列 (ConcurrentLinkedQueue) 的事件(通过调用 processEvent 方法提供)。事件被添加到事件队列中并在 run 方法中定期进行处理。

几乎总是一切都很好。但有时在调用 processEvent 方法后,当事件添加到队列中时,运行部分无法看到有新事件。

知道哪里出了问题吗?除了使用字符串常量作为锁的明显错误之外?

import java.util.concurrent.ConcurrentLinkedQueue;

public class MyCommunicator implements Runnable {

private ConcurrentLinkedQueue<MyEvent> eventQueue = null;

private boolean stopped = false;

private String lock = "";
private Thread thread = null;

public MyCommunicator() {

    eventQueue = new ConcurrentLinkedQueue<MyEvent>();
}

public void start() {
    thread = new Thread(this, "MyCommunicatorThread");
    thread.start();
}

public void stop() {
    stopped = true;
    synchronized (lock) {
        lock.notifyAll();
    }
    eventQueue.clear();
}

public void run() {
    while (!stopped) {
        try {

            MyEvent event = null;
            while (!stopped && ((event = eventQueue.peek()) != null)) {
                sendEvent(event);
                eventQueue.poll();
            }

            if (!stopped) {
                synchronized (lock) {
                    lock.wait(10000L);
                }
            }
        }

        catch (Exception e) {

        }
    }
}

/**
 * START EVENT JOB - ADD A NEW EVENT TO BE PROCESSED
 */
public void processEvent(MyEvent event) {
    eventQueue.offer(event);
    synchronized (lock) {
        lock.notifyAll();
    }
}

/**
 * END EVENT JOB
 */
private void sendEvent(MyEvent event) {
    // do send event job
}

}

I have a problem with the following code fragment. It's intended to handle events (provided via calls on processEvent method) which are added to an event queue (ConcurrentLinkedQueue). Events are added to an event queue and processed periodically in the run method.

All is fine almost always. But sometimes after a call on the processEvent method, when an event is added to the queue, the run part fails to see there is a new event.

Any idea on what is wrong? Besides the obvious mistake in using a String constant as a lock?

import java.util.concurrent.ConcurrentLinkedQueue;

public class MyCommunicator implements Runnable {

private ConcurrentLinkedQueue<MyEvent> eventQueue = null;

private boolean stopped = false;

private String lock = "";
private Thread thread = null;

public MyCommunicator() {

    eventQueue = new ConcurrentLinkedQueue<MyEvent>();
}

public void start() {
    thread = new Thread(this, "MyCommunicatorThread");
    thread.start();
}

public void stop() {
    stopped = true;
    synchronized (lock) {
        lock.notifyAll();
    }
    eventQueue.clear();
}

public void run() {
    while (!stopped) {
        try {

            MyEvent event = null;
            while (!stopped && ((event = eventQueue.peek()) != null)) {
                sendEvent(event);
                eventQueue.poll();
            }

            if (!stopped) {
                synchronized (lock) {
                    lock.wait(10000L);
                }
            }
        }

        catch (Exception e) {

        }
    }
}

/**
 * START EVENT JOB - ADD A NEW EVENT TO BE PROCESSED
 */
public void processEvent(MyEvent event) {
    eventQueue.offer(event);
    synchronized (lock) {
        lock.notifyAll();
    }
}

/**
 * END EVENT JOB
 */
private void sendEvent(MyEvent event) {
    // do send event job
}

}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

苹果你个爱泡泡 2024-11-08 20:07:12

为什么要使用锁和通知?

使用 LinkedBlockingQueue 代替并保存自己所有的麻烦。

通过 poll() 超时将完成您想要做的一切。


编辑:关于当前代码;

您需要定义“无法看到新事件”。您的 run() 方法每 10 秒查看一次队列;如果队列中有东西,它会“看到它”并将其拉出。

  • 如果您的意思是“它在收到通知时没有立即看到它,只有 10 秒后”,那么这很容易回答,因为您有一个竞争条件,很容易导致这种情况发生。当该线程完成检查/处理队列和获取锁之间时,可以将某些内容插入队列中。如果 wait() 没有超时,您将阻塞,直到插入下一个事件。如果在此期间调用 stop() 方法,您将丢失队列中的所有事件。使用 LinkedBlockingQueue 而不是所有不必要的锁定和通知可以解决此问题。这不是一个“简单”的解决方案,它是针对此用例和问题的正确解决方案。

  • 如果情况并非如此,那么您根本没有向队列中插入任何内容,问题出在您未在此处发布的代码中。在不了解该代码的情况下猜测您正在尝试在 eventQueue.offer(event) 处插入 null MyEvent。因为你没有尝试/捕获 offer() 你不会知道它。忽略所有异常并且不检查返回值既不是一个好主意,也不是一个好做法。

  • 第三种可能性是您在某处有一些其他代码锁定了同一个确切的内部字符串文字引用,这将导致该代码挂起。你提到了它,但我会在这里重申 - 这是一件非常糟糕的事情,特别是考虑到它是空字符串。 java.util.concurrent 包提供了真正的锁定条件(如果您坚持在此处使用它们)。请注意,这仍然无法消除您有时会错过某个事件 10 秒的竞争条件,但它至少会更干净。为了消除竞争条件,您需要放弃并发队列而使用常规队列,并在访问它之前简单地获取锁(以及获取插入锁)。这将同步您的线程,因为插入器将被阻止插入,除非该线程正在等待锁定条件。在同一代码块中混合使用锁和无锁方法来进行线程同步通常会导致这些问题。

Why are you using locks and notifications?

Use a LinkedBlockingQueue instead and save yourself all the hassle.

That with a timeout on the poll() will accomplish everything you're trying to do.


Edit: In regard to the current code;

You would need to define "fails to see there is a new event". Your run() method looks at the queue every 10 seconds; if there's something in the queue it'll "see it" and pull it out.

  • If you mean "It doesn't see it immediately when notified, only 10 seconds later", then that's fairly easy to answer as you have a race condition which could easily cause that to occur. Something can be inserted into the queue while this thread is between when it has finished checking/processing the queue and acquiring the lock. Without a timeout on wait() you would block until the next event was inserted. If the stop() method was calling during this time, you'd lose any events in the queue. Using the LinkedBlockingQueue rather than all the unnecessary locking and notifying solves this problem. This isn't an "easy" solution, it's the correct one for this use case and problem.

  • If that's not the case, then you're simply not inserting anything into the queue and the problem lies in code you didn't post here. A guess without knowing anything about that code would be that you're attempting to insert a null MyEvent at eventQueue.offer(event). Since you aren't try/catch'ing offer() you wouldn't know it. Ignoring all exceptions and not checking returned values is neither a good idea or practice.

  • A third possibility would be that you have some other code somewhere locking on the same exact interned string literal reference which would cause this code to hang. You mention it but I'll reiterate here - that's a REALLY bad thing to be doing, especially given that it's the empty string. The java.util.concurrent package provides real locks with conditions if you insist on using them here. Note that this will still not eliminate the race condition you have in regard to sometimes missing an event for 10 seconds, but it'll at least be cleaner. To eliminate your race condition you'd want to ditch the concurrent queue for a regular one and simply acquire the lock before accessing it (as well as acquiring the lock for inserts). This will synchronize your threads as an inserter will be prevented from inserting unless this thread is waiting on the lock condition. Mixing lock and lock-free approaches to thread synchronization in the same chunk of code will often lead to these issues.

金橙橙 2024-11-08 20:07:12

您遇到了所谓的丢失信号。您轮询队列,然后等待监视器(获取锁定)。生产者线程添加事件,然后调用notifyAll()(获取锁)。事件排队/轮询和条件等待/通知之间不存在happens-before关系。

因此,线程 A 可以在空时进行轮询,然后尝试获取锁,同时线程 B 添加一个元素并获取锁,通知所有等待线程然后释放锁。然后线程 A 获取锁并等待它,但信号已丢失。

由于您纯粹使用锁来发送信号,因此您可能会考虑另一种机制,例如可重复使用的锁存器,例如 Doug Lea 的新 jdk7 Phaser,或者直接使用BlockingQueue

或者,我们有几个 ReusableLatch 例如 BooleanLatch 用于单个读取器线程或 PhasedLatch 用于多方支持。

You have what's known as a missed signal. You poll the queue and then wait on the monitor (taking the lock). The producer threads add events and then call notifyAll() (taking the lock). There is no happens-before relationship between event queuing/poll and the conditional await/notification.

It is therefore possible for thread A to poll while empty and then try to acquire the lock, meanwhile thread B adds an element and acquires the lock, notifying all awaiting threads then releasing the lock. Thread A then acquires the lock and awaits it, but the signal has been missed.

As you are using the lock purely for signalling, you might consider another mechanism such as a reusable latch like Doug Lea's new jdk7 Phaser, or just use a BlockingQueue directly.

Alternatively we have a couple of ReusableLatch such as a BooleanLatch for a single reader thread or a PhasedLatch for multi-party support.

愛上了 2024-11-08 20:07:12

乍一看没有什么特别的想法,但在您不知情的情况下,许多事情都可能出错,因为:

    catch (Exception e) {

    }

捕获任何 Exception 的处理程序(其中包括 RuntimeException 及其各种子类)然后忽略它通常是一个坏主意。如果这是为了捕获特定类型的异常(例如,可能由 lock.wait() 引发的 InterruptedException),那么您应该限制它到该异常类型。如果您出于某种原因捕获任何异常,那么您至少应该在异常发生时记录一些内容。

No particular idea on first glance, but any number of things could be going wrong without your knowledge, because of this:

    catch (Exception e) {

    }

A handler that catches any Exception (which include RuntimeException and its various subclasses) and then ignores it is generally a bad idea. If this is meant to catch a specific type of exception (like, say, the InterruptedException that can probably be thrown by lock.wait()), then you should limit it to that exception type. If you have some reason for catching any exception, then you should at least log something when an exception occurs.

天暗了我发光 2024-11-08 20:07:12

我在使用 ConcurrentLinkedQueue 时遇到的一个问题,我真的怀疑这是一个真正的错误,因为它并不是真正完全同步的证明。

我还没有完全测试过这一点,但我查看了代码,我很确定如果队列实际上是空的,.isEmpty() 不会同步。当一个线程调用 .isEmpty() 并返回 true 时,队列可能已经包含元素。

An issue I had with ConcurrentLinkedQueue that i really suspect to be a genuine bug, in that it's not really full proof synchronized.

I haven't fully tested this yet, but i looked at the code and I am quite sure that .isEmpty() is not synchronized if the queue actually is empty. While one thread invokes .isEmpty() and true is returned, the queue might already contain elements.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文