STD ::队列生产者/消费者的最小静音

发布于 2025-01-29 04:35:23 字数 1031 浏览 1 评论 0原文

我有两个线程可以使用std :: queue的生​​产者和消费者方面。队列不经常满,所以我想避免消费者抓住正在守卫队列的静音的静音。

可以调用 empty()在穆特克斯外面,然后仅在队列中有东西时抓住utex?

例如:

struct MyData{
   int a;
   int b;
};

class SpeedyAccess{
public:
   void AddDataFromThread1(MyData data){
      const std::lock_guard<std::mutex> queueMutexLock(queueAccess);
      workQueue.push(data);
   }

   void CheckFromThread2(){
      if(!workQueue.empty()) // Un-protected access...is this dangerous?
      {
         queueAccess.lock();
         MyData data = workQueue.front();
         workQueue.pop();
         queueAccess.unlock();

         ExpensiveComputation(data);
       }

   }

private:
   void ExpensiveComputation(MyData& data);

   std::queue<MyData> workQueue;
   std::mutex queueAccess;
}

线程2的检查和不是特别关键时期的检查,但会被称为 local (500/sec?)。线程1非常关键,很多东西需要在那里运行,但并不频繁地调用(最大20/秒)。

如果我围绕空()添加一个静音罩,如果在线程2到达时队列为空,则不会长时间持有静音,因此可能不会很大。但是,由于它被如此频繁地打电话,因此偶尔可能同时发生某些事情是在试图放在背上的事情....这会导致线程1中的大量等待吗?

I have two threads that work the producer and consumer sides of a std::queue. The queue isn't often full, so I'd like to avoid the consumer grabbing the mutex that is guarding mutating the queue.

Is it okay to call empty() outside the mutex then only grab the mutex if there is something in the queue?

For example:

struct MyData{
   int a;
   int b;
};

class SpeedyAccess{
public:
   void AddDataFromThread1(MyData data){
      const std::lock_guard<std::mutex> queueMutexLock(queueAccess);
      workQueue.push(data);
   }

   void CheckFromThread2(){
      if(!workQueue.empty()) // Un-protected access...is this dangerous?
      {
         queueAccess.lock();
         MyData data = workQueue.front();
         workQueue.pop();
         queueAccess.unlock();

         ExpensiveComputation(data);
       }

   }

private:
   void ExpensiveComputation(MyData& data);

   std::queue<MyData> workQueue;
   std::mutex queueAccess;
}

Thread 2 does the check and isn't particularly time-critical, but will get called a lot (500/sec?). Thread 1 is very time critical, a lot of stuff needs to run there, but isn't called as frequently (max 20/sec).

If I add a mutex guard around empty(), if the queue is empty when thread 2 comes, it won't hold the mutex for long, so might not be a big hit. However, since it gets called so frequently, it might occasionally happen at the same time something is trying to get put on the back....will this cause a substantial amount of waiting in thread 1?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

≈。彩虹 2025-02-05 04:35:23

如上所述,您应仅在锁定下调用empty()

但是我相信有一种更好的方法。
您可以使用 std :: prenti其condition_variable 一起使用 std :: mutex ,以实现同步访问队列的访问,而无需锁定静音的情况。

但是 - 使用std :: condition_variable时,您必须意识到它遭受 spurious Wakeups 。您可以在此处阅读有关它的信息: spurious wakeup -wikipedia -wikipedia

您可以在此处查看一些代码示例:条件变量示例

下面说明了使用std :: procenty_variable的正确方法(带有一些注释)。
这只是显示原则的一个最小例子。

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <iostream>

using MyData = int;

std::mutex mtx;
std::condition_variable cond_var;
std::queue<MyData> q;

void producer()
{
    MyData produced_val = 0;
    while (true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));   // simulate some pause between productions
        ++produced_val;
        std::cout << "produced: " << produced_val << std::endl;
        {
            // Access the Q under the lock:
            std::unique_lock<std::mutex> lck(mtx);
            q.push(produced_val);
            cond_var.notify_all();  // It's not a must to nofity under the lock but it might be more efficient (see @DavidSchwartz's comment below).
        }
    }
}

void consumer()
{
    while (true)
    {
        MyData consumed_val;
        {
            // Access the Q under the lock:
            std::unique_lock<std::mutex> lck(mtx);
            // NOTE: The following call will lock the mutex only when the the condition_varible will cause wakeup
            //       (due to `notify` or spurious wakeup).
            //       Then it will check if the Q is empty.
            //       If empty it will release the lock and continue to wait. 
            //       If not empty, the lock will be kept until out of scope.
            //       See the documentation for std::condition_variable.
            cond_var.wait(lck, []() { return !q.empty(); }); // will loop internally to handle spurious wakeups
            consumed_val = q.front();
            q.pop();
        }
        std::cout << "consumed: " << consumed_val << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));    // simulate some calculation
    }
}

int main()
{
    std::thread p(producer);
    std::thread c(consumer);
    while(true) {}
    p.join(); c.join(); // will never happen in our case but to remind us what is needed.
    return 0;
}

一些注释:

  1. 在您的真实代码中,任何线程都不应永远运行。您应该有一些机制来通知他们优雅退出。
  2. 全局变量(mtxq等)最好是某些上下文类的成员,或传递给producer() and <代码>消费者()作为参数。
  3. 为简单起见,此示例假设生产商的生产率总是相对低于消费者的速度。在您的真实代码中,您可以通过使消费者每次发出condition_variable时提取Q中的所有元素来使其更加笼统。
  4. 您可以使用sleep_for时间来“播放”生产者和消费者测试VARIOS时序案例的时间。
  5. 对于从移动语义中受益的更复杂的数据类型,您可以更改:
      opted_val = q.front();
     

    到:

      opted_val = std :: move(q.front());
     

    为了提高效率(由于队列中的正面元素是pop之后,无论如何)。

  6. 上面的代码假定mydata是默认可构造的。
    如果不是这种情况,您可以在消费者的环体中使用立即调用的lambda

      mydata convented_val = 
         [](){std :: simolor_lock&lt; std :: mutex&gt; LCK(MTX);
               cond_var.wait(lck,[](](){return!q.ementy();}); 
               mydata val = std :: move(q.front()); 
               q.pop(); 
               返回阀; }
         (); //&lt;  - 立即调用lambda初始化消费_VAL 
     

    使用std :: Move与NRVO一起使用效率。

As written in the comments above, you should call empty() only under a lock.

But I believe there is a better way to do it.
You can use a std::condition_variable together with a std::mutex, to achieve synchronization of access to the queue, without locking the mutex more than you must.

However - when using std::condition_variable, you must be aware that it suffers from spurious wakeups. You can read about it here: Spurious wakeup - Wikipedia.
You can see some code examples here: Condition variable examples.

The correct way to use a std::condition_variable is demonstrated below (with some comments).
This is just a minimal example to show the principle.

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <iostream>

using MyData = int;

std::mutex mtx;
std::condition_variable cond_var;
std::queue<MyData> q;

void producer()
{
    MyData produced_val = 0;
    while (true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));   // simulate some pause between productions
        ++produced_val;
        std::cout << "produced: " << produced_val << std::endl;
        {
            // Access the Q under the lock:
            std::unique_lock<std::mutex> lck(mtx);
            q.push(produced_val);
            cond_var.notify_all();  // It's not a must to nofity under the lock but it might be more efficient (see @DavidSchwartz's comment below).
        }
    }
}

void consumer()
{
    while (true)
    {
        MyData consumed_val;
        {
            // Access the Q under the lock:
            std::unique_lock<std::mutex> lck(mtx);
            // NOTE: The following call will lock the mutex only when the the condition_varible will cause wakeup
            //       (due to `notify` or spurious wakeup).
            //       Then it will check if the Q is empty.
            //       If empty it will release the lock and continue to wait. 
            //       If not empty, the lock will be kept until out of scope.
            //       See the documentation for std::condition_variable.
            cond_var.wait(lck, []() { return !q.empty(); }); // will loop internally to handle spurious wakeups
            consumed_val = q.front();
            q.pop();
        }
        std::cout << "consumed: " << consumed_val << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));    // simulate some calculation
    }
}

int main()
{
    std::thread p(producer);
    std::thread c(consumer);
    while(true) {}
    p.join(); c.join(); // will never happen in our case but to remind us what is needed.
    return 0;
}

Some notes:

  1. In your real code, none of the threads should run forever. You should have some mechanism to notify them to gracefully exit.
  2. The global variables (mtx,q etc.) are better to be members of some context class, or passed to the producer() and consumer() as parameters.
  3. This example assumes for simplicity that the producer's production rate is always low relatively to the consumer's rate. In your real code you can make it more general, by making the consumer extract all elements in the Q each time the condition_variable is signaled.
  4. You can "play" with the sleep_for times for the producer and consumer to test varios timing cases.
  5. For more complex data types that benefit from move semantics, you can change:
    consumed_val = q.front();
    

    to:

    consumed_val = std::move(q.front());
    

    for better efficiency (since the front element in the queue is poped right afterwards anyway).

  6. The code above assumes that MyData is default constructable.
    If this is not the case, you can use an immediatly invoked lambda in the loop body in the consumer:

    MyData consumed_val = 
         [](){ std::unique_lock<std::mutex> lck(mtx);
               cond_var.wait(lck, []() { return !q.empty(); }); 
               MyData val = std::move(q.front()); 
               q.pop(); 
               return val; }
         ();   // <-- immediatly invoke the lambda to initialize consumed_val 
    

    The usage of std::move together with NRVO should make it efficient.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文