寻找无锁RT安全的单读单写结构

发布于 2024-08-23 03:20:06 字数 587 浏览 5 评论 0原文

我正在寻找符合这些要求的无锁设计

  • 单个写入器写入结构,单个读取器从中读取结构(该结构已经存在,并且对于同时读/写是安全的),
  • 但在某些时候,该结构需要由编写者更改,然后初始化、切换并写入一个新的结构(相同类型但具有新内容),
  • 并且在下次读取时,它切换到这个新结构(如果写入者多次切换到新的无锁结构,则读取器会丢弃这些结构,忽略他们的数据)。
  • 这些结构必须被重用,即在写/读/切换操作期间不允许堆内存分配/释放,以用于RT目的

我目前已经实现了一个包含这些结构的多个实例的环形缓冲区;但是这个实现的问题是,当编写者使用了环形缓冲区中存在的所有结构时,没有更多的地方可以更改结构......但是环形缓冲区的其余部分包含一些不必读取的数据读者可以使用,但作者不能重复使用。因此,环形缓冲区不适合此目的。

任何想法(名称或伪实现)无锁设计?感谢您考虑这个问题。

I'm looking for a lock-free design conforming to these requisites:

  • a single writer writes into a structure and a single reader reads from this structure (this structure exists already and is safe for simultaneous read/write)
  • but at some time, the structure needs to be changed by the writer, which then initialises, switches and writes into a new structure (of the same type but with new content)
  • and at the next time the reader reads, it switches to this new structure (if the writer multiply switches to a new lock-free structure, the reader discards these structures, ignoring their data).
  • The structures must be reused, i.e. no heap memory allocation/free is allowed during write/read/switch operation, for RT purposes.

I have currently implemented a ringbuffer containing multiple instances of these structures; but this implementation suffers from the fact that when the writer has used all the structures present in the ringbuffer, there is no more place to change from structure... But the rest of the ringbuffer contains some data which don't have to be read by the reader but can't be re-used by the writer. As a consequence, the ringbuffer does not fit this purpose.

Any idea (name or pseudo-implementation) of a lock-free design? Thanks for having considered this problem.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

心凉怎暖 2024-08-30 03:20:06

这是一个。关键是存在三个缓冲区,并且读取器保留正在读取的缓冲区。写入器写入其他两个缓冲区之一。碰撞的风险很小。另外,这还可以扩展。只需使您的成员数组比读取者数量加上写入者数量长一个元素即可。

class RingBuffer
{
  RingBuffer():lastFullWrite(0)
  { 
    //Initialize the elements of dataBeingRead to false
    for(unsigned int i=0; i<DATA_COUNT; i++)
    {
      dataBeingRead[i] = false;
    } 
  }

  Data read()
  {
    // You may want to check to make sure write has been called once here
    // to prevent read from grabbing junk data. Else, initialize the elements
    // of dataArray to something valid.
    unsigned int indexToRead = lastFullWriteIndex;
    Data dataCopy;
    dataBeingRead[indexToRead] = true;
    dataCopy = dataArray[indexToRead];
    dataBeingRead[indexToRead] = false;
    return dataCopy;
  }

  void write( const Data& dataArg )
  {
    unsigned int writeIndex(0);

    //Search for an unused piece of data.
    // It's O(n), but plenty fast enough for small arrays.
    while( true == dataBeingRead[writeIndex] && writeIndex < DATA_COUNT )
    {
      writeIndex++;
    }  

    dataArray[writeIndex] = dataArg;

    lastFullWrite = &dataArray[writeIndex];
  }

private:
  static const unsigned int DATA_COUNT;
  unsigned int lastFullWrite;
  Data dataArray[DATA_COUNT];
  bool dataBeingRead[DATA_COUNT];
};

注意:这里写的方式是有两个副本来读取您的数据。如果通过引用参数将数据从读取函数中传递出来,则可以将其缩减为一份副本。

Here's one. The keys are that there are three buffers and the reader reserves the buffer it is reading from. The writer writes to one of the other two buffers. The risk of collision is minimal. Plus, this expands. Just make your member arrays one element longer than the number of readers plus the number of writers.

class RingBuffer
{
  RingBuffer():lastFullWrite(0)
  { 
    //Initialize the elements of dataBeingRead to false
    for(unsigned int i=0; i<DATA_COUNT; i++)
    {
      dataBeingRead[i] = false;
    } 
  }

