如何在 C 中实现循环缓冲区?
我需要一个固定大小(在创建时可以在运行时选择,而不是编译时选择)的循环缓冲区,它可以容纳任何类型的对象,并且需要非常高性能。 我认为不会出现资源争用问题,因为尽管它处于多任务嵌入式环境中,但它是一个合作环境,因此任务本身可以管理它。
我最初的想法是在缓冲区中存储一个简单的结构,其中包含类型(简单枚举/定义)和指向有效负载的空指针,但我希望它尽可能快,所以我愿意接受涉及绕过的建议堆。
实际上,我很高兴绕过任何标准库来提高原始速度 - 从我所看到的代码来看,它并没有针对 CPU 进行大量优化:看起来他们只是为诸如 strcpy( )
等等,没有手工编码的程序集。
任何代码或想法将不胜感激。 所需的操作是:
- 创建一个特定大小的缓冲区。
- 放在尾巴上。
- 从头部得到。
- 返回计数。
- 删除一个缓冲区。
I have a need for a fixed-size (selectable at run-time when creating it, not compile-time) circular buffer which can hold objects of any type and it needs to be very high performance. I don't think there will be resource contention issues since, although it's in a multi-tasking embedded environment, it's a co-operative one so the tasks themselves can manage that.
My initial thought was to store a simple struct in the buffer which would contain the type (simple enum/define) and a void pointer to the payload but I want this to be as fast as possible so I'm open to suggestions that involve bypassing the heap.
Actually I'm happy to bypass any of the standard library for raw speed - from what I've seen of the code, it's not heavily optimized for the CPU : it looks like they just compiled C code for things like strcpy()
and such, there's no hand-coded assembly.
Any code or ideas would be greatly appreciated. The operations required are:
- create a buffer with specific size.
- put at the tail.
- get from the head.
- return the count.
- delete a buffer.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(9)
最简单的解决方案是跟踪项目大小和项目数量,然后创建适当字节数的缓冲区:
The simplest solution would be to keep track of the item size and the number of items, and then create a buffer of the appropriate number of bytes:
只要环形缓冲区的长度是 2 的幂,二进制“&”的速度就快得令人难以置信。 操作将为您环绕您的索引。
对于我的应用程序,我向用户显示从麦克风获取的音频环形缓冲区中的一段音频。
我始终确保屏幕上可以显示的最大音频量远小于环形缓冲区的大小。 否则,您可能会从同一个块中读取和写入。 这可能会给你带来奇怪的显示伪像。
As long as your ring buffer's length is a power of two, the incredibly fast binary "&" operation will wrap around your index for you.
For my application, I'm displaying a segment of audio to the user from a ring buffer of audio acquired from a microphone.
I always make sure that the maximum amount of audio that can be displayed on screen is much less than the size of the ring buffer. Otherwise you might be reading and writing from the same chunk. This would likely give you weird display artifacts.
您可以在对缓冲区进行编码时枚举所需的类型,还是需要能够在运行时通过动态调用添加类型? 如果是前者,那么我会将缓冲区创建为 n 个结构的堆分配数组,其中每个结构由两个元素组成:标识数据类型的枚举标记,以及所有数据类型的联合。 您在小元素的额外存储方面所损失的部分,可以通过不必处理分配/释放以及由此产生的内存碎片来弥补。 然后,您只需要跟踪定义缓冲区的头元素和尾元素的开始索引和结束索引,并确保在递增/递减索引时计算 mod n。
Can you enumerate the types needed at the time you code up the buffer, or do you need to be able to add types at run time via dynamic calls? If the former, then I would create the buffer as a heap-allocated array of n structs, where each struct consists of two elements: an enum tag identifying the data type, and a union of all the data types. What you lose in terms of extra storage for small elements, you make up in terms of not having to deal with allocation/deallocation and the resulting memory fragmentation. Then you just need to keep track of the start and end indices that define the head and tail elements of the buffer, and make sure to compute mod n when incrementing/decrementing the indices.
首先,标题。 如果您使用位整数来保存头部和头部,则不需要模算术来包装缓冲区。 尾“指针”,并调整它们的大小,使它们完全同步。 IE:4096 填充到 12 位无符号整数中本身就是 0,没有任何干扰。 消除模运算,即使是 2 的幂,速度也几乎完全加倍。
在我的第三代 i7 Dell XPS 8500 上,使用具有默认内联功能的 Visual Studio 2010 的 C++ 编译器,对任何类型的数据元素的 4096 缓冲区进行 1000 万次填充和排空迭代需要 52 秒,而处理数据只需要 1/8192 秒。
我会 RX 重写 main() 中的测试循环,这样它们就不再控制流程 - 这是而且应该由指示缓冲区已满或空的返回值以及随之而来的中断来控制; 声明。 即:填料和滤水器应该能够相互碰撞而不会损坏或不稳定。 在某些时候,我希望对该代码进行多线程处理,此时该行为将至关重要。
QUEUE_DESC(队列描述符)和初始化函数强制此代码中的所有缓冲区都是 2 的幂。否则上述方案将不起作用。 在讨论这个主题时,请注意 QUEUE_DESC 不是硬编码的,它使用一个清单常量 (#define BITS_ELE_KNT) 进行构造。 (我假设 2 的幂在这里已经足够灵活)
为了使缓冲区大小运行时可选,我尝试了不同的方法(此处未显示),并决定对能够管理 FIFO 的 Head、Tail、EleKnt 使用 USHRT缓冲区[USHRT]。 为了避免模运算,我创建了一个掩码来 && 与 Head, Tail,但该掩码结果是 (EleKnt -1),所以只需使用它。 在安静的机器上使用 USHRTS 代替位整数可提高约 15% 的性能。 英特尔 CPU 内核始终比其总线更快,因此在繁忙的共享计算机上,打包数据结构可以让您先于其他竞争线程加载和执行。 权衡。
请注意,缓冲区的实际存储空间是使用 calloc() 在堆上分配的,并且指针位于结构的基址,因此结构和指针具有完全相同的地址。 IE; 不需要将偏移量添加到结构地址来绑定寄存器。
同样,与缓冲区服务相关的所有变量在物理上都与缓冲区相邻,绑定到同一结构中,因此编译器可以编写漂亮的汇编语言。 您必须终止内联优化才能看到任何程序集,否则它会被压垮。
为了支持任何数据类型的多态性,我使用了 memcpy() 而不是赋值。 如果您只需要灵活地在每次编译时支持一种随机变量类型,那么此代码可以完美运行。
对于多态性,您只需要知道类型及其存储要求。 DATA_DESC 描述符数组提供了一种方法来跟踪放入 QUEUE_DESC.pBuffer 中的每个数据,以便可以正确检索它。 我只需分配足够的 pBuffer 内存来保存最大数据类型的所有元素,但跟踪给定数据在 DATA_DESC.dBytes 中实际使用了多少存储空间。 另一种选择是重新发明堆管理器。
这意味着 QUEUE_DESC 的 UCHAR *pBuffer 将有一个并行伴随数组来跟踪数据类型和大小,而 pBuffer 中数据的存储位置将保持不变。 新成员可能类似于 DATA_DESC *pDataDesc,或者可能是 DATA_DESC DataDesc[2^BITS_ELE_KNT](如果您能找到一种方法来使用此类前向引用来击败编译器提交)。 Calloc() 在这些情况下总是更加灵活。
您仍然可以在 Q_Put()、Q_Get 中使用 memcpy(),但实际复制的字节数将由 DATA_DESC.dBytes 确定,而不是 QUEUE_DESC.EleBytes。 对于任何给定的 put 或 get,这些元素可能具有不同的类型/大小。
我相信这段代码满足了速度和缓冲区大小的要求,并且可以满足6种不同数据类型的要求。 我以 printf() 语句的形式留下了许多测试装置,因此您可以确信(或不确信)代码正常工作。 随机数生成器演示了该代码适用于任何随机头/尾组合。
First, the headline. You don't need modulo arithmetic to wrap the buffer if you use bit ints to hold the head & tail "pointers", and size them so they are perfectly in synch. IE: 4096 stuffed into a 12-bit unsigned int is 0 all by itself, unmolested in any way. Eliminating modulo arithmetic, even for powers of 2, doubles the speed - almost exactly.
10 million iterations of filling and draining a 4096 buffer of any type of data elements takes 52 seconds on my 3rd Gen i7 Dell XPS 8500 using Visual Studio 2010's C++ compiler with default inlining, and 1/8192nd of that to service a datum.
I'd RX rewriting the test loops in main() so they no longer control the flow - which is, and should be, controlled by the return values indicating the buffer is full or empty, and the attendant break; statements. IE: the filler and drainer should be able to bang against each other without corruption or instability. At some point I hope to multi-thread this code, whereupon that behavior will be crucial.
The QUEUE_DESC (queue descriptor) and initialization function forces all buffers in this code to be a power of 2. The above scheme will NOT work otherwise. While on the subject, note that QUEUE_DESC is not hard-coded, it uses a manifest constant (#define BITS_ELE_KNT) for its construction. (I'm assuming a power of 2 is sufficient flexibility here)
To make the buffer size run-time selectable, I tried different approaches (not shown here), and settled on using USHRTs for Head, Tail, EleKnt capable of managing a FIFO buffer[USHRT]. To avoid modulo arithmetic I created a mask to && with Head, Tail, but that mask turns out to be (EleKnt -1), so just use that. Using USHRTS instead of bit ints increased performance ~ 15% on a quiet machine. Intel CPU cores have always been faster than their buses, so on a busy, shared machine, packing your data structures gets you loaded and executing ahead of other, competing threads. Trade-offs.
Note the actual storage for the buffer is allocated on the heap with calloc(), and the pointer is at the base of the struct, so the struct and the pointer have EXACTLY the same address. IE; no offset required to be added to the struct address to tie up registers.
In that same vein, all of the variables attendant with servicing the buffer are physically adjacent to the buffer, bound into the same struct, so the compiler can make beautiful assembly language. You'll have to kill the inline optimization to see any assembly, because otherwise it gets crushed into oblivion.
To support the polymorphism of any data type, I've used memcpy() instead of assignments. If you only need the flexibility to support one random variable type per compile, then this code works perfectly.
For polymorphism, you just need to know the type and it's storage requirement. The DATA_DESC array of descriptors provides a way to keep track of each datum that gets put in QUEUE_DESC.pBuffer so it can be retrieved properly. I'd just allocate enough pBuffer memory to hold all of the elements of the largest data type, but keep track of how much of that storage a given datum is actually using in DATA_DESC.dBytes. The alternative is to reinvent a heap manager.
This means QUEUE_DESC's UCHAR *pBuffer would have a parallel companion array to keep track of data type, and size, while a datum's storage location in pBuffer would remain just as it is now. The new member would be something like DATA_DESC *pDataDesc, or, perhaps, DATA_DESC DataDesc[2^BITS_ELE_KNT] if you can find a way to beat your compiler into submission with such a forward reference. Calloc() is always more flexible in these situations.
You'd still memcpy() in Q_Put(),Q_Get, but the number of bytes actually copied would be determined by DATA_DESC.dBytes, not QUEUE_DESC.EleBytes. The elements are potentially all of different types/sizes for any given put or get.
I believe this code satisfies the speed and buffer size requirements, and can be made to satisfy the requirement for 6 different data types. I've left the many test fixtures in, in the form of printf() statements, so you can satisfy yourself (or not) that the code works properly. The random number generator demonstrates that the code works for any random head/tail combo.
这是一个简单的 C 解决方案。假设每个函数的中断都被关闭。
无多态性& 东西,只是常识。
Here is a simple solution in C. Assume interrupts are turned off for each function.
No polymorphism & stuff, just common sense.
一个简单的实现可能包括:
每次写入数据时,都会前进写指针并递增计数器。 读取数据时,增加读取指针并减少计数器。 如果任一指针到达 n,则将其设置为零。
if counter = n 则无法写入。 如果 counter = 0,则无法读取。
A simple implementation could consist of:
Every time you write data, you advance the write pointer and increment the counter. When you read data, you increase the read pointer and decrement the counter. If either pointer reaches n, set it to zero.
You can't write if counter = n. You can't read if counter = 0.
C 风格,简单的整数环形缓冲区。 首先使用 init 而不是使用 put 和 get。 如果缓冲区不包含任何数据,则返回“0”零。
C style, simple ring buffer for integers. First use init than use put and get. If buffer does not contain any data it returns "0" zero.
@Adam Rosenfield 的解决方案虽然是正确的,但可以使用更轻量级的
circular_buffer
结构来实现不涉及count
和capacity
。该结构只能保存以下 4 个指针:
buffer
:指向内存中缓冲区的开头。buffer_end
:指向内存中缓冲区的末尾。head
:指向存储数据的末尾。tail
:指向存储数据的开始。我们可以保留 sz 属性以允许存储单元的参数化。
count
和capacity
值都应该可以使用上述指针导出。容量
capacity
很简单,因为它可以通过将buffer_end
指针和buffer
指针之间的距离除以存储单位来得出< code>sz (下面的片段是伪代码):Count
对于计数,事情会变得有点复杂。 例如,在
head
和tail
指向同一位置的情况下,无法确定缓冲区是空还是满。为了解决这个问题,缓冲区应该为附加元素分配内存。 例如,如果我们的循环缓冲区所需的容量是
10 * sz
,那么我们需要分配11 * sz
。容量公式将变为(下面的片段是伪代码):
这个额外的元素语义允许我们构造评估缓冲区是空还是满的条件。
空状态条件
为了使缓冲区为空,
head
指针指向与tail
指针相同的位置:如果以上计算结果为 true,则缓冲区为空。
满状态条件
为了使缓冲区满,
head
指针应位于tail
指针后面 1 个元素。 因此,从head
位置跳转到tail
位置所需覆盖的空间应等于1 * sz
。if
tail
大于head
:如果以上计算结果为 true,则缓冲区已满。
如果
head
大于tail
:buffer_end - head
返回从head
跳转到末尾的空间缓冲区。tail - buffer
返回从缓冲区开头跳转到“tail”所需的空间。head
跳转到tail
所需的空间1 * sz
如果以上计算结果为 true,则缓冲区已满。
在实践中
修改@Adam Rosenfield以使用上面的circular_buffer结构:
@Adam Rosenfield's solution, although correct, could be implemented with a more lightweight
circular_buffer
structure that does not involvecount
andcapacity
.The structure could only hold the following 4 pointers:
buffer
: Points to the start of the buffer in memory.buffer_end
: Points to the end of the buffer in memory.head
: Points to the end of stored data.tail
: Points to the start of stored data.We could keep the
sz
attribute to allow the parametrisation of the unit of storage.Both the
count
and thecapacity
values should be derive-able using the above pointers.Capacity
capacity
is straight forward, as it can be derived by dividing the distance between thebuffer_end
pointer and thebuffer
pointer by the unit of storagesz
(snippet below is pseudocode):Count
For count though, things get a bit more complicated. For example, there is no way to determine whether the buffer is empty or full, in the scenario of
head
andtail
pointing to the same location.To tackle that, the buffer should allocate memory for an additional element. For example, if the desired capacity of our circular buffer is
10 * sz
, then we need to allocate11 * sz
.Capacity formula will then become (snippet below is pseudocode):
This extra element semantic allows us to construct conditions that evaluate whether the buffer is empty or full.
Empty state conditions
In order for the buffer to be empty, the
head
pointer points to the same location as thetail
pointer:If the above evaluates to true, the buffer is empty.
Full state conditions
In order for the buffer to be full, the
head
pointer should be 1 element behind thetail
pointer. Thus, the space needed to cover in order to jump from thehead
location to thetail
location should be equal to1 * sz
.if
tail
is larger thanhead
:If the above evaluates to true, the buffer is full.
if
head
is larger thantail
:buffer_end - head
returns the space to jump from thehead
to the end of the buffer.tail - buffer
returns the space needed to jump from the start of the buffer to the `tail.head
to thetail
1 * sz
If the above evaluates to true, the buffer is full.
In practice
Modifying @Adam Rosenfield's to use the above
circular_buffer
structure:扩展 adam-rosenfield 的解决方案,我认为以下内容适用于多线程单生产者 - 单消费者场景。
Extending adam-rosenfield's solution, i think the following will work for multithreaded single producer - single consumer scenario.