C /C++无锁(或非阻塞)环形缓冲区覆盖最旧的数据?

发布于 2024-10-07 02:57:18 字数 1421 浏览 8 评论 0原文

我正在尝试找到一种方法,以无锁或非阻塞的方式为单个消费者/单个消费者创建环形缓冲区,该缓冲区将覆盖缓冲区中最旧的数据。我读过很多无锁算法,当缓冲区已满时“返回 false”时,这些算法就会起作用——即,不添加;但我什至找不到伪代码来说明当您需要覆盖最旧的数据时如何执行此操作。

我正在使用GCC 4.1.2(工作中的限制,我无法升级版本...)并且我有Boost库,过去我制作了自己的Atomic< T>变量类型非常接近即将到来的规范(它并不完美,但它是线程安全的并且可以满足我的需要)。

当我思考时,我认为使用这些原子确实应该解决这个问题。关于我的想法的一些粗略的伪代码:

template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;

unsigned int getNextIndex(unsigned int i)
{
 return (i + 1 ) % size;
}

public:
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 }
~RingBuffer() { //delete data }
void produce(const T& t)
{
 if(writeIndex == getNextIndex(readIndex)) //1
 {
  readIndex = getNextIndex(readIndex); //2
  }
  buf[writeIndex] = t;
  writeIndex = getNextIndex(writeIndex);  //3
}

bool consume(T& t)
{
  if(readIndex == writeIndex)  //4
   return false;
  t = buf[readIndex];  
  readIndex = getNexIndex(readIndex);  //5
  return true;
}

};

据我所知,这里不存在死锁情况,因此我们可以避免这种情况(如果我上面的实现即使在伪代码级别上也是错误的,请提出建设性批评总是受到赞赏)。 然而,我能找到的最大竞争条件是:

假设缓冲区已满。即writeIndex +1 = readIndex; (1) 发生,就像调用consume 一样。这是真的 (4) 为 false,因此我们从缓冲区中读取 (5) 发生,并且 readIndex 提前一位(因此实际上缓冲区中有空间 (2) 发生,再次推进 readIndex,从而丢失该值。

基本上,这是一个经典问题,作者必须修改读者,从而导致竞争条件。如果每次访问它时都没有实际阻止整个列表,我想不出一种方法来防止这种情况发生。我缺少什么?

I'm trying to find a way to make a Lock Free OR Non-blocking way to make a Ring Buffer for single consumer / single consumer that will over-write the oldest data int the buffer. I've read a lot of lock-free algorithms that work when you "return false" if the buffer is full--ie, don't add; but I can't find even pseudo-code that talks about how to do it when you need to overwrite the oldest data.

I am using GCC 4.1.2 (restriction at work, i can't upgrade the version...) and I have the Boost libraries, and in the past I made my own Atomic< T > variable type that follows pretty closely to the upcomming specification (its not perfect, but it is thread-safe and does what i need).

When I thought about it, I figured using these atomics should really take care of the problem. some rough psuedo-code as to what i was thinking:

template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;

unsigned int getNextIndex(unsigned int i)
{
 return (i + 1 ) % size;
}

public:
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 }
~RingBuffer() { //delete data }
void produce(const T& t)
{
 if(writeIndex == getNextIndex(readIndex)) //1
 {
  readIndex = getNextIndex(readIndex); //2
  }
  buf[writeIndex] = t;
  writeIndex = getNextIndex(writeIndex);  //3
}

bool consume(T& t)
{
  if(readIndex == writeIndex)  //4
   return false;
  t = buf[readIndex];  
  readIndex = getNexIndex(readIndex);  //5
  return true;
}

};

As far as I can tell, there is no deadlock situations here, so we're safe from that (If my implementation above is wrong even on its pseudo-code leve, constructive criticism is always appreciated).
However,the BIG race condition I can find is:

lets assume the buffer is full. that is, writeIndex +1 = readIndex;
(1) occurs, just as consume is being called. and is true
(4) is false, so we move to read from the buffer
(5) occurs, and the readIndex is advanced one (so there is, in fact, space in the buffer
(2) occurs, advancing readIndex AGAIN, thus LOSING the value.

Basically, its a classic problem of the writter must modify the reader, causing a race condition. Without actually blocking the entire list everytime I access it, I can't think of a way to prevent this from happening. What am I missing??

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

等风也等你 2024-10-14 02:57:18
  1. 从具有适当进度保证的单个生产者/多个消费者队列开始。
  2. 如果队列已满并且推送失败,则弹出一个值。那么就会有空间来推动新的价值。
  1. Start with a single producer/multiple consumer queue with appropriate progress guarantees.
  2. If the queue is full and the push would fail, pop one value. Then there will be space to push the new value.
ˇ宁静的妩媚 2024-10-14 02:57:18

我错过了什么?

很多:

  • 假设你在被生产者覆盖时消费了 - 你如何检测/处理它?
    • 许多选项 - 例如do {复制值;使用修改序列号等检查副本是否完整性。 } while (corrupt)
  • 使用原子序数的 } while (corrupt) 是不够的 - 您还需要使用 CAS 样式的循环来影响索引增量(尽管我确实假设您知道这一点,因为您说您已经对此进行了广泛的阅读)
  • 内存障碍

但是,让我们将其记为低于您的伪代码级别,并考虑您的明确问题:

  • 点(5)将实际上需要CAS操作。如果在复制(可能损坏的)t 之前,在 consume() 上正确采样/复制了 readIndex,那么如果 CAS 指令已经通过制片人。无需进行通常的重新采样和重试 CAS,只需继续即可。

What am I missing??

Lots:

  • say you consume a t while it's being overwritten by the producer - how're you detecting/handling that?
    • many options - e.g. do { copy the value out; check copy has integrity using modification sequence num etc. } while (corrupt)
  • using atomic numbers isn't enough - you also need to use CAS-style loops to affect the index increments (though I do assume you know that, given you say you've read extensively on this already)
  • memory barriers