  Data read()
  {
    // You may want to check to make sure write has been called once here
    // to prevent read from grabbing junk data. Else, initialize the elements
    // of dataArray to something valid.
    unsigned int indexToRead = lastFullWriteIndex;
    Data dataCopy;
    dataBeingRead[indexToRead] = true;
    dataCopy = dataArray[indexToRead];
    dataBeingRead[indexToRead] = false;
    return dataCopy;
  }

  void write( const Data& dataArg )
  {
    unsigned int writeIndex(0);

    //Search for an unused piece of data.
    // It's O(n), but plenty fast enough for small arrays.
    while( true == dataBeingRead[writeIndex] && writeIndex < DATA_COUNT )
    {
      writeIndex++;
    }  

    dataArray[writeIndex] = dataArg;

    lastFullWrite = &dataArray[writeIndex];
  }

private:
  static const unsigned int DATA_COUNT;
  unsigned int lastFullWrite;
  Data dataArray[DATA_COUNT];
  bool dataBeingRead[DATA_COUNT];
};

Note: The way it's written here, there are two copies to read your data. If you pass your data out of the read function through a reference argument, you can cut that down to one copy.

故事还在继续 2024-08-30 03:20:06

你走在正确的轨道上。

线程/进程/处理器之间固定消息的无锁通信
替代文本

如果存在一个生产者和一个消费者,固定大小环形缓冲区可用于线程、进程或处理器之间的无锁通信。需要执行的一些检查:

head 变量仅由生产者写入(作为写入后的原子操作)

tail 变量仅由消费者写入(作为读取后的原子操作)

:引入大小变量或缓冲区满/空标志;这些通常是由生产者和消费者共同编写的,因此会给您带来问题。

我通常为此目的使用环形缓冲区。我学到的最重要的教训是,环形缓冲区永远不能包含多个元素。这样,headtail 变量分别由生产者和消费者写入。

大/可变尺寸块的扩展
要在实时环境中使用缓冲区,您可以使用内存池(通常在实时操作系统中以优化形式提供)或将分配与使用分离。我相信后者符合这个问题。

扩展队列

如果您需要交换大块,我建议使用带有缓冲区块的池,并使用队列将指针传递给缓冲区。因此,使用带有缓冲区指针的第三个队列。这样,分配可以在应用程序(后台)中完成,并且您的实时部分可以访问可变数量的内存。

应用

while (blockQueue.full != true)
{
    buf = allocate block of memory from heap or buffer pool
    msg = { .... , buf };
    blockQueue.Put(msg)
}

Producer:
   pBuf = blockQueue.Get()
   pQueue.Put()

Consumer
   if (pQueue.Empty == false)
   {
      msg=pQueue.Get()
      // use info in msg, with buf pointer
      // optionally indicate that buf is no longer used
   }

You're on the right track.

Lock free communication of fixed messages between threads/processes/processors
alt text

fixed size ring buffers can be used in lock free communications between threads, processes or processors if there is one producer and one consumer. Some checks to perform:

head variable is written only by producer (as an atomic action after writing)

tail variable is written only by consumer (as an atomic action after reading)

Pitfall: introduction of a size variable or buffer full/empty flag; these are typically written by both producer and consumer and hence will give you an issue.

I generally use ring buffers for this purpoee. Most important lesson I've learned is that a ring buffer of can never contain more than elements. This way a head and tail variable are written by producer respectively consumer.

Extension for large/variable size blocks
To use buffers in a real time environment, you can either use memory pools (often available in optimized form in real time operating systems) or decouple allocation from usage. The latter fits to the question, I believe.

extended queue

If you need to exchange large blocks, I suggest to use a pool with buffer blocks and communicate pointers to buffers using a queue. So use a 3rd queue with buffer pointers. This way the allocates can be done in application (background) and you real time portion has access to a variable amount of memory.

Application

while (blockQueue.full != true)
{
    buf = allocate block of memory from heap or buffer pool
    msg = { .... , buf };
    blockQueue.Put(msg)
}

Producer:
   pBuf = blockQueue.Get()
   pQueue.Put()

Consumer
   if (pQueue.Empty == false)
   {
      msg=pQueue.Get()
      // use info in msg, with buf pointer
      // optionally indicate that buf is no longer used
   }
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文