仅当有等待线程时才notify_one是否正确?

发布于 2025-01-11 01:07:11 字数 1208 浏览 0 评论 0 原文

我已经多次看到以下类型的调度队列实现,其中将新元素推送到队列的线程仅在推送元素之前队列为空时才调用notify_one。这种情况会减少不必要的notify_one调用,因为在推送新元素之前q.size() != 0意味着只有活动线程(假设有多个消费者线程)。

#include <queue>
#include <condition_variable>
#include <mutex>

using Item = int;
std::queue<Item> q;
std::condition_variable cv;
std::mutex m;

void process(Item i){}

void pop() {
  while (true) {
    std::unique_lock<std::mutex> lock(m);
    // This thread releases the lock and start waiting.
    // When notified, the thread start trying to re-acquire the lock and exit wait().
    // During the attempt (and thus before `pop()`), can another `push()` call acquire the lock? 
    cv.wait(lock, [&]{return !q.empty();});
    auto item = q.front();
    q.pop();
    process(item);
  }
}

void push(Item i) {
  std::lock_guard<std::mutex> lock(m);
  q.push(i);
  if (q.size() == 1) cv.notify_one();
}

int main() { /* ... */ }

但是,以下情况可能吗?假设所有消费者线程都在等待。

  1. 推送线程获取锁并推送新元素并调用notify_one,因为队列为空。
  2. 被通知的线程尝试重新获取锁(并退出 wait()),
  3. 推送线程再次获取锁并在被通知的线程重新获取锁之前推送另一个元素。

在这种情况下,3.之后就不会发生notify_one调用,并且当队列不为空时,只有一个活动线程。

I've seen the following type of a dispatch queue implementation several times where the thread pushing a new element to the queue calls notify_one only when the queue was empty before pushing the element. This condition would reduce unnecessary notify_one calls because q.size() != 0 before pushing a new element means there are only active threads (assuming there are multiple consumer threads).

#include <queue>
#include <condition_variable>
#include <mutex>

using Item = int;
std::queue<Item> q;
std::condition_variable cv;
std::mutex m;

void process(Item i){}

void pop() {
  while (true) {
    std::unique_lock<std::mutex> lock(m);
    // This thread releases the lock and start waiting.
    // When notified, the thread start trying to re-acquire the lock and exit wait().
    // During the attempt (and thus before `pop()`), can another `push()` call acquire the lock? 
    cv.wait(lock, [&]{return !q.empty();});
    auto item = q.front();
    q.pop();
    process(item);
  }
}

void push(Item i) {
  std::lock_guard<std::mutex> lock(m);
  q.push(i);
  if (q.size() == 1) cv.notify_one();
}

int main() { /* ... */ }

However, is the following scenario possible ? Suppose that all the consumer threads are waiting.

  1. the pushing thread acquires the lock and push a new element and calls notify_one because the queue was empty.
  2. a notified thread tries to re-acquire the lock (and exit wait())
  3. the pushing thread acquires the lock again and pushes another element before a notified thread re-acquires the lock.

In this case, no notify_one call wouldn't occur after 3. and there would be only one active thread when the queue isn't empty.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

冬天旳寂寞 2025-01-18 01:07:12

对于唤醒后的操作

根据wake's doc

  1. 以原子方式解锁锁,阻塞当前正在执行的线程,并将其添加到等待 *this 的线程列表中。当执行notify_all()或notify_one()时,线程将被解除阻塞。也可能被虚假解锁。解锁后,无论何种原因,重新获取锁定并等待退出。

意味着您的代码在唤醒时在 #1#2 之间是原子的。无需担心同步,因为您的编译器应该处理它。

void pop() {
  while (true) {
    std::unique_lock<std::mutex> lock(m);

    // When sleep, cv release the lock
    cv.wait(lock, [&]{return !q.empty();});   // <----#1
    // When wake up, it acquires the lock
    auto item = q.front();
    q.pop();
    process(item);                            // <----#2
  }
}

对于虚假唤醒

第二个谓词重载将为您处理虚假唤醒。
模板<类谓词> void wait( std::unique_lock& 锁,谓词 stop_waiting ); (2) (C++11 起)

  • 相当于
  • while (!stop_waiting()) {
        wait(lock);
    }
    

    此重载可用于在等待特定条件成立时忽略虚假唤醒。

    注意,进入该方法之前必须获取锁,并且在 wait(lock) 退出后重新获取锁,这意味着锁可以用来保护对 stop_waiting() 的访问。

    这就是带有谓词的 condition_variable 重载可以让您避免虚假唤醒。
    不需要用于检查虚假唤醒的手工 while 循环。

    For the operation after wake up

    According to wake's doc

    1. Atomically unlocks lock, blocks the current executing thread, and adds it to the list of threads waiting on *this. The thread will be unblocked when notify_all() or notify_one() is executed. It may also be unblocked spuriously. When unblocked, regardless of the reason, lock is reacquired and wait exits.

    Means your code is atomic between #1 to #2 when it's woken up. Don't need to worry about the synchronization since your compiler should handle it.

    void pop() {
      while (true) {
        std::unique_lock<std::mutex> lock(m);
    
        // When sleep, cv release the lock
        cv.wait(lock, [&]{return !q.empty();});   // <----#1
        // When wake up, it acquires the lock
        auto item = q.front();
        q.pop();
        process(item);                            // <----#2
      }
    }
    

    For spurious wakeup

    The second overload with predicate would handle spurious wakeup for you.
    template< class Predicate > void wait( std::unique_lock<std::mutex>& lock, Predicate stop_waiting ); (2) (since C++11)

    1. Equivalent to
    while (!stop_waiting()) {
        wait(lock);
    }
    

    This overload may be used to ignore spurious awakenings while waiting for a specific condition to become true.

    Note that lock must be acquired before entering this method, and it is reacquired after wait(lock) exits, which means that lock can be used to guard access to stop_waiting().

    That is the condition_variable overload with predicate would save you from spurious wake up.
    The handmade while loop for checking spurious wake up is not needed.

    ~没有更多了~
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文