在我的MPSC环缓冲区中有种族条件
我试图为学习目的构建一个无锁的环形缓冲区,并陷入比赛条件。
MPSC环缓冲区的描述:
- 可以保证,当缓冲区为空时,请勿调用Poll()。
- 它不是像传统的环缓冲区那样模块的头部和尾巴,而是让它们线性地进行,并且在使用它们之前(由于缓冲区的大小为2的功率,因此可以与溢出在一起)。
- 我们将Max_producers-1插槽保持在队列中,以便如果多个生产商来查看一个插槽并继续进行,他们都可以放置他们的条目。
- 它使用32位的头和尾巴,以便可以用64位原子读取而无需锁定。
我的测试涉及几个线程,将一些已知值集的值编写到队列中,并且消费者线程进行了轮询(当缓冲区不为空时)并总结所有内容,并验证正确的结果。有2个或以上的生产商,我得到了不一致的款项(并且有1个生产商,它起作用)。
任何帮助都将不胜感激。谢谢你!
这是代码:
struct ring_buf_entry {
uint32_t seqn;
};
struct __attribute__((packed, aligned(8))) ring_buf {
union {
struct {
volatile uint32_t tail;
volatile uint32_t head;
};
volatile uint64_t snapshot;
};
volatile struct ring_buf_entry buf[RING_BUF_SIZE];
};
#define RING_SUB(x,y) ((x)>=(y)?((x)-(y)):((x)+(1ULL<<32)-(y)))
static void ring_buf_push(struct ring_buf* rb, uint32_t seqn)
{
size_t pos;
while (1) {
// rely on aligned, packed, and no member-reordering properties
uint64_t snapshot = __atomic_load_n(&(rb->snapshot), __ATOMIC_SEQ_CST);
// little endian.
uint64_t snap_head = snapshot >> 32;
uint64_t snap_tail = snapshot & 0xffffffffULL;
if (RING_SUB(snap_tail, snap_head) < RING_BUF_SIZE - MAX_PRODUCERS + 1) {
uint32_t exp = snap_tail;
if (__atomic_compare_exchange_n(&(rb->tail), &exp, snap_tail+1, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
pos = snap_tail;
break;
}
}
asm volatile("pause\n": : :"memory");
}
pos &= RING_BUF_SIZE-1;
rb->buf[pos].seqn = seqn;
asm volatile("sfence\n": : :"memory");
}
static struct ring_buf_entry ring_buf_poll(struct ring_buf* rb)
{
struct ring_buf_entry ret = rb->buf[__atomic_load_n(&(rb->head), __ATOMIC_SEQ_CST) & (RING_BUF_SIZE-1)];
__atomic_add_fetch(&(rb->head), 1, __ATOMIC_SEQ_CST);
return ret;
}
I was trying to build a MPSC lock-free ring buffer for learning purpose, and am running into race conditions.
A description of the MPSC ring buffer:
- It is guaranteed that poll() is never called when the buffer is empty.
- Instead of mod'ing head and tail like a traditional ring buffer, it lets them proceed linearly, and AND's them before using them (since the buffer size is a power of 2, this works ok with overflow).
- We keep MAX_PRODUCERS-1 slots open in the queue so that if multiple producers come and see one slot is available and proceed, they can all place their entries.
- It uses 32-bit quantities for head and tail, so that it can snapshot them with a 64-bit atomic read without a lock.
My test involves a couple of threads writing some known set of values to the queue, and a consumer thread polling (when the buffer is not empty) and summing all, and verifying the correct result is obtained. With 2 or more producers, I get inconsistent sums (and with 1 producer, it works).
Any help would be much appreciated. Thank you!
Here is the code:
struct ring_buf_entry {
uint32_t seqn;
};
struct __attribute__((packed, aligned(8))) ring_buf {
union {
struct {
volatile uint32_t tail;
volatile uint32_t head;
};
volatile uint64_t snapshot;
};
volatile struct ring_buf_entry buf[RING_BUF_SIZE];
};
#define RING_SUB(x,y) ((x)>=(y)?((x)-(y)):((x)+(1ULL<<32)-(y)))
static void ring_buf_push(struct ring_buf* rb, uint32_t seqn)
{
size_t pos;
while (1) {
// rely on aligned, packed, and no member-reordering properties
uint64_t snapshot = __atomic_load_n(&(rb->snapshot), __ATOMIC_SEQ_CST);
// little endian.
uint64_t snap_head = snapshot >> 32;
uint64_t snap_tail = snapshot & 0xffffffffULL;
if (RING_SUB(snap_tail, snap_head) < RING_BUF_SIZE - MAX_PRODUCERS + 1) {
uint32_t exp = snap_tail;
if (__atomic_compare_exchange_n(&(rb->tail), &exp, snap_tail+1, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
pos = snap_tail;
break;
}
}
asm volatile("pause\n": : :"memory");
}
pos &= RING_BUF_SIZE-1;
rb->buf[pos].seqn = seqn;
asm volatile("sfence\n": : :"memory");
}
static struct ring_buf_entry ring_buf_poll(struct ring_buf* rb)
{
struct ring_buf_entry ret = rb->buf[__atomic_load_n(&(rb->head), __ATOMIC_SEQ_CST) & (RING_BUF_SIZE-1)];
__atomic_add_fetch(&(rb->head), 1, __ATOMIC_SEQ_CST);
return ret;
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论