Race condition in my MPSC ring buffer

Posted 2025-02-12 10:44:55

I'm trying to build a lock-free MPSC (multi-producer, single-consumer) ring buffer for learning purposes, and I'm running into race conditions.

A description of the MPSC ring buffer:

  • It is guaranteed that poll() is never called when the buffer is empty.
  • Instead of reducing head and tail modulo the buffer size like a traditional ring buffer, it lets them increase freely and masks them with RING_BUF_SIZE-1 before using them as indices (since the buffer size is a power of 2, this still works when the counters overflow).
  • It keeps MAX_PRODUCERS-1 slots free, so that if several producers all see one free slot and proceed at the same time, they can all still place their entries.
  • It uses 32-bit counters for head and tail, so that both can be snapshotted together with a single 64-bit atomic read, without a lock.
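
To make the index arithmetic in the list concrete, here is a minimal standalone sketch (not the post's code) of free-running 32-bit counters. A slot index is the counter masked with `RING_BUF_SIZE - 1`. Occupancy is plain `uint32_t` subtraction, which wraps modulo 2^32 and gives the same value as the post's `RING_SUB` macro. The 64-bit snapshot keeps `tail` in the low half and `head` in the high half, which is the bit layout the post's union produces on a little-endian target. The size of 8 and the helper names `slot_of`, `occupancy`, `pack_snapshot`, `snapshot_tail` and `snapshot_head` are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical size for this sketch; the masking trick requires a power of two. */
#define RING_BUF_SIZE 8u
static_assert((RING_BUF_SIZE & (RING_BUF_SIZE - 1u)) == 0,
              "RING_BUF_SIZE must be a power of two");

/* Map a free-running counter to a slot index. Because 2^32 is a multiple of
 * RING_BUF_SIZE, consecutive counters map to consecutive slots even across
 * the 0xFFFFFFFF -> 0 overflow. */
static uint32_t slot_of(uint32_t counter)
{
  return counter & (RING_BUF_SIZE - 1u);
}

/* Number of occupied slots. uint32_t subtraction wraps modulo 2^32, so this
 * stays correct after tail has overflowed but head has not yet. */
static uint32_t occupancy(uint32_t tail, uint32_t head)
{
  return tail - head;
}

/* Pack both counters into one 64-bit word: tail in the low 32 bits,
 * head in the high 32 bits. */
static uint64_t pack_snapshot(uint32_t tail, uint32_t head)
{
  return ((uint64_t)head << 32) | tail;
}

/* Unpack the two halves of a snapshot. */
static uint32_t snapshot_tail(uint64_t snap) { return (uint32_t)snap; }
static uint32_t snapshot_head(uint64_t snap) { return (uint32_t)(snap >> 32); }
```

For example, with `tail` just past the overflow at 3 and `head` still at 0xFFFFFFFE, `occupancy` reports 5 entries, and `slot_of(0xFFFFFFFF)` is 7, immediately followed by slot 0 for counter 0.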

My test has a few producer threads writing a known set of values to the queue. A consumer thread polls whenever the buffer is not empty, sums everything it receives, and checks the total against the expected result. With 2 or more producers the sums are inconsistent; with 1 producer it works.
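
The sum check described above can be made stricter. A sum can hide compensating errors. If one value is read twice and another is never read, the total is unchanged whenever the two values are equal, which can happen if every producer writes the same set. Below is a minimal sketch of the expected-sum arithmetic plus an exactly-once check. It assumes a hypothetical encoding in which every pushed value is distinct. The names `encode_value`, `expected_sum` and `all_received_once` are illustrative and do not come from the post.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical encoding: producer p pushes p * per_producer + i for
 * i in [0, per_producer), so every value in the whole test is distinct. */
static uint32_t encode_value(uint32_t p, uint32_t i, uint32_t per_producer)
{
  assert(i < per_producer);
  return p * per_producer + i;
}

/* Expected consumer total: the values 0 .. total-1, summed in closed form. */
static uint64_t expected_sum(uint32_t producers, uint32_t per_producer)
{
  uint64_t total = (uint64_t)producers * per_producer;
  return total * (total - 1) / 2;
}

/* Stricter than the sum: true only if each of the `total` values was
 * received exactly once. `seen` must point to `total` zero-initialised bytes. */
static bool all_received_once(const uint32_t *received, size_t count,
                              unsigned char *seen, size_t total)
{
  if (count != total)
    return false;
  for (size_t k = 0; k < count; k++) {
    uint32_t v = received[k];
    if (v >= total || seen[v])
      return false; /* out-of-range value or a duplicate */
    seen[v] = 1;
  }
  return true;
}
```

With 2 producers pushing 3 values each, the expected sum is 0+1+...+5 = 15. A consumer that received {0, 3, 1, 4, 3, 4} would also total 15 and pass the sum check, but `all_received_once` rejects it.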

Any help would be much appreciated. Thank you!

Here is the code:

```c
#include <stddef.h>
#include <stdint.h>

struct ring_buf_entry {
  uint32_t seqn;
};

struct __attribute__((packed, aligned(8))) ring_buf {
  union {
    struct {
      volatile uint32_t tail;
      volatile uint32_t head;
    };
    volatile uint64_t snapshot;
  };
  volatile struct ring_buf_entry buf[RING_BUF_SIZE];
};

#define RING_SUB(x,y) ((x)>=(y)?((x)-(y)):((x)+(1ULL<<32)-(y)))

static void ring_buf_push(struct ring_buf* rb, uint32_t seqn)
{
  size_t pos;

  while (1) {
    // rely on aligned, packed, and no member-reordering properties
    uint64_t snapshot = __atomic_load_n(&(rb->snapshot), __ATOMIC_SEQ_CST);
    // little endian: tail is the low 32 bits, head the high 32 bits.
    uint64_t snap_head = snapshot >> 32;
    uint64_t snap_tail = snapshot & 0xffffffffULL;

    // reserve slot snap_tail by advancing tail, if enough free slots remain
    if (RING_SUB(snap_tail, snap_head) < RING_BUF_SIZE - MAX_PRODUCERS + 1) {
      uint32_t exp = snap_tail;
      if (__atomic_compare_exchange_n(&(rb->tail), &exp, snap_tail+1, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
        pos = snap_tail;
        break;
      }
    }

    asm volatile("pause\n": : :"memory");
  }

  pos &= RING_BUF_SIZE-1;

  // write the entry into the reserved slot
  rb->buf[pos].seqn = seqn;

  asm volatile("sfence\n": : :"memory");
}

static struct ring_buf_entry ring_buf_poll(struct ring_buf* rb)
{
  struct ring_buf_entry ret = rb->buf[__atomic_load_n(&(rb->head), __ATOMIC_SEQ_CST) & (RING_BUF_SIZE-1)];
  __atomic_add_fetch(&(rb->head), 1, __ATOMIC_SEQ_CST);
  return ret;
}
```
