Stopping multiple threads at once

Posted on 2025-01-17 14:01:45

What am I missing in the program below, where threads wait on a condition_variable_any to determine when to stop? The threads stop unpredictably: some stop before the call to notify_all, and some never stop at all.

The condition variable used is defined as below:

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;

The threads check if it is time to stop as below:

std::unique_lock<std::mutex> lock(interrupt_mutex);
const auto cv_status = interrupt_cv.wait_for(lock, std::chrono::milliseconds(1000));
const auto timeout_expired = cv_status == std::cv_status::timeout;
if (!timeout_expired)
{
    quit = true;
}

The main thread signals the threads to stop as below:

std::unique_lock<std::mutex> lock(interrupt_mutex);
interrupt_cv.notify_all();

A possible output looks like:

Thread  1> Received interrupt signal at iteration 2
Thread  1> Terminate
Thread  2> Received interrupt signal at iteration 2
Thread  2> Terminate
Thread  4> Received interrupt signal at iteration 2
Thread  4> Terminate
**** Requesting all threads to stop ****
Waiting for all threads to complete...

Below is the complete code that reproduces the problem:

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;

int main()
{
    std::vector<std::thread> thread_handles;
    for (int thread_idx = 0; thread_idx < 4; ++thread_idx)
    {
        thread_handles.emplace_back(std::thread([thread_idx](const int thread_id)
        {
            int num_iterations = 0;
            auto quit = false;
            while (!quit)
            {
                // Fake processing time outside the lock for testing purpose
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                ++num_iterations;

                // Check if need to stop with a timeout of 1000ms
                {
                    std::unique_lock<std::mutex> lock(interrupt_mutex);
                    const auto cv_status = interrupt_cv.wait_for(lock, std::chrono::milliseconds(1000));
                    if (const auto timeout_expired = cv_status == std::cv_status::timeout; !timeout_expired)
                    {
                        printf("Thread %2d> Received interrupt signal at iteration %d\n", thread_id, num_iterations);
                        quit = true;
                    }
                }
            }

            printf("Thread %2d> Terminate\n", thread_id);
        }, thread_idx + 1));
    }

    std::this_thread::sleep_for(std::chrono::seconds(5));

    // Signals all threads to stop
    {
        printf("**** Requesting all threads to stop ****\n");
        std::unique_lock<std::mutex> lock(interrupt_mutex);
        interrupt_cv.notify_all();
    }

    // Wait until all threads stop
    printf("Waiting for all threads to complete...\n");
    std::ranges::for_each(thread_handles, [](std::thread& thread_handle)
    {
        thread_handle.join();
    });

    printf("Program ends\n");
    return 0;
}


Comments (2)

眼泪都笑了 2025-01-24 14:01:45

A condition_variable is meant to signal threads when a condition changes (for example, when a shared variable changes value). But your code has no condition. You are trying to use the condition_variable itself as a quit signal, and that is not what it is meant for. notify_all() only wakes up threads that are actively waiting on the condition_variable at that exact moment. Threads that are not waiting on it, because they are busy doing something else, will not receive that signal to terminate. Those threads still need to detect the condition once they are ready to wait, so the condition has to be something more persistent than the notification. That is why your code is not working correctly.
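
To make the "persistent condition" point concrete, here is a minimal sketch (not taken from the original post) that notifies before anyone waits. Both helper names are hypothetical. The first helper relies on the notification alone; the second stores the request in a flag that a late waiter can still observe.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: notify first, wait afterwards, with no shared flag.
// Returns true when the wait ran into its timeout, i.e. the earlier
// notification was not delivered to this late waiter.
bool bare_notification_is_missed()
{
    std::mutex m;
    std::condition_variable_any cv;

    cv.notify_all(); // nobody is waiting yet, so nothing is woken

    std::unique_lock<std::mutex> lock(m);
    // A spurious wakeup could also end this wait early, which is the other
    // reason a bare wait cannot be trusted as a quit signal.
    return cv.wait_for(lock, std::chrono::milliseconds(100)) == std::cv_status::timeout;
}

// Hypothetical helper: same order of events, but the request is stored in a
// flag protected by the mutex, so the late waiter still sees it.
bool flag_survives_missed_notification()
{
    std::mutex m;
    std::condition_variable_any cv;
    bool quit = false;

    {
        std::lock_guard<std::mutex> lock(m);
        quit = true;
    }
    cv.notify_all(); // still reaches no waiter, but the flag remains set

    std::unique_lock<std::mutex> lock(m);
    // The predicate is checked before blocking, so this returns true at once.
    return cv.wait_for(lock, std::chrono::milliseconds(100), [&quit] { return quit; });
}
```

Calling both helpers from a small main should show the first one normally timing out, while the second returns true without waiting, because the predicate is evaluated before the thread blocks.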

In this case, you can simply move your quit variable to global scope, next to the condition_variable and mutex. Setting that quit variable will act as the condition that you signal waiting threads about. You can then use the overload of wait_for() that takes a predicate, which lets you check the current state of quit (and so ignore spurious wakeups).

Try something more like this:

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;
static bool quit = false;

int main()
{
    std::vector<std::thread> thread_handles;
    for (int thread_idx = 0; thread_idx < 4; ++thread_idx)
    {
        thread_handles.emplace_back(std::thread([thread_idx](const int thread_id)
        {
            int num_iterations = 0;
            while (true)
            {
                // Fake processing time outside the lock for testing purpose
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                ++num_iterations;

                // Check if need to stop with a timeout of 1s 
                {
                    std::unique_lock<std::mutex> lock(interrupt_mutex);
                    const bool signaled = interrupt_cv.wait_for(lock, std::chrono::seconds(1), [](){ return quit; });
                    if (signaled) break;
                }
            }

            printf("Thread %2d> Received interrupt signal at iteration %d\n", thread_id, num_iterations);
            printf("Thread %2d> Terminate\n", thread_id);
        }, thread_idx + 1));
    }

    std::this_thread::sleep_for(std::chrono::seconds(5));

    // Signals all threads to stop
    printf("**** Requesting all threads to stop ****\n");
    {
        std::lock_guard<std::mutex> lock(interrupt_mutex);
        quit = true;
    }
    interrupt_cv.notify_all();

    // Wait until all threads stop
    printf("Waiting for all threads to complete...\n");
    std::ranges::for_each(thread_handles, [](std::thread& thread_handle)
    {
        thread_handle.join();
    });

    printf("Program ends\n");
    return 0;
}
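
For readers wondering what the predicate overload of wait_for() used in the code above does, the sketch below spells out the equivalent loop. The helper name wait_for_with_predicate is hypothetical and exists only for illustration; in real code you would call the standard overload directly.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper that mirrors the behavior of
// cv.wait_for(lock, timeout, pred) with an explicit loop.
template <class Lock, class Rep, class Period, class Predicate>
bool wait_for_with_predicate(std::condition_variable_any& cv, Lock& lock,
                             const std::chrono::duration<Rep, Period>& timeout,
                             Predicate pred)
{
    const auto deadline = std::chrono::steady_clock::now() + timeout;

    // The predicate is checked before blocking, so a flag that was set
    // earlier is noticed even if its notification was missed.
    while (!pred())
    {
        // After a timeout, report the final state of the predicate.
        if (cv.wait_until(lock, deadline) == std::cv_status::timeout)
            return pred();
        // Otherwise (notification or spurious wakeup) re-check the predicate.
    }
    return true;
}
```

The return value is therefore the state of the predicate, not the reason for waking up, which is why the code above can break out of its loop on a true result.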

庆幸我还是我 2025-01-24 14:01:45

There are 2 issues with your code and both have the same solution.

  1. Spurious wake-ups. If your wait_for ends because of a spurious wake-up, it returns without a timeout, so your check treats it as a stop request although no one actually asked the thread to stop.
  2. Missed notifications. What if a thread is neither holding the lock nor waiting when the main thread calls notify_all? Notifications aren't stored anywhere, so a thread that misses one won't get it later. Those threads will never terminate.

To fix both issues you need another flag, which tells your threads that the job is done and they have to stop.

static bool stop = false;
//...
if (stop) // Instead of if (const auto timeout_expired = cv_status == std::cv_status::timeout; !timeout_expired)
//...
printf("**** Requesting all threads to stop ****\n");
std::unique_lock<std::mutex> lock(interrupt_mutex);
stop = true;
interrupt_cv.notify_all();
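
The fragment above can be expanded into a self-contained sketch along the following lines. Everything except the `stop` flag is a hypothetical name chosen for this illustration (run_workers, stop_mutex, stop_cv, the timings), and it is only one way to wire the flag in, not the answerer's full program.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical names for this sketch; only `stop` mirrors the fragment above.
static std::mutex stop_mutex;
static std::condition_variable_any stop_cv;
static bool stop = false; // protected by stop_mutex

// Starts num_workers threads, lets them run for run_time, asks them to stop,
// and returns how many of them terminated.
int run_workers(int num_workers, std::chrono::milliseconds run_time)
{
    {
        std::lock_guard<std::mutex> lock(stop_mutex);
        stop = false;
    }

    std::atomic<int> num_stopped{0};
    std::vector<std::thread> workers;
    for (int i = 0; i < num_workers; ++i)
    {
        workers.emplace_back([&num_stopped]
        {
            for (;;)
            {
                // Fake processing time outside the lock
                std::this_thread::sleep_for(std::chrono::milliseconds(20));

                std::unique_lock<std::mutex> lock(stop_mutex);
                stop_cv.wait_for(lock, std::chrono::milliseconds(50));
                // Decide based on the flag, not on why the wait returned.
                if (stop) break;
            }
            ++num_stopped;
        });
    }

    std::this_thread::sleep_for(run_time);
    {
        std::lock_guard<std::mutex> lock(stop_mutex);
        stop = true;
    }
    stop_cv.notify_all();

    for (auto& worker : workers) worker.join();
    return num_stopped.load();
}
```

A worker that misses the notification while doing its fake work still sees `stop` after its next wait times out, so in this sketch a call such as run_workers(4, std::chrono::milliseconds(200)) is expected to return 4. The cost is that such a worker may take up to one timeout period longer to exit.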