But, let's write that off as being below your pseudo-code level, and consider your explicit question:

  • point (5) will actually require a CAS operation. If the readIndex was correctly sampled/copied atop consume() - before the (possibly corrupt) t was copied - then the CAS instruction will fail if it's already been incremented by the producer. Instead of the usual resample and retry CAS, just continue.
爱要勇敢去追 2024-10-14 02:57:18

这是我最近创建的原子变量循环缓冲区的代码。我已将其修改为“覆盖”数据而不是返回 false。
免责声明 - 尚未经过生产级测试。

    template<int capacity, int gap, typename T> class nonblockigcircular {
  /*
   * capacity - size of cicular buffer
   * gap - minimum safety distance between head and tail to start insertion operation
   *       generally gap should exceed number of threads attempting insertion concurrently 
   *       capacity should be gap plus desired buffer size 
   * T   - is a data type for buffer to keep
   */
  volatile T buf[capacity];  // buffer

  std::atomic<int> t, h, ph, wh; 
  /* t to h data available for reading
   * h to ph - zone where data is likely written but h is not updated yet
   *   to make sure data is written check if ph==wh 
   * ph to wh - zone where data changes in progress 
   */

  bool pop(T &pwk) {
    int td, tnd;

    do {
      int hd=h.load()%capacity;
      td=t.load()%capacity;
      if(hd==td) return false;
      tnd=(td+1)%capacity;
    } while(!t.compare_exchange_weak(td, tnd));

    pwk=buf[td];
    return true;
  }


  const int  count() {
    return ( h.load()+capacity-t.load() ) % capacity;
    }

  bool push(const T &pwk) {
    const int tt=t.load();
    int hd=h.load();

    if(  capacity - (hd+capacity-tt) % capacity < gap) {
       // Buffer is too full to insert
       // return false; 
       // or delete last record as below
       int nt=t.fetch_add(1);
       if(nt==capacity-1) t.fetch_sub(capacity);
       }


    int nwh=wh.fetch_add(1);
    if(nwh==capacity-1) wh.fetch_sub(capacity);

    buf[nwh%capacity]=pwk;

    int nph=ph.fetch_add(1);
    if(nph==capacity-1) ph.fetch_sub(capacity);

    if(nwh==nph) {
      int ohd=hd;
      while(! h.compare_exchange_weak(hd, nwh) ) {
        hd=h.load();
        if( (ohd+capacity-hd) % capacity > (ohd+capacity-nwh) % capacity ) break;
      }
    }
    return true;
  }

};

Here is a code of circular buffer on atomic variables I have recently created. I have modified it to "override" data instead of returning false.
Disclaimer - it is not production grade tested yet.

    template<int capacity, int gap, typename T> class nonblockigcircular {
  /*
   * capacity - size of cicular buffer
   * gap - minimum safety distance between head and tail to start insertion operation
   *       generally gap should exceed number of threads attempting insertion concurrently 
   *       capacity should be gap plus desired buffer size 
   * T   - is a data type for buffer to keep
   */
  volatile T buf[capacity];  // buffer

  std::atomic<int> t, h, ph, wh; 
  /* t to h data available for reading
   * h to ph - zone where data is likely written but h is not updated yet
   *   to make sure data is written check if ph==wh 
   * ph to wh - zone where data changes in progress 
   */

  bool pop(T &pwk) {
    int td, tnd;

    do {
      int hd=h.load()%capacity;
      td=t.load()%capacity;
      if(hd==td) return false;
      tnd=(td+1)%capacity;
    } while(!t.compare_exchange_weak(td, tnd));

    pwk=buf[td];
    return true;
  }


  const int  count() {
    return ( h.load()+capacity-t.load() ) % capacity;
    }

  bool push(const T &pwk) {
    const int tt=t.load();
    int hd=h.load();

    if(  capacity - (hd+capacity-tt) % capacity < gap) {
       // Buffer is too full to insert
       // return false; 
       // or delete last record as below
       int nt=t.fetch_add(1);
       if(nt==capacity-1) t.fetch_sub(capacity);
       }


    int nwh=wh.fetch_add(1);
    if(nwh==capacity-1) wh.fetch_sub(capacity);

    buf[nwh%capacity]=pwk;

    int nph=ph.fetch_add(1);
    if(nph==capacity-1) ph.fetch_sub(capacity);

    if(nwh==nph) {
      int ohd=hd;
      while(! h.compare_exchange_weak(hd, nwh) ) {
        hd=h.load();
        if( (ohd+capacity-hd) % capacity > (ohd+capacity-nwh) % capacity ) break;
      }
    }
    return true;
  }

};
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文