在单写入器多读取器线程中交换缓冲区

发布于 2024-12-10 12:46:24 字数 3941 浏览 1 评论 0原文

故事

有一个编写线程,定期从某个地方收集数据(实时,但这在问题中并不重要)。有很多读者正在阅读这些数据。通常的解决方案是使用两个读写器锁和两个缓冲区,如下所示:

Writer (case 1):
acquire lock 0                        
loop
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period

或者

Writer (case 2):
acquire lock 0                        
loop
    acquire other lock
    free this lock
    swap buffers
    write to current buffer
    wait for next period

问题

在这两种方法中,如果获取其他锁操作失败,则不会完成交换,并且写入器将覆盖其先前的内容数据(因为写入器是实时的,它不能等待读取器)所以在这种情况下,所有读取器都会丢失该数据帧。

但这并不是什么大问题,读者是我自己的代码,而且它们很短,所以使用双缓冲区,这个问题就解决了,如果有问题,我可以将其设置为三缓冲区(或更多)。

问题是我想最小化延迟。想象一下情况 1:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up, and again writes to buffer0

在**此时**,如果只有写入器可以在读取器完成后进行交换而不是等待下一个周期,那么理论上其他读取器可以读取 buffer0 的数据。本例中发生的情况是,仅仅因为一个读者迟到了一点,所有读者就错过了一帧数据,而这个问题本来是可以完全避免的。

情况 2 类似:

writer writes to buffer0                reader is idle
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)
|
|                                       reader starts reading buffer1
writer wakes up                         |
it can't acquire lock1                  because reader is still reading buffer1
overwrites buffer0

我尝试混合解决方案,因此编写者在写入后立即尝试交换缓冲区,如果不可能,则在下一个周期醒来后立即尝试。所以是这样的:

Writer (case 3):
acquire lock 0                        
loop
    if last buffer swap failed
        acquire other lock
        free this lock
        swap buffers
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period

现在延迟的问题仍然存在:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up
swaps buffers
writes to buffer1

再次在**此时**,所有读者都可以开始读取 buffer0,这是在 buffer0 之后的短暂延迟> 已经写好了,但是还要等到作者的下一期。

问题 问题

是,我该如何处理这个问题?如果我希望编写器在所需的周期精确执行,则需要使用 RTAI 函数等待该周期,而我不能像这样执行,

Writer (case 4):
acquire lock 0                        
loop
    write to current buffer
    loop a few times or until the buffer has been swapped
        sleep a little
        acquire other lock
        free this lock
        swap buffers
    wait for next period

这会引入抖动。因为“几次”可能会比“等待下一个周期”更长,因此作者可能会错过该周期的开始。

为了更清楚起见,这就是我想要发生的事情:

writer writes to buffer0                reader is reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      As soon as all readers finish reading,
|                                         the buffer is swapped
|                                       readers start reading buffer0
writer wakes up                         |
writes to buffer1

我发现了什么

