Multiple consumers waiting on a condition variable in the same instance
I'm having trouble thinking of a way to properly implement a signalling mechanism for multiple listeners that wait in the same function while a producer continuously signals new data, without a listener being "signalled" again for data it has already seen.
I want every listener to always see the latest available data, without repeats. Missing intermediate signals while a listener is busy is fine.
My attempt so far:
    #include <chrono>
    #include <condition_variable>
    #include <cstdlib>
    #include <functional>
    #include <mutex>
    #include <shared_mutex>
    #include <thread>

    class Signaller {
    public:
        // Used by producer, will hold on to the mutex uniquely as it modifies data
        void Signal(const std::function<void()>& fnIn) {
            std::unique_lock lock(m_mtx);
            fnIn();
            m_newData = true;
            m_cv.notify_all();
        }

        // Used by consumers, will only hold shared mutex to read data
        void Wait(const std::function<void()>& fnIn) {
            {
                std::shared_lock lock(m_mtx);
                m_cv.wait(lock, [this](){ return m_newData; });
                fnIn();
            }
            // Need some way to flip m_newData to false when all threads are "done"
            // (or some other method of preventing spurious wakeups)
            // I don't think this is particularly ideal
            {
                std::unique_lock lock(m_mtx);
                m_newData = false;
            }
        }

    private:
        std::condition_variable_any m_cv;
        std::shared_mutex m_mtx;
        bool m_newData{false}; // To prevent spurious wakeups
    };

    class Example {
    public:
        // Multiple threads will call this function in the same instance of Example
        void ConsumerLoop() {
            int latestData{0};
            while (true) {
                m_signaller.Wait([this, &latestData](){ latestData = m_latestData; });
                // process latestData...
                // I want to make sure latestData here is always the latest
                // (It's OK to miss a few signals in between if it's off processing this latest data)
            }
        }

        // One thread will be using this to signal new data
        void ProducerLoop() {
            while (true) {
                int newData = rand();
                m_signaller.Signal([this, newData](){ m_latestData = newData; });
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }

    private:
        Signaller m_signaller;
        int m_latestData{0};
    };
My main issue (I think) is how to guard against spurious wakeups while also preventing the same data from waking the same thread more than once. I've thought about using some sort of counter within each thread to keep track of whether it's receiving the same data, but couldn't get anywhere with that idea (unless I perhaps make some sort of map keyed by std::this_thread::get_id()?). Is there a better way to do this?
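The per-thread counter idea mentioned above does not necessarily need a map keyed by thread ID: each consumer can keep its own last-seen counter as a local variable and pass it into the wait call. The following is a minimal sketch under that assumption, not a drop-in fix. `GenerationSignaller`, `CurrentGeneration`, and `DemoLatestOnly` are hypothetical names that do not appear in the original code.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <shared_mutex>

// Hypothetical variant of Signaller: the class stores only a global
// generation counter; every consumer owns its own "last seen" value.
class GenerationSignaller {
public:
    // Producer: apply the update under the exclusive lock, bump the
    // generation, then wake every waiting consumer.
    template <typename Fn>
    void Signal(Fn&& update) {
        std::unique_lock lock(m_mtx);
        update();
        ++m_generation;
        m_cv.notify_all();
    }

    // Consumer: block until the generation differs from lastSeen, read the
    // data under the shared lock, then record the generation just consumed.
    // A spurious wakeup leaves the predicate false, so the wait continues.
    template <typename Fn>
    void Wait(std::uint64_t& lastSeen, Fn&& read) {
        std::shared_lock lock(m_mtx);
        m_cv.wait(lock, [&] { return m_generation != lastSeen; });
        read();
        lastSeen = m_generation;
    }

    // Lets a late-starting consumer begin from "now" rather than from 0.
    std::uint64_t CurrentGeneration() {
        std::shared_lock lock(m_mtx);
        return m_generation;
    }

private:
    std::condition_variable_any m_cv;
    std::shared_mutex m_mtx;
    std::uint64_t m_generation{0};
};

// Single-threaded walk-through: two signals arrive before the consumer
// looks, and the consumer observes only the most recent value.
inline int DemoLatestOnly() {
    GenerationSignaller signaller;
    int shared = 0;               // stands in for m_latestData
    int observed = -1;
    std::uint64_t lastSeen = 0;   // consumer-local state

    signaller.Signal([&] { shared = 8; });
    signaller.Signal([&] { shared = 9; });  // overwrites 8 before any read
    signaller.Wait(lastSeen, [&] { observed = shared; });  // returns at once
    assert(observed == 9 && lastSeen == 2);
    return observed;
}
```

In a consumer loop, `lastSeen` would be declared once before the `while` and passed to every `Wait` call. Initialising it from `CurrentGeneration()` makes the thread wait for the next signal, while initialising it to 0 lets it pick up whatever has already been published.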
EDIT:
Expanding on my idea of a map keyed by thread ID, I think I've found a solution:
    #include <chrono>
    #include <condition_variable>
    #include <cstdint>
    #include <cstdlib>
    #include <functional>
    #include <mutex>
    #include <shared_mutex>
    #include <thread>
    #include <unordered_map>

    class Signaller {
    public:
        // Used by producer, will hold on to the mutex uniquely as it modifies data
        void Signal(const std::function<void()>& fnIn) {
            std::unique_lock lock(m_mtx);
            fnIn();
            m_ctr++;
            m_cv.notify_all();
        }

        // Must be called once by each consumer thread before its first Wait()
        void RegisterWaiter() {
            std::unique_lock lock(m_mtx);
            auto [itr, emplaced] = m_threadCtrMap.try_emplace(std::this_thread::get_id(), m_ctr);
            if (!emplaced) {
                itr->second = m_ctr;
            }
        }

        // Used by consumers, will only hold shared mutex to read data
        void Wait(const std::function<void()>& fnIn) {
            std::shared_lock lock(m_mtx);
            // operator[] assumes RegisterWaiter() already inserted this thread's entry
            m_cv.wait(lock, [this](){ return m_threadCtrMap[std::this_thread::get_id()] != m_ctr; });
            fnIn();
            m_threadCtrMap[std::this_thread::get_id()] = m_ctr;
        }

    private:
        std::condition_variable_any m_cv;
        std::shared_mutex m_mtx;
        std::uint32_t m_ctr{0};
        std::unordered_map<std::thread::id, std::uint32_t> m_threadCtrMap; // Stores the last signalled ctr for that thread
    };

    class Example {
    public:
        // Multiple threads will call this function in the same instance of Example
        void ConsumerLoop() {
            int latestData{0};
            m_signaller.RegisterWaiter();
            while (true) {
                m_signaller.Wait([this, &latestData](){ latestData = m_latestData; });
            }
        }

        // One thread will be using this to signal new data
        void ProducerLoop() {
            while (true) {
                int newData = rand();
                m_signaller.Signal([this, newData](){ m_latestData = newData; });
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }

    private:
        Signaller m_signaller;
        int m_latestData{0};
    };
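One detail of the map-based version above: `std::unordered_map::operator[]` inserts a default entry when the key is missing, so a thread that called `Wait` without `RegisterWaiter` would modify the map while holding only the shared lock. A read-only lookup avoids that. The helper below is a small sketch of such a lookup; the name `LastSeen` is hypothetical and not part of the original code.

```cpp
#include <cstdint>
#include <optional>
#include <thread>
#include <unordered_map>

// Hypothetical helper: return the counter stored for `id`, or std::nullopt
// if that thread never registered. find() never inserts, so the map is left
// unmodified, unlike operator[].
inline std::optional<std::uint32_t> LastSeen(
    const std::unordered_map<std::thread::id, std::uint32_t>& counters,
    std::thread::id id) {
    const auto it = counters.find(id);
    if (it == counters.end()) {
        return std::nullopt;
    }
    return it->second;
}
```

Inside the wait predicate, an unregistered thread could then be handled explicitly (for example by treating a missing entry as "no data seen yet") instead of being inserted silently.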
Comments (1)
Here's my implementation: