Thread starvation in multiprocessor systems

Posted on 2025-02-07 14:52:24

I am implementing the producer-consumer problem. The usual approach pairs std::condition_variable with std::mutex so that threads can be notified to wake up: with these two primitives the producer can wake the consumer and vice versa. This is generally needed to avoid thread starvation. But does that issue really persist on a multiprocessor system?
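For reference, the conventional notification scheme described above can be sketched roughly as follows. This is a minimal illustration, not the queue used in this question: the names BlockingQueue, close() and sumThroughQueue are hypothetical, and the queue is assumed to be unbounded, so only the consumer ever has to wait.

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical illustration only; this is NOT MPMC_Circular_Queue.
template <typename T>
class BlockingQueue
{
public:
    void push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(value));
        }
        ready_.notify_one(); // wake one sleeping consumer, if any
    }

    // Blocks until an item is available or close() has been called.
    // Returns std::nullopt once the queue is closed and fully drained.
    std::optional<T> pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty())
            return std::nullopt;
        T value = std::move(items_.front());
        items_.pop();
        return value;
    }

    void close()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        ready_.notify_all(); // release every waiting consumer
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> items_;
    bool closed_ = false;
};

// Demo: one producer posts 1..count, one consumer sums what it receives.
long long sumThroughQueue(int count)
{
    BlockingQueue<int> queue;
    long long sum = 0;

    std::thread consumer([&] {
        while (auto item = queue.pop())
            sum += *item;
    });

    for (int i = 1; i <= count; ++i)
        queue.push(i);
    queue.close();
    consumer.join(); // join before reading sum, so there is no race
    return sum;
}
```

While the queue is empty the consumer sleeps inside wait() and uses no CPU time, which is the property I would be giving up by polling a lock-free queue instead.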

The question arises because I am implementing this with a lock-free ring buffer, and I don't want to use std::mutex and std::condition_variable on the producer and consumer sides, since calling enqueue() and dequeue() on this queue cannot cause a data race. The code is below.

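#include <atomic>
#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <string_view>
#include <thread>

#include <pthread.h>
#include <sched.h>
#include <unistd.h>

// LOG and MPMC_Circular_Queue are defined elsewhere (not shown).

template<typename MessageType>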
class MessageProcessor
{
public:
    ~MessageProcessor()
    {
        stop();

        if (workerThread_.joinable())
            workerThread_.join();
    }

    bool postMessage(MessageType const &msg)
    {
        return queue_.enqueue(msg);
    }

    void registerHandler(std::function<void(MessageType)> handler, int32_t coreId=-1, std::string_view const &name="")
    {
        std::call_once(init_, [&](){
            handler_ = std::move(handler);
            workerThread_ = std::thread{&MessageProcessor::process, this};

            if (!setAffinity(coreId, workerThread_))
                LOG("Msg Processing thread couldn't be pinned to core: " << coreId);
            else
                LOG("Msg Processing thread pinned to core: " << coreId);

            // string_view is not guaranteed to be null-terminated, so copy it first
            if (! name.empty())
                pthread_setname_np(workerThread_.native_handle(), std::string{name}.c_str());
        });
    }

    void stop()
    {
        stop_ = true;
    }

private:
    void process() //This is a consumer, runs in a separate thread
    {
        while(!stop_.load(std::memory_order_acquire))
        {
            MessageType msg;

            if (! queue_.dequeue(msg))
                continue;

            try
            {
                handler_(msg);
            }
            catch(std::exception const &ex)
            {
                LOG("Error while processing data: " << msg << ", Exception: " << ex.what());
            }
            catch(...)
            {
                LOG("UNKNOWN Error while processing data: " << msg);
            }
        }
    }

    bool setAffinity(int32_t const coreId, std::thread &thread)
    {
        long const cpuCoreCount = sysconf(_SC_NPROCESSORS_ONLN); // online CPU count, from <unistd.h>

        if (coreId < 0 || coreId >= cpuCoreCount)
            return false;

        cpu_set_t cpuset;

        CPU_ZERO(&cpuset);
        CPU_SET(coreId, &cpuset);

        pthread_t currentThread = thread.native_handle();

        return pthread_setaffinity_np(currentThread, sizeof(cpu_set_t), &cpuset) == 0;
    }

    std::thread workerThread_;
    std::atomic<bool> stop_{false};
    MPMC_Circular_Queue<MessageType, 1024> queue_;
    std::function<void(MessageType)> handler_{};
    std::once_flag init_;
};

int main()
{
    pthread_setname_np(pthread_self(), "MAIN");

    MessageProcessor<int> processor;

    processor.registerHandler([](int i){
        LOG("Received value: " << i);
    }, 2, "PROCESSOR");

    std::thread t1([&]() { //Producer thread1
        for (int i = 1; i <= 100000; i += 2)
        {
            LOG("Submitting value: " << i);
            processor.postMessage(i);
        }
    });

    pthread_setname_np(t1.native_handle(), "ODD ");

    std::thread t2([&]() { //Producer thread2
        for (int i = 2; i <= 100000; i += 2)
        {
            LOG("Submitting value: " << i);
            processor.postMessage(i);
        }
    });

    pthread_setname_np(t2.native_handle(), "EVEN");

    for (int i = 1; i <= 100000; ++i)
    {
        LOG("Running main thread: " << i);
    }

    t1.join();
    t2.join();

    return 0;
}

Can this code cause thread starvation on a modern multiprocessor system? MPMC_Circular_Queue is a lock-free bounded queue.
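To make the trade-off concrete, here is one middle ground I am considering between pure busy-polling and a condition variable: spin briefly, then yield, then sleep for short intervals. This is only a sketch. The helper name pollWithBackoff and the thresholds (64 spins, 256 rounds, 50 microseconds) are arbitrary assumptions, not measured values, and tryPop stands in for any non-blocking dequeue call.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical helper: calls tryPop() until it returns true or stop is set.
// Returns true if tryPop() succeeded, false if stop was observed first.
template <typename TryPop>
bool pollWithBackoff(TryPop&& tryPop, std::atomic<bool> const& stop)
{
    int idleRounds = 0;

    while (!stop.load(std::memory_order_acquire))
    {
        if (tryPop())
            return true;

        ++idleRounds;

        if (idleRounds < 64)
            continue;                   // phase 1: short busy spin
        else if (idleRounds < 256)
            std::this_thread::yield();  // phase 2: offer the core to other threads
        else                            // phase 3: stop occupying the core
            std::this_thread::sleep_for(std::chrono::microseconds(50));
    }

    return false;
}
```

In process() above, the dequeue step would then become something like pollWithBackoff([&]{ return queue_.dequeue(msg); }, stop_), with the loop exiting when it returns false. The cost is extra wake-up latency once the loop has reached the sleeping phase.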