我已经发现阅读-copy-update 据我所知,它会不断为缓冲区分配内存并释放它们,直到读者使用完它们为止,这对我来说是不可能的,原因有很多。第一,线程在内核空间和用户空间之间共享。其次,使用 RTAI,您无法在实时线程中分配内存(因为这样您的线程将调用 Linux 的系统调用,从而破坏实时性!(更不用说使用 Linux 自己的 RCU 实现是无用的,因为出于同样的原因)

我还考虑过有一个额外的线程以更高的频率尝试交换缓冲区,但这听起来不是一个好主意,首先,它本身需要与编写器同步,其次,嗯。我有很多这样的编写器-读取器在不同部分并行工作,并且为每个编写器提供一个额外的线程,对于所有编写器来说一个线程似乎对于与每个编写器的同步来说非常复杂。

The Story

There is a writer thread, periodically gathering data from somewhere (in real-time, but that doesn't matter much in the question). There are many readers then reading from these data. The usual solution for this is with two reader-writer's lock and two buffers like this:

Writer (case 1):
acquire lock 0                        
loop
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period

Or

Writer (case 2):
acquire lock 0                        
loop
    acquire other lock
    free this lock
    swap buffers
    write to current buffer
    wait for next period

The Problem

In both methods, if the acquire other lock operation fails, no swap is done and writer would overwrite its previous data (because writer is real-time, it can't wait for readers) So in this case, all readers would lose that frame of data.

This is not such a big deal though, the readers are my own code and they are short, so with double buffer, this problem is solved, and if there was a problem I could make it triple buffer (or more).

The problem is the delay that I want to minimize. Imagine case 1:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up, and again writes to buffer0

At **this point**, other readers in theory could have read data of buffer0 if only the writer could do the swap after the reader finishes instead of waiting for its next period. What happened in this case is that just because one reader was a bit late, all readers missed one frame of data, while the problem could have been totally avoided.

Case 2 is similar:

writer writes to buffer0                reader is idle
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)
|
|                                       reader starts reading buffer1
writer wakes up                         |
it can't acquire lock1                  because reader is still reading buffer1
overwrites buffer0

I tried mixing the solutions, so the writer tries swapping buffers immediately after writing, and if not possible, just after waking up in the next period. So something like this:

Writer (case 3):
acquire lock 0                        
loop
    if last buffer swap failed
        acquire other lock
        free this lock
        swap buffers
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period

Now the problem with delay still holds:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up
swaps buffers
writes to buffer1

Again at **this point**, all the readers could start reading buffer0, which is a short delay after buffer0 has been written, but instead they have to wait until the next period of the writer.

The Question

The question is, how do I handle this? If I want the writer to execute precisely at desired period, it needs to wait for the period using RTAI function and I can't do it like

Writer (case 4):
acquire lock 0                        
loop
    write to current buffer
    loop a few times or until the buffer has been swapped
        sleep a little
        acquire other lock
        free this lock
        swap buffers
    wait for next period

This introduces jitter. because the "few times" could happen to become longer than the "wait for next period" so the writer might miss the start of its period.

Just to be more clear, here's what I want to happen:

writer writes to buffer0                reader is reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      As soon as all readers finish reading,
|                                         the buffer is swapped
|                                       readers start reading buffer0
writer wakes up                         |
writes to buffer1

What I Found Already

I found read-copy-update which as far as I understood keeps allocating memory for buffers and frees them until the readers are done with them, which is impossible for me for many reasons. One, the threads are shared between kernel and user space. Second, with RTAI, you can't allocate memory in a real-time thread (because then your thread would be calling Linux's system calls and hence break the real-time-itivity! (Not to mention using Linux's own RCU implementation is useless due to the same reasons)

I also thought about having an extra thread that at a higher frequency tries swapping buffers, but that doesn't sound like such a good idea. First, it would itself need to synchronize with the writer, and second, well I have many of these writer-readers working in different parts in parallel and one extra thread for each writer just seems too much. One thread for all writers seems very complicated regarding synchronization with each writer.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

梦里南柯 2024-12-17 12:46:24

您使用什么 API 来实现读写锁?您是否有定时锁,例如 pthread_rwlock_timedwrlock ?如果是,我认为这是解决您的问题的方法,就像下面的代码一样:

void *buf[2];

void
writer ()
{
  int lock = 0, next = 1;

  write_lock (lock);
  while (1)
    {
      abs_time tm = now() + period;

      fill (buf [lock]);
      if (timed_write_lock (next, tm))
        {
          unlock (lock);
          lock = next;
          next = (next + 1) & 1;
        }
      wait_period (tm);
    }
}


void
reader ()
{
  int lock = 0;
  while (1)
    {
      reade_lock (lock);
      process (buf [lock]);
      unlock (lock);
      lock = (lock + 1) & 1;
    }
}

这里发生的情况是,对于编写者来说,等待锁定还是等待下一个周期并不重要,只要它是一定要在下一个经期到来之前醒来。绝对超时可以确保这一点。

What API are you using for reader-writer locks? Do you have a a timed lock, like pthread_rwlock_timedwrlock? If yes, I think the it's a solution to your problem, like in the following code:

void *buf[2];

void
writer ()
{
  int lock = 0, next = 1;

  write_lock (lock);
  while (1)
    {
      abs_time tm = now() + period;

      fill (buf [lock]);
      if (timed_write_lock (next, tm))
        {
          unlock (lock);
          lock = next;
          next = (next + 1) & 1;
        }
      wait_period (tm);
    }
}


void
reader ()
{
  int lock = 0;
  while (1)
    {
      reade_lock (lock);
      process (buf [lock]);
      unlock (lock);
      lock = (lock + 1) & 1;
    }
}

What happens here, is that it does not really matter for the writer whether it waits for a lock or for the next period, as long as it is sure to wake up before the next period has come. The absolute timeout ensures this.

橘寄 2024-12-17 12:46:24

这不正是三重缓冲要解决的问题吗?所以你有 3 个缓冲区,我们称它们为 write1、write2 和 read。写入线程在写入 write1 和 write2 之间交替,确保它们永远不会阻塞,并且最后一个完整帧始终可用。然后,在读取线程中,在某个适当的点(例如,在读取帧之前或之后),读取缓冲区将与可用的写入缓冲区进行翻转。

虽然这将确保写入者永远不会阻塞(只需翻转两个指针就可以非常快速地完成缓冲区翻转,甚至可能使用 CAS 原子而不是锁),但仍然存在读取器必须等待其他读取器完成的问题与翻转之前的读缓冲区。我想这可以通过一个读缓冲区池来稍微解决 RCU 式的问题,其中可以翻转可用的缓冲区。

Isn't this exactly the problem triple buffering is supposed to solve. So you have 3 buffers, lets call them write1, write2, and read. The write thread alternates between writing to write1 and write2, ensuring that they never block, and that the last complete frame is always available. Then in the read threads, at some appropriate point (say, just before or after reading a frame), the read buffer is flipped with the available write buffer.

While this would ensure that writers never block (the buffer flipping can be done very quickly just by flipping two pointers, perhaps even with a CAS atomic instead of a lock), there is still the issue of readers having to wait for other readers to finish with the read buffer before flipping. I suppose this could be solved slightly RCU-esque with a pool of read buffers where an available one can be flipped.

2024-12-17 12:46:24
  • 使用队列(先进先出链表)
  • 实时写入器总是追加(入队)到队列末尾
  • 读取器总是从队列开头移除(出队)
  • 如果队列为空,读取器将阻塞

编辑以避免动态分配

我可能会使用循环队列......
我会使用内置的 __sync 原子操作。
http://gcc.gnu.org /onlinedocs/gcc-4.1.0/gcc/Atomic-Builtins.html#Atomic-Builtins

  • 循环队列(FIFO 二维数组)
    • 例如:字节[][]数组=新字节[MAX_SIZE][BUFFER_SIZE];
    • 开始和结束索引指针
  • Writer 覆盖 Array[End][] 处的缓冲区
    • 如果 Start 完全循环结束,Writer 可以递增 Start
  • 如果Reader 从 Array[Start][] 获取缓冲区
    • 如果 Start == End,则读取器会阻塞
  • Use a Queue (FIFO linked list)
  • The real-time writer will always append (enqueue) to the end of the queue
  • The readers will always remove (dequeue) from the beginning of the queue
  • The readers will block if the queue is empty

edit to avoid dynamic allocation

I would probably use a circular queue...
I would use the built in __sync atomic operations.
http://gcc.gnu.org/onlinedocs/gcc-4.1.0/gcc/Atomic-Builtins.html#Atomic-Builtins

  • Circular queue (FIFO 2d array)
    • ex: byte[][] Array = new byte[MAX_SIZE][BUFFER_SIZE];
    • Start and End index pointers
  • Writer overwrites buffer at Array[End][]
    • Writer can increment Start if it ends up looping all the way around
  • Reader gets buffer from Array[Start][]
    • Reader blocks if Start == End
揽月 2024-12-17 12:46:24

如果您不希望写入者等待,也许它不应该获取其他人可能持有的锁。不过,我会让它执行某种同步,以确保它写入的内容确实被写出 - 通常,大多数同步调用将导致执行内存刷新或屏障指令,但详细信息将取决于内存模型你的CPU和你的线程包的实现。

我会看看是否有任何其他同步原语更适合事情,但如果推送到了紧要关头,我会让编写器锁定并解锁其他人从未使用过的锁。

然后,读者必须做好偶尔会错过一些内容的准备,并且必须能够发现自己何时错过了一些内容。我会将有效性标志和长序列计数与每个缓冲区相关联,并让编写者执行类似“清除有效性标志、递增序列计数、同步、写入缓冲区、递增序列计数、设置有效性标志、同步”之类的操作。如果读取器读取序列计数、同步、看到有效性标志为真、读出数据、同步并重新读取相同的序列计数,那么也许有希望它没有得到乱码数据。

如果您要这样做,我会进行详尽的测试。在我看来,这似乎是合理的,但它可能不适用于从编译器到内存模型的所有内容的特定实现。

另一种想法或检查此方法的一种方法是向缓冲区添加校验和并最后写入。

另请参阅有关无锁算法的搜索,例如 http://www.rossbencina.com/code/lockfree

为此,您可能需要一种方式让作者向熟睡的读者发出信号。您可以为此使用 Posix 信号量 - 例如,让读者要求编写者在特定信号量达到给定序列号或缓冲区有效时调用 sem_post()。

If you don't want the writer to wait, perhaps it shouldn't acquire a lock that anybody else might hold. I would have it perform some sort of synchronisation, though, to make sure that what it writes really is written out - typically, most synchronisation calls will cause a memory flush or barrier instruction to be executed, but the details will depend on the memory model of your cpu and the implementation of your threads package.

I would have a look to see if there is any other synchronisation primitive around that fits things better, but if push comes to shove I would have the writer lock and unlock a lock that nobody else ever uses.

Readers must then be prepared to miss things now and then, and must be able to detect when they have missed stuff. I would associate a validity flag and a long sequence count with each buffer, and have the writer do something like "clear validity flag, increment sequence count, sync, write to buffer, increment sequence count, set validity flag, sync." If the reader reads a sequence count, syncs, sees the validity flag true, reads the data out, syncs, and re-reads the same sequence count, then perhaps there is some hope that it did not get garbled data.

If you are going to do this, I would test it exhaustively. It looks plausible to me, but it might not work with your particular implementation of everything from compiler to memory model.

Another idea, or a way to check this one, is to add a checksum to your buffer and write it last of all.

See also searches on lock free algorithms such as http://www.rossbencina.com/code/lockfree

To go with this, you probably want a way for the writer to signal to sleeping readers. You might be able to use Posix semaphores for this - e.g. have the reader ask the writer to call sem_post() on a particular semaphore when it reaches a given sequence number, or when a buffer becomes valid.

猫烠⑼条掵仅有一顆心 2024-12-17 12:46:24

另一种选择是坚持锁定,但确保读取器不会长时间持有锁。读者可以通过在持有锁时不执行任何其他操作,而是从写入者的缓冲区复制数据来保持持有锁所花费的时间较短且可预测。唯一的问题是,低优先级读取器可能会在写入过程中被更高优先级任务打断,解决方法是 http://en.wikipedia.org/wiki/Priority_ceiling_protocol

鉴于此,如果写入线程具有高优先级,则每个缓冲区要完成的最坏情况工作是写入线程填充缓冲区,以及每个读取线程将数据从该缓冲区复制到另一个缓冲区。如果您可以在每个周期中负担得起,那么写入器线程和一定量的读取器数据复制将始终完成,而处理其复制的数据的读取器可能会或可能不会完成其工作。如果他们不这样做,他们就会落后,并且当他们下次抓住锁并环顾四周以查看他们想要复制哪个缓冲区时会注意到这一点。

FWIW,我阅读实时代码的经验(当需要证明错误存在时,而不是在我们的代码中)是,它是令人难以置信的和故意简单的,布局非常清晰,并且不一定比它更有效需要满足其最后期限,因此如果您负担得起的话,为了使锁定能够直接工作而进行一些显然毫无意义的数据复制可能是一笔不错的交易。

Another option is to stick with locking, but ensure that readers never hang too long holding a lock. Readers can keep the time taken holding a lock short and predictable by doing nothing else while they hold that lock but copying the data from the writer's buffer. The only problem then is that a low priority reader can be interrupted by a higher priority task halfway through a write, and the cure for that is http://en.wikipedia.org/wiki/Priority_ceiling_protocol.

Given this, if the writer thread has a high priority, the worst case work to be done per buffer is for the writer thread to fill the buffer and for each reader thread to copy the data out of that buffer to another buffer. If you can afford that in each cycle, then the writer thread and some amount of reader data copying will always be completed, while readers processing the data they have copied may or may not get their work done. If they do not, they will lag behind and will notice this when they next grab a lock and look round to see which buffer they want to copy.

FWIW, my experience with reading real time code (when required to show that the bugs are there, and not in our code) is that it is incredibly and deliberately simple-minded, very clearly laid out, and not necessarily any more efficient than it needs to be to meet its deadlines, so some apparently pointless data-copying in order to get straightforward locking to work might be a good deal, if you can afford it.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文