Thread starvation in multiprocessor systems
I am implementing the producer-consumer problem. The usual implementation uses std::condition_variable together with std::mutex so that threads can be notified to wake up: with these two primitives, the producer can notify the consumer, and vice versa. This is generally required to avoid thread starvation. But does this issue really persist on a multiprocessor system?
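For reference, the condition-variable approach described above can be sketched roughly as follows. This is a minimal illustration, not the code from this question: `BlockingQueue`, `close()` and `sumThroughQueue` are hypothetical names, and the queue is unbounded, so only the producer-to-consumer notification is shown (a bounded queue would add a second condition variable for the "not full" direction).

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical blocking queue built on std::mutex + std::condition_variable.
// It is NOT the MPMC_Circular_Queue used later in the question.
template <typename T>
class BlockingQueue
{
public:
    void push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(value));
        }
        notEmpty_.notify_one(); // wake one sleeping consumer, if any
    }

    // Sleeps until an item is available or close() has been called.
    // Returns std::nullopt only when the queue is closed and fully drained.
    std::optional<T> pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty())
            return std::nullopt;
        T value = std::move(items_.front());
        items_.pop();
        return value;
    }

    void close()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        notEmpty_.notify_all(); // release every waiting consumer
    }

private:
    std::mutex mutex_;
    std::condition_variable notEmpty_;
    std::queue<T> items_;
    bool closed_ = false;
};

// Pushes 1..count from a producer thread and sums them in a consumer thread.
long long sumThroughQueue(int count)
{
    BlockingQueue<int> queue;
    long long sum = 0;
    std::thread consumer([&] {
        while (auto item = queue.pop())
            sum += *item;
    });
    std::thread producer([&] {
        for (int i = 1; i <= count; ++i)
            queue.push(i);
        queue.close();
    });
    producer.join();
    consumer.join();
    return sum;
}
```

Calling `sumThroughQueue(100)` from `main` exercises both threads. While the queue is empty, the consumer sleeps inside `wait()` instead of polling, which is the behaviour the mutex and condition variable buy.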
The question arises because I am implementing this with a lock-free ring buffer, and I don't want to use std::mutex and std::condition_variable on the producer and consumer sides, since calling enqueue() and dequeue() on this queue cannot cause a data race. Below is the code.
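// LOG and MPMC_Circular_Queue are defined elsewhere and are not shown here.
#include <atomic>
#include <cstdint>
#include <exception>
#include <functional>
#include <mutex>        // std::call_once, std::once_flag
#include <string>
#include <string_view>
#include <thread>
#include <pthread.h>    // pthread_setname_np, pthread_setaffinity_np
#include <sched.h>      // cpu_set_t, CPU_ZERO, CPU_SET
#include <unistd.h>     // sysconf

template<typename MessageType>
class MessageProcessor
{
public:
    ~MessageProcessor()
    {
        stop();
        if (workerThread_.joinable())
            workerThread_.join();
    }

    // Producer side: returns false if the bounded queue rejects the message.
    bool postMessage(MessageType const &msg)
    {
        return queue_.enqueue(msg);
    }

    // Starts the consumer thread once; later calls are ignored.
    void registerHandler(std::function<void(MessageType)> handler, int32_t coreId=-1, std::string_view const &name="")
    {
        std::call_once(init_, [&](){
            handler_ = std::move(handler);
            workerThread_ = std::thread{&MessageProcessor::process, this};
            if (!setAffinity(coreId, workerThread_))
                LOG("Msg Processing thread couldn't be pinned to core: " << coreId);
            else
                LOG("Msg Processing thread pinned to core: " << coreId);
            if (! name.empty())
                // std::string_view is not guaranteed to be null-terminated, so copy it first.
                pthread_setname_np(workerThread_.native_handle(), std::string{name}.c_str());
        });
    }

    void stop()
    {
        stop_ = true;
    }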
private:
    void process() //This is a consumer, runs in a separate thread
    {
        while(!stop_.load(std::memory_order_acquire))
        {
            MessageType msg;
            if (! queue_.dequeue(msg))
                continue; // queue empty: poll again immediately (busy-wait)
            try
            {
                handler_(msg);
            }
            catch(std::exception const &ex)
            {
                LOG("Error while processing data: " << msg << ", Exception: " << ex.what());
            }
            catch(...)
            {
                LOG("UNKNOWN Error while processing data: " << msg);
            }
        }
    }

    bool setAffinity(int32_t const coreId, std::thread &thread)
    {
        // Number of online cores; _SC_NPROCESSORS_ONLN is the sysconf name for it.
        long const cpuCoreCount = sysconf(_SC_NPROCESSORS_ONLN);
        if (coreId < 0 || coreId >= cpuCoreCount)
            return false;
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(coreId, &cpuset);
        pthread_t currentThread = thread.native_handle();
        return pthread_setaffinity_np(currentThread, sizeof(cpu_set_t), &cpuset) == 0;
    }

    std::thread workerThread_;
    std::atomic<bool> stop_{false};
    MPMC_Circular_Queue<MessageType, 1024> queue_;
    std::function<void(MessageType)> handler_{};
    std::once_flag init_;
};
int main()
{
    pthread_setname_np(pthread_self(), "MAIN");
    MessageProcessor<int> processor;
    processor.registerHandler([](int i){
        LOG("Received value: " << i);
    }, 2, "PROCESSOR");

    std::thread t1([&]() { //Producer thread1
        for (int i = 1; i <= 100000; i += 2)
        {
            LOG("Submitting value: " << i);
            processor.postMessage(i); // return value is not checked
        }
    });
    pthread_setname_np(t1.native_handle(), "ODD ");

    std::thread t2([&]() { //Producer thread2
        for (int i = 2; i <= 100000; i += 2)
        {
            LOG("Submitting value: " << i);
            processor.postMessage(i); // return value is not checked
        }
    });
    pthread_setname_np(t2.native_handle(), "EVEN");

    for (int i = 1; i <= 100000; ++i)
    {
        LOG("Running main thread: " << i);
    }

    t1.join();
    t2.join();
    return 0;
}
Can this code cause thread starvation on a modern multiprocessor system? MPMC_Circular_Queue is a lock-free bounded queue.
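To make the trade-off concrete: the loop in process() calls dequeue() again immediately whenever the queue is empty, so the consumer keeps its core busy even when there is nothing to do. A common middle ground between pure spinning and a condition variable is to back off in stages. The sketch below is only an illustration under stated assumptions: `Backoff` and `pollWithBackoff` are hypothetical names, the thresholds and the 50-microsecond sleep are arbitrary, and the dequeue step is passed in as a callable returning `std::optional` because MPMC_Circular_Queue itself is not shown.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <optional>
#include <thread>

// Hypothetical helper (not part of the question's code): escalating backoff
// for a polling loop. Thresholds and sleep duration are arbitrary assumptions.
class Backoff
{
public:
    void pause()
    {
        if (attempts_ < kSpinLimit) {
            // Phase 1: return at once and keep spinning (lowest latency).
        } else if (attempts_ < kYieldLimit) {
            std::this_thread::yield(); // Phase 2: give other runnable threads a turn.
        } else {
            // Phase 3: sleep briefly so an idle consumer stops burning the core.
            std::this_thread::sleep_for(std::chrono::microseconds(50));
        }
        if (attempts_ < kYieldLimit)
            ++attempts_; // saturates at kYieldLimit, so it cannot overflow
    }

    void reset() { attempts_ = 0; }
    std::uint32_t attempts() const { return attempts_; }

private:
    static constexpr std::uint32_t kSpinLimit = 64;
    static constexpr std::uint32_t kYieldLimit = 256;
    std::uint32_t attempts_ = 0;
};

// Polls tryDequeue() until stop becomes true. tryDequeue must return a
// std::optional-like value that is empty when nothing is available.
template <typename TryDequeue, typename Handler>
void pollWithBackoff(TryDequeue tryDequeue, Handler handle, std::atomic<bool> const &stop)
{
    Backoff backoff;
    while (!stop.load(std::memory_order_acquire)) {
        if (auto item = tryDequeue()) {
            handle(*item);
            backoff.reset(); // work arrived: return to the low-latency phase
        } else {
            backoff.pause();
        }
    }
}
```

In the class above, process() could call this with a lambda that wraps queue_.dequeue(msg) and returns an empty optional on failure. The cost is added wake-up latency once the loop has reached the sleep phase, so whether this is preferable to pure spinning on a pinned core depends on the latency requirements and on how many threads share the available cores.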