C++ proper atomic memory ordering for a "thread barrier" synchronization pattern
I need to properly synchronize access to a shared resource between a predefined number of worker threads (statically known via application config) and a predefined number of control-plane threads. The control-plane threads receive requests from the outside and, based on those, potentially modify the shared resource. Worker threads simply run an infinite loop in which the shared resource is only ever read. To do this in a thread-safe way, and given the actual application use case (network packet processing, with multiple data-plane threads and multiple control-plane threads), it was decided to implement a "thread barrier" kind of pattern. Here's a snippet of how it's done, assuming the application is configured to spawn 2 worker threads and 2 control-plane threads:
#include <atomic>
#include <cassert>

std::atomic_bool barrier{};
std::atomic_uint32_t workers_at_barrier{};

// called by control-plane threads only!
void barrier_lock()
{
    // optimized spinlock implementation: rigtorp.se/spinlock/
    while (true)
    {
        if (!barrier.exchange(true, std::memory_order_acquire))
            break;
        while (barrier.load(std::memory_order_relaxed))
            __builtin_ia32_pause();
    }
    assert(barrier);
    // wait for ALL worker (data-plane) threads to arrive at the barrier!
    while (workers_at_barrier.load() != 2);
    assert(workers_at_barrier.load() == 2);
}

// called by control-plane threads only!
void barrier_unlock()
{
    assert(barrier && workers_at_barrier.load() == 2);
    barrier.store(false, std::memory_order_release);
    // wait for ALL workers to get out of the barrier!
    while (workers_at_barrier.load() != 0);
}

struct barrier_lock_guard
{
    barrier_lock_guard()
    {
        barrier_lock();
    }
    ~barrier_lock_guard()
    {
        barrier_unlock();
    }
};

// control-plane threads receive some requests and handle them here
void handle_stuff()
{
    // ... stuff
    {
        barrier_lock_guard blg;
        // barrier should be set and all workers (2 in this case) should be waiting at the barrier for its release
        assert(barrier && workers_at_barrier.load() == 2);
        // ... writes to shared resource
    }
    // ... stuff
}

// called by worker threads only!
void wait_at_barrier()
{
    // immediately return if barrier is not set
    if (!barrier.load(std::memory_order_acquire))
        return;
    ++workers_at_barrier;
    // block at the barrier until it gets released
    while (barrier.load(std::memory_order_acquire));
    --workers_at_barrier;
}

// function run by the worker threads
void workers_stuff()
{
    while (true)
    {
        wait_at_barrier();
        // ... reads from shared resource
    }
}
The problem is that the assertion assert(barrier && workers_at_barrier.load() == 2); in handle_stuff() is getting hit. This occurs very, very rarely, so something must be wrong, and I'm trying to understand exactly what and where. I'm pretty sure, though, that it has something to do with an incorrect use of std::memory_order. Any C++ atomics pro out there who can point me to the exact issue and what the proper fix would be? Thanks in advance.
This is not a memory ordering issue, just a plain race. I can reproduce it even after upgrading all the memory orderings to sequential consistency. Here is my version on godbolt, though I can only reproduce the failure locally (godbolt only runs on one core).
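The godbolt link itself did not survive the copy here, so for reference, here is a minimal sketch of what such a reproduction harness might look like. This is my own sketch, not the answerer's exact version; it assumes the question's snippet above is compiled into the same translation unit and uses the 2 worker + 2 control-plane thread counts from the question. On a multi-core machine it should eventually hit the failing assert.

// Reproduction sketch (not the answerer's godbolt version): assumes the
// question's snippet above is compiled into the same file, with 2 worker and
// 2 control-plane threads as configured in the question.
#include <thread>
#include <vector>

int main()
{
    std::vector<std::thread> threads;
    // 2 worker (data-plane) threads: read the shared resource in a loop
    for (int i = 0; i < 2; ++i)
        threads.emplace_back(workers_stuff);
    // 2 control-plane threads: repeatedly take and release the barrier
    for (int i = 0; i < 2; ++i)
        threads.emplace_back([] { while (true) handle_stuff(); });
    for (auto& t : threads)
        t.join();  // runs until the assert in handle_stuff() aborts the process
}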
The comment "wait for ALL workers to get out of the barrier!" in barrier_unlock seems to point to the problem. This loop doesn't force another control thread to wait; that other thread could take the barrier right away.

Alternatively, observing the value workers_at_barrier == 2 in barrier_lock() does not prove that both threads are now waiting at the barrier; they may have already passed it while it was previously down, but not yet gotten around to decrementing the atomic counter.

So imagine the following sequence of events. We have control threads C1, C2, and worker threads W1, W2. C1 has taken the barrier and is just entering barrier_unlock(). C2 is just entering barrier_lock(). W1 and W2 are both spinning in the while (barrier.load()) in wait_at_barrier(), and workers_at_barrier has the value 2.

C1: barrier.store(false)
W1: barrier.load(): false, spin loop exits
C2: barrier.exchange(true): returns false. Break out of the loop. Now barrier == true.
C2: assert(barrier) (passes)
C2: workers_at_barrier.load(): 2. The while loop exits immediately.
C2: assert(workers_at_barrier.load() == 2) (passes)
C2: returns from barrier_lock()
W1: --workers_at_barrier: 1
C2 in handle_stuff(): now barrier == true and workers_at_barrier == 1. The assertion fails.

I'm not sure of the best fix offhand. Perhaps barrier should have a third "draining" state, in which the control thread still owns the barrier but the workers can leave it. Only after they have done so does the control thread fully release the barrier.
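One way to read that suggestion is sketched below. This is my own interpretation, not the answerer's code: the barrier_state enum, the NUM_WORKERS constant, and the choice to leave the memory orderings at the seq_cst default (for clarity; they can likely be relaxed as in the question) are all assumptions. A control thread can only take the barrier from the open state, and the draining state keeps other control threads out until the worker counter has actually returned to zero.

// Sketch of the three-state "draining" barrier, for the question's 2-worker setup.
#include <atomic>
#include <cstdint>

enum class barrier_state : std::uint8_t { open, closed, draining };

std::atomic<barrier_state> barrier{barrier_state::open};
std::atomic_uint32_t workers_at_barrier{};
constexpr std::uint32_t NUM_WORKERS = 2;

// called by control-plane threads only!
void barrier_lock()
{
    // only an OPEN barrier may be taken; DRAINING keeps other control threads out
    auto expected = barrier_state::open;
    while (!barrier.compare_exchange_weak(expected, barrier_state::closed))
    {
        expected = barrier_state::open;
        __builtin_ia32_pause();
    }
    // the previous owner drained the counter to 0 before reopening, so this
    // really counts arrivals at *this* acquisition of the barrier
    while (workers_at_barrier.load() != NUM_WORKERS)
        __builtin_ia32_pause();
}

// called by control-plane threads only!
void barrier_unlock()
{
    // let workers leave, but keep other control threads out until they have left
    barrier.store(barrier_state::draining);
    while (workers_at_barrier.load() != 0)
        __builtin_ia32_pause();
    barrier.store(barrier_state::open);
}

// called by worker threads only!
void wait_at_barrier()
{
    if (barrier.load() != barrier_state::closed)
        return;
    ++workers_at_barrier;
    // spin only while the barrier is CLOSED; DRAINING releases the workers
    while (barrier.load() == barrier_state::closed)
        __builtin_ia32_pause();
    --workers_at_barrier;
}

With this shape, the counter value observed in barrier_lock() can only have been produced by workers that saw this acquisition's closed state, which is exactly the property the original boolean barrier could not guarantee.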