锁定互惠符在无限循环或更新功能中有多糟糕

发布于 2025-01-23 12:39:06 字数 625 浏览 0 评论 0原文

std::queue<double> some_q;
std::mutex mu_q;

/* an update function may be an event observer */
void UpdateFunc()
{
    /* some other processing */
    std::lock_guard lock{ mu_q };
    while (!some_q.empty())
    {
        const auto& val = some_q.front();
        /* update different states according to val */
        some_q.pop();
    }
    /* some other processing */
}

/* some other thread might add some values after processing some other inputs */
void AddVal(...)
{
    std::lock_guard lock{ mu_q };
    some_q.push(...);
}

对于这种情况,可以这样处理队列吗? 还是如果我尝试使用像Boost One这样的无锁队列,会更好吗?

std::queue<double> some_q;
std::mutex mu_q;

/* an update function may be an event observer */
void UpdateFunc()
{
    /* some other processing */
    std::lock_guard lock{ mu_q };
    while (!some_q.empty())
    {
        const auto& val = some_q.front();
        /* update different states according to val */
        some_q.pop();
    }
    /* some other processing */
}

/* some other thread might add some values after processing some other inputs */
void AddVal(...)
{
    std::lock_guard lock{ mu_q };
    some_q.push(...);
}

For this case is it okay to handle the queue this way?
Or would it be better if I try to use a lock-free queue like the boost one?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

孤凫 2025-01-30 12:39:06

将互在锁定在无限循环或更新功能

中有多糟糕

。无限循环实际上使您的程序具有不确定的行为,除非它执行以下操作:

  • 终止
  • 呼叫库I/O函数
  • 访问权限
  • 通过挥发性Glvalue执行同步操作或原子操作执行

在输入循环之前获取Mutex锁定并仅将其保持为执行同步操作(在循环中)。另外,当持有互音时,没有人可以在队列中添加信息,因此在处理您提取的信息时,所有想要添加到队列的线程都必须等待 - 而且没有其他想要共享负载的工作人员线程可以从队列。通常最好从队列中提取一个任务,释放锁,然后使用您的收入。

常见的方法是使用condition_variable,该允许其他线程获取锁定,然后通知其他使用相同的procention_variable等待的线程。在等待时,CPU将非常接近空闲,并在需要时醒来完成工作。

将您的程序用作基础,它看起来像这样:

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

std::queue<double> some_q;
std::mutex mu_q;
std::condition_variable cv_q; // the condition variable
bool stop_q = false;          // something to signal the worker thread to quit

/* an update function may be an event observer */
void UpdateFunc() {
    while(true) {
        double val;
        {
            std::unique_lock lock{mu_q};

            // cv_q.wait lets others acquire the lock to work with the queue
            // while it waits to be notified.
            while (not stop_q && some_q.empty()) cv_q.wait(lock);

            if(stop_q) break; // time to quit

            val = std::move(some_q.front());
            some_q.pop();
        } // lock released so others can use the queue

        // do time consuming work with "val" here
        std::cout << "got " << val << '\n';
    }
}

/* some other thread might add some values after processing some other inputs */
void AddVal(double val) {
    std::lock_guard lock{mu_q};
    some_q.push(val);
    cv_q.notify_one(); // notify someone that there's a new value to work with
}

void StopQ() { // a function to set the queue in shutdown mode
    std::lock_guard lock{mu_q};
    stop_q = true;
    cv_q.notify_all(); // notify all that it's time to stop
}

int main() {
    auto th = std::thread(UpdateFunc);
    
    // simulate some events coming with some time apart
    std::this_thread::sleep_for(std::chrono::seconds(1));
    AddVal(1.2);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    AddVal(3.4);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    AddVal(5.6);
    std::this_thread::sleep_for(std::chrono::seconds(1));

    StopQ();    
    th.join();
}

如果您真的想处理当前队列中的所有内容,请先提取所有内容,然后释放锁,然后与提取的内容一起使用。通过仅交换另一个std :: queue来快速完成从队列中提取所有内容。例子:

#include <atomic>

std::atomic<bool> stop_q{}; // needs to be atomic in this version

void UpdateFunc() {
    while(not stop_q) {
        std::queue<double> work; // this will be used to swap with some_q
        {
            std::unique_lock lock{mu_q};

            // cv_q.wait lets others acquire the lock to work with the queue
            // while it waits to be notified.
            while (not stop_q && some_q.empty()) cv_q.wait(lock);

            std::swap(work, some_q); // extract everything from the queue at once
        } // lock released so others can use the queue

        // do time consuming work here
        while(not stop_q && not work.empty()) {
            auto val = std::move(work.front());
            work.pop();

            std::cout << "got " << val << '\n';
        }
    }
}

How bad it is to lock a mutex in an infinite loop or an update function

It's pretty bad. Infinite loops actually make your program have undefined behavior unless it does one of the following:

  • terminate
  • make a call to a library I/O function
  • perform an access through a volatile glvalue
  • perform a synchronization operation or an atomic operation

Acquiring the mutex lock before entering the loop and just holding it does not count as performing a synchronization operation (in the loop). Also, when holding the mutex, noone can add information to the queue, so while processing the information you extract, all threads wanting to add to the queue will have to wait - and no other worker threads wanting to share the load can extract from the queue either. It's usually better to extract one task from the queue, release the lock and then work with what you got.

The common way is to use a condition_variable that lets other threads acquire the lock and then notify other threads waiting with the same condition_variable. The CPU will be pretty close to idle while waiting and wake up to do the work when needed.

Using your program as a base, it could look like this:

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

std::queue<double> some_q;
std::mutex mu_q;
std::condition_variable cv_q; // the condition variable
bool stop_q = false;          // something to signal the worker thread to quit

/* an update function may be an event observer */
void UpdateFunc() {
    while(true) {
        double val;
        {
            std::unique_lock lock{mu_q};

            // cv_q.wait lets others acquire the lock to work with the queue
            // while it waits to be notified.
            while (not stop_q && some_q.empty()) cv_q.wait(lock);

            if(stop_q) break; // time to quit

            val = std::move(some_q.front());
            some_q.pop();
        } // lock released so others can use the queue

        // do time consuming work with "val" here
        std::cout << "got " << val << '\n';
    }
}

/* some other thread might add some values after processing some other inputs */
void AddVal(double val) {
    std::lock_guard lock{mu_q};
    some_q.push(val);
    cv_q.notify_one(); // notify someone that there's a new value to work with
}

void StopQ() { // a function to set the queue in shutdown mode
    std::lock_guard lock{mu_q};
    stop_q = true;
    cv_q.notify_all(); // notify all that it's time to stop
}

int main() {
    auto th = std::thread(UpdateFunc);
    
    // simulate some events coming with some time apart
    std::this_thread::sleep_for(std::chrono::seconds(1));
    AddVal(1.2);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    AddVal(3.4);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    AddVal(5.6);
    std::this_thread::sleep_for(std::chrono::seconds(1));

    StopQ();    
    th.join();
}

If you really want to process everything that is currently in the queue, then extract everything first and then release the lock, then work with what you extracted. Extracting everything from the queue is done quickly by just swapping in another std::queue. Example:

#include <atomic>

std::atomic<bool> stop_q{}; // needs to be atomic in this version

void UpdateFunc() {
    while(not stop_q) {
        std::queue<double> work; // this will be used to swap with some_q
        {
            std::unique_lock lock{mu_q};

            // cv_q.wait lets others acquire the lock to work with the queue
            // while it waits to be notified.
            while (not stop_q && some_q.empty()) cv_q.wait(lock);

            std::swap(work, some_q); // extract everything from the queue at once
        } // lock released so others can use the queue

        // do time consuming work here
        while(not stop_q && not work.empty()) {
            auto val = std::move(work.front());
            work.pop();

            std::cout << "got " << val << '\n';
        }
    }
}
不奢求什么 2025-01-30 12:39:06

您可以像当前在所有线程中正确使用锁一样使用它。但是,您可能会对如何调用UpdateFunc()感到沮丧。

  • 您要使用回调吗?
  • 您要使用ISR吗?
  • 您要进行投票吗?

您使用第三方lib,它通常会使线程同步和队列进行微不足道

如果 “ rel =“ nofollow noreferrer”> cmsis rtos(v2)。这是一个相当直接的过程,可以获取多个线程以相互传递信息。您可以有多个生产商和一个消费者。

单个消费者可以在一个永远等待的循环中等待,在执行工作之前等待收到消息

当超时设置为OSWAIT时,功能将等待
无限的时间直到获取消息(即等待语义)。

// Two producers
osMessageQueuePut(X,Y,Z,timeout=0)
osMessageQueuePut(X,Y,Z,timeout=0)
    
// One consumer which will run only once something enters the queue
osMessageQueueGet(X,Y,Z,osWaitForever)

tldr;您可以安全地进行,但是使用库可能会使您的同步问题更容易。

You can use it like you currently are assuming proper use of the lock across all threads. However, you may run into some frustrations about how you want to call updateFunc().

  • Are you going to be using a callback?
  • Are you going to be using an ISR?
  • Are you going to be polling?

If you use a 3rd party lib it often trivializes thread synchronization and queues

For example, if you are using a CMSIS RTOS(v2). It is a fairly straight forward process to get multiple threads to pass information between each other. You could have multiple producers, and a single consumer.

The single consumer can wait in a forever loop where it waits to receive a message before performing its work

when timeout is set to osWaitForever the function will wait for an
infinite time until the message is retrieved (i.e. wait semantics).

// Two producers
osMessageQueuePut(X,Y,Z,timeout=0)
osMessageQueuePut(X,Y,Z,timeout=0)
    
// One consumer which will run only once something enters the queue
osMessageQueueGet(X,Y,Z,osWaitForever)

tldr; You are safe to proceed, but using a library will likely make your synchronization problems easier.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文