Circular lock-free buffer

I'm in the process of designing a system which connects to one or more streams of data feeds, does some analysis on the data, and then triggers events based on the results. In a typical multi-threaded producer/consumer setup, I will have multiple producer threads putting data into a queue and multiple consumer threads reading the data, and the consumers are only interested in the latest data point plus the n points before it. The producer threads will have to block if a slow consumer cannot keep up, and of course consumer threads will block when there are no unprocessed updates. Using a typical concurrent queue with a reader/writer lock would work nicely, but the rate of data coming in could be huge, so I want to reduce my locking overhead, especially the writer locks for the producers. I think a circular lock-free buffer is what I need.

Now two questions:

  1. Is a circular lock-free buffer the answer?

  2. If so, before I roll my own, do you know of any public implementation that will fit my needs?

Any pointers on implementing a circular lock-free buffer are always welcome.

BTW, doing this in C++ on Linux.

Some additional info:

The response time is critical for my system. Ideally, the consumer threads want to see any update as soon as it comes in, because an extra 1-millisecond delay could make the system worthless, or worth a lot less.

The design idea I'm leaning toward is a semi-lock-free circular buffer where the producer thread puts data into the buffer as fast as it can; let's call the head of the buffer A. It does not block unless the buffer is full, i.e. when A meets the end-of-buffer pointer Z. Consumer threads will each hold two pointers into the circular buffer, P and Pn, where P is the thread's local buffer head and Pn is the nth item after P. Each consumer thread advances its P and Pn once it finishes processing its current P, and the end-of-buffer pointer Z is advanced to the slowest Pn. When P catches up to A, which means there are no more new updates to process, the consumer spins and busy-waits for A to advance again. If a consumer thread spins for too long, it can be put to sleep to wait on a condition variable, but I'm okay with consumers burning CPU cycles waiting for updates, because that does not increase my latency (I'll have more CPU cores than threads). Imagine a circular track where the producer runs in front of a bunch of consumers; the key is to tune the system so that the producer usually runs just a few steps ahead of the consumers, and most of these operations can be done with lock-free techniques. I understand that getting the details of the implementation right is not easy... okay, very hard, which is why I want to learn from others' mistakes before making a few of my own.
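
To make that concrete, here is a minimal sketch of the layout under simplifying assumptions: a single producer, a fixed number of consumers, array indices instead of raw pointers, and the Pn read-ahead bookkeeping omitted. All names are hypothetical.

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>

constexpr size_t kSize = 1024;        // ring capacity, a power of two
constexpr size_t kConsumers = 4;

struct SharedRing {
    int64_t slots[kSize];
    std::atomic<uint64_t> head;                 // A: next slot to be written
    std::atomic<uint64_t> cursors[kConsumers];  // one P per consumer; min(P) plays the role of Z

    SharedRing() : head(0) {
        for (auto& c : cursors) c.store(0);
    }

    // Producer: busy-wait while full, i.e. while A would lap the slowest P.
    void publish(int64_t v) {
        const uint64_t h = head.load(std::memory_order_relaxed);
        for (;;) {
            uint64_t slowest = cursors[0].load(std::memory_order_acquire);
            for (size_t i = 1; i < kConsumers; ++i)
                slowest = std::min(slowest, cursors[i].load(std::memory_order_acquire));
            if (h - slowest < kSize) break;     // room for one more slot
        }
        slots[h % kSize] = v;
        head.store(h + 1, std::memory_order_release);   // publish the slot
    }

    // Consumer `id`: busy-wait until A moves past our local P, then consume.
    int64_t consume(size_t id) {
        const uint64_t p = cursors[id].load(std::memory_order_relaxed);
        while (head.load(std::memory_order_acquire) == p) { /* spin for A */ }
        const int64_t v = slots[p % kSize];
        cursors[id].store(p + 1, std::memory_order_release);  // advance P
        return v;
    }
};

The release store on head pairs with the consumers' acquire loads, so a consumer that sees the new A also sees the data written into the slot.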

甜味拾荒者 2024-07-27 15:56:56

I've made a particular study of lock-free data structures over the last couple of years. I've read most of the papers in the field (there are only about forty or so, although only about ten or fifteen are of any real use :-)

AFAIK, a lock-free circular buffer has not been invented. The problem will be dealing with the complex condition where a reader overtakes a writer, or vice versa.

If you have not spent at least six months studying lock-free data structures, do not attempt to write one yourself. You will get it wrong and it may not be obvious to you that errors exist, until your code fails, after deployment, on new platforms.

I believe, however, there is a solution to your requirement.

You should pair a lock-free queue with a lock-free free-list.

The free-list will give you pre-allocation and so obviate the (fiscally expensive) requirement for a lock-free allocator; when the free-list is empty, you replicate the behaviour of a circular buffer by instantly dequeuing an element from the queue and using that instead.

(Of course, in a lock-based circular buffer, once the lock is obtained, obtaining an element is very quick - basically just a pointer dereference - but you won't get that in any lock-free algorithm; they often have to go well out of their way to do things; the overhead of failing a free-list pop followed by a dequeue is on a par with the amount of work any lock-free algorithm will need to be doing).

Michael and Scott developed a really good lock-free queue back in 1996. A link below will give you enough details to track down the PDF of their paper; Michael and Scott, FIFO

A lock-free free-list is the simplest lock-free algorithm and in fact I don't think I've seen an actual paper for it.
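
For illustration, here is a minimal sketch of a lock-free free-list as a Treiber stack, which I believe is the classic construction. Note that pop() as written is exposed to the ABA problem; a real implementation adds a version tag or hazard pointers.

#include <atomic>

// Sketch of a lock-free free-list (Treiber stack). pop() below ignores
// the ABA problem, which any production version must handle.
struct Node {
    Node* next;
    // ... payload ...
};

class FreeList {
    std::atomic<Node*> head{nullptr};
public:
    void push(Node* n) {
        Node* old = head.load(std::memory_order_relaxed);
        do {
            n->next = old;
        } while (!head.compare_exchange_weak(old, n,
                     std::memory_order_release, std::memory_order_relaxed));
    }

    Node* pop() {
        Node* old = head.load(std::memory_order_acquire);
        while (old && !head.compare_exchange_weak(old, old->next,
                          std::memory_order_acquire, std::memory_order_acquire))
            ;
        return old;  // nullptr means empty: fall back to dequeuing, as above
    }
};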

安稳善良 2024-07-27 15:56:56

The term of art for what you want is a lock-free queue. There's an excellent set of notes with links to code and papers by Ross Bencina. The guy whose work I trust the most is Maurice Herlihy (for Americans, he pronounces his first name like "Morris").

甚是思念 2024-07-27 15:56:56

The requirement that producers or consumers block if the buffer is empty or full suggests that you should use a normal locking data structure, with semaphores or condition variables to make the producers and consumers block until data is available. Lock-free code generally doesn't block on such conditions - it spins or abandons operations that can't be done instead of blocking using the OS. (If you can afford to wait until another thread produces or consumes data, then why is waiting on a lock for another thread to finish updating the data structure any worse?)

On (x86/x64) Linux, inter-thread synchronization using mutexes is reasonably cheap if there is no contention. Concentrate on minimizing the time that the producers and consumers need to hold their locks. Given that you've said you only care about the last N recorded data points, I think a circular buffer would do this reasonably well. However, I don't really understand how this fits in with the blocking requirement and the idea of consumers actually consuming (removing) the data they read. (Do you want consumers to only look at the last N data points without removing them? Do you want producers to not care whether consumers keep up, and just overwrite old data?)

Also, as Zan Lynx commented, you can aggregate/buffer up your data into bigger chunks when you've got lots of it coming in. You could buffer up a fixed number of points, or all the data received within a certain amount of time. This means that there will be fewer synchronization operations. It does introduce latency, though, but if you're not using real-time Linux, then you'll have to deal with that to an extent anyway.
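
For comparison, a minimal sketch of the conventional structure (hypothetical class): a bounded queue guarded by one mutex and two condition variables, with the critical sections kept as short as possible.

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Sketch: classic bounded producer/consumer queue. Producers block when
// it is full, consumers block when it is empty -- the blocking semantics
// the question asks for.
template <typename T>
class BoundedQueue {
    std::mutex m;
    std::condition_variable not_full, not_empty;
    std::deque<T> q;
    const size_t capacity;
public:
    explicit BoundedQueue(size_t cap) : capacity(cap) {}

    void push(T v) {
        std::unique_lock<std::mutex> lk(m);
        not_full.wait(lk, [&] { return q.size() < capacity; });
        q.push_back(std::move(v));
        lk.unlock();               // notify outside the lock
        not_empty.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lk(m);
        not_empty.wait(lk, [&] { return !q.empty(); });
        T v = std::move(q.front());
        q.pop_front();
        lk.unlock();
        not_full.notify_one();
        return v;
    }
};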

任性一次 2024-07-27 15:56:56

The implementation in the Boost library is worth considering. It's easy to use and fairly high performance. I wrote a test and ran it on a quad-core i7 laptop (8 threads) and got ~4M enqueue/dequeue operations a second. Another implementation not mentioned so far is the MPMC queue at http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue. I have done some simple testing with this implementation on the same laptop, with 32 producers and 32 consumers. It is, as advertised, faster than the Boost lock-free queue.

As most of the other answers state, lock-free programming is hard. Most implementations will have hard-to-detect corner cases that take a lot of testing and debugging to fix. These are typically fixed with careful placement of memory barriers in the code. You will also find proofs of correctness published in many of the academic articles. I prefer testing these implementations with a brute-force tool. Any lock-free algorithm you plan on using in production should be checked for correctness using a tool like TLA+: http://research.microsoft.com/en-us/um/people/lamport/tla/tla.html.
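
For reference, a minimal usage sketch of the Boost queue mentioned above; the compile-time capacity parameter shown here is one of the ways boost::lockfree::queue can be sized.

#include <boost/lockfree/queue.hpp>
#include <iostream>
#include <thread>

int main() {
    // MPMC lock-free queue with a fixed capacity of 1024 elements.
    boost::lockfree::queue<int, boost::lockfree::capacity<1024>> q;

    std::thread producer([&] {
        for (int i = 0; i < 100000; ++i)
            while (!q.push(i)) { }     // push fails while the queue is full
    });
    std::thread consumer([&] {
        long long sum = 0;
        for (int i = 0; i < 100000; ++i) {
            int v;
            while (!q.pop(v)) { }      // pop fails while the queue is empty
            sum += v;
        }
        std::cout << sum << '\n';
    });
    producer.join();
    consumer.join();
}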

我早已燃尽 2024-07-27 15:56:56

There is a pretty good series of articles about this on DDJ. As a sign of how difficult this stuff can be, it's a correction on an earlier article that got it wrong. Make sure you understand the mistakes before you roll your own )-;

⊕婉儿 2024-07-27 15:56:56

One useful technique to reduce contention is to hash the items into multiple queues and have each consumer dedicated to a "topic".

For the most recent N items your consumers are interested in, you don't want to lock the whole queue and iterate over it to find an item to override; just publish items in N-tuples, i.e. all N recent items. Bonus points for an implementation where the producer blocks on a full queue (when consumers can't keep up) with a timeout, updating its local tuple cache in the meantime; that way you don't put back-pressure on the data source.
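
A minimal sketch of the sharding idea, with hypothetical names; Queue stands for any default-constructible queue type, and the topic string is whatever identifies a stream.

#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Sketch: reduce contention by sharding items across per-topic queues,
// with each consumer thread dedicated to exactly one shard.
template <typename Queue>
class ShardedQueues {
    std::vector<Queue> shards;
public:
    explicit ShardedQueues(size_t n) : shards(n) {}

    // Route an item by hashing its topic key to a shard.
    Queue& shard_for(const std::string& topic) {
        return shards[std::hash<std::string>{}(topic) % shards.size()];
    }
};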

征﹌骨岁月お 2024-07-27 15:56:56

Sutter's queue is sub-optimal and he knows it. The Art of Multiprocessor Programming is a great reference, but don't trust the Java guys on memory models, period. Ross's links will get you no definite answer, because their libraries had these same problems, and so on.

Doing lock-free programming is asking for trouble, unless you want to spend a lot of time on something you are clearly over-engineering before solving the problem (judging by the description, it is the common madness of 'looking for perfection' in cache coherency). It takes years, and it leads to not solving the problem first and optimising later, a common disease.

梦行七里 2024-07-27 15:56:56

I am not an expert on hardware memory models and lock-free data structures, and I tend to avoid using them in my projects; I go with traditional locked data structures.

However, I've recently noticed this video:
Lockless SPSC queue based on ring buffer

It is based on an open-source, high-performance Java library called the LMAX Disruptor, used by a trading system: LMAX Disruptor

Based on the presentation above, you make the head and tail pointers atomic and atomically check for the condition where the head catches the tail from behind, or vice versa.

Below you can see a very basic C++11 implementation of it:

// USING SEQUENTIAL MEMORY
#include<thread>
#include<atomic>
#include <cinttypes>
using namespace std;

#define RING_BUFFER_SIZE 1024  // power of 2 for efficient %
class lockless_ring_buffer_spsc
{
    public :

        lockless_ring_buffer_spsc()
        {
            write.store(0);
            read.store(0);
        }

        bool try_push(int64_t val)
        {
            const auto current_tail = write.load();
            const auto next_tail = increment(current_tail);
            if (next_tail != read.load())
            {
                buffer[current_tail] = val;
                write.store(next_tail);
                return true;
            }

            return false;  
        }

        void push(int64_t val)
        {
            while( ! try_push(val) );
            // TODO: exponential backoff / sleep
        }

        bool try_pop(int64_t* pval)
        {
            auto currentHead = read.load();

            if (currentHead == write.load())
            {
                return false;
            }

            *pval = buffer[currentHead];
            read.store(increment(currentHead));

            return true;
        }

        int64_t pop()
        {
            int64_t ret;
            while( ! try_pop(&ret) );
            // TODO: exponential backoff / sleep
            return ret;
        }

    private :
        std::atomic<int64_t> write;
        std::atomic<int64_t> read;
        static const int64_t size = RING_BUFFER_SIZE;
        int64_t buffer[RING_BUFFER_SIZE];

        int64_t increment(int64_t n)  // int64_t parameter avoids narrowing
        {
            return (n + 1) % size;
        }
};

int main (int argc, char** argv)
{
    lockless_ring_buffer_spsc queue;

    std::thread write_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.push(i);
             }
         }  // End of lambda expression
                                                );
    std::thread read_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.pop();
             }
         }  // End of lambda expression
                                                );
    write_thread.join();
    read_thread.join();

     return 0;
}
水染的天色ゝ 2024-07-27 15:56:56

I would agree with this article and recommend against using lock-free data structures. A relatively recent paper on lock-free FIFO queues is this; search for further papers by the same author(s). There's also a PhD thesis from Chalmers regarding lock-free data structures (I lost the link). However, you did not say how large your elements are: lock-free data structures work efficiently only with word-sized items, so you'll have to dynamically allocate your elements if they're larger than a machine word (32 or 64 bits). If you dynamically allocate elements, you shift the (supposed, since you haven't profiled your program and are basically doing premature optimization) bottleneck to the memory allocator, so you need a lock-free memory allocator, e.g., Streamflow, and to integrate it with your application.

爱的故事 2024-07-27 15:56:56

This is an old thread, but since it hasn't been mentioned yet: there is a lock-free, circular, 1-producer -> 1-consumer FIFO available in the JUCE C++ framework.

https://www.juce.com/doc/classAbstractFifo#details
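
A usage sketch, as I read the JUCE documentation: AbstractFifo only manages the read/write indices, so you pair it with storage of your own and copy through the two (possibly wrapping) ranges it hands back.

#include <juce_core/juce_core.h>

// Sketch: a 1024-slot SPSC ring. AbstractFifo returns each transfer as up
// to two contiguous ranges because the region may wrap around the end.
juce::AbstractFifo fifo(1024);
float storage[1024];

void writeSamples(const float* src, int num) {
    int start1, size1, start2, size2;
    fifo.prepareToWrite(num, start1, size1, start2, size2);
    for (int i = 0; i < size1; ++i) storage[start1 + i] = src[i];
    for (int i = 0; i < size2; ++i) storage[start2 + i] = src[size1 + i];
    fifo.finishedWrite(size1 + size2);
}

void readSamples(float* dst, int num) {
    int start1, size1, start2, size2;
    fifo.prepareToRead(num, start1, size1, start2, size2);
    for (int i = 0; i < size1; ++i) dst[i] = storage[start1 + i];
    for (int i = 0; i < size2; ++i) dst[size1 + i] = storage[start2 + i];
    fifo.finishedRead(size1 + size2);
}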

感性不性感 2024-07-27 15:56:56

Although this is an old question, no one has mentioned DPDK's lockless ring buffer. It's a high-throughput ring buffer that supports multiple producers and multiple consumers. It also provides single-consumer and single-producer modes, and the ring buffer is wait-free in SPSC mode. It's written in C and supports multiple architectures.

In addition, it supports bulk and burst modes, where items can be enqueued/dequeued in batches. The design lets multiple consumers or multiple producers write to the queue at the same time by simply reserving space through moving an atomic pointer.
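
A minimal usage sketch of the rte_ring API, assuming a single-producer/single-consumer ring inside a DPDK application (rte_eal_init must have run first):

#include <rte_lcore.h>
#include <rte_ring.h>

// Sketch: a 1024-slot ring created in SP/SC mode (wait-free per the DPDK
// docs); both calls return 0 on success and a negative value otherwise.
static struct rte_ring* make_ring() {
    return rte_ring_create("events", 1024, rte_socket_id(),
                           RING_F_SP_ENQ | RING_F_SC_DEQ);
}

static void example(struct rte_ring* r, void* item) {
    while (rte_ring_enqueue(r, item) != 0) { /* ring full: back off */ }

    void* out = NULL;
    while (rte_ring_dequeue(r, &out) != 0) { /* ring empty: back off */ }
}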

心欲静而疯不止 2024-07-27 15:56:56

Some time ago, I found a nice solution to this problem. I believe it is the smallest found so far.

The repository has an example of how to use it to create N threads (readers and writers) and have them share a single seat.

I ran some benchmarks on the test example and got the following results (in millions of ops/sec):

[Figure: throughput by buffer size]

[Figure: throughput by number of threads]

Notice how the number of threads does not change the throughput.

I think this is the ultimate solution to this problem. It works, and it is incredibly fast and simple, even with hundreds of threads and a queue of a single position. It can be used as a pipeline between threads, allocating space inside the queue.

Can you break it?

最近可好 2024-07-27 15:56:56

Check out Disruptor (How to use it), which is a ring buffer that multiple threads can subscribe to.

遇见了你 2024-07-27 15:56:56

Here is how I would do it:

  • map the queue into an array
  • keep state with next-read and next-write indexes
  • keep an empty/full bit vector around

Insertion consists of using a CAS with an increment and roll-over on the next-write index. Once you have a slot, add your value and then set the empty/full bit that matches it (see the sketch after these notes).

Removal requires a check of the bit first to test for underflow, but other than that it is the same as the write, only using the read index and clearing the empty/full bit.

Be warned,

  1. I'm no expert in these things
  2. atomic ASM ops seemed very slow when I used them, so if you end up with more than a few of them, you might be faster using locks embedded inside the insert/remove functions. The theory is that a single atomic op to grab the lock, followed by (very) few non-atomic ASM ops, might be faster than the same thing done by several atomic ops. But making this work would require manual or automatic inlining, so it's all one short block of ASM.
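
A sketch of that scheme, under assumptions of mine: a fetch_add ticket stands in for the CAS-with-increment, and an array of atomic bools stands in for the empty/full bit vector. Overflow beyond what the flags catch is not handled.

#include <atomic>
#include <cstdint>

// Sketch: each insert/remove claims a slot index with fetch_add, then the
// per-slot "full" flag is used to wait for, publish, and consume the value.
constexpr uint64_t kCap = 1024;              // power of two
int64_t slots[kCap];
std::atomic<bool> full[kCap];                // the empty/full bit vector
std::atomic<uint64_t> next_write{0}, next_read{0};

void insert(int64_t v) {
    uint64_t i = next_write.fetch_add(1) & (kCap - 1);    // claim a slot
    while (full[i].load(std::memory_order_acquire)) { }   // wait until empty
    slots[i] = v;
    full[i].store(true, std::memory_order_release);       // set the bit
}

int64_t remove_one() {
    uint64_t i = next_read.fetch_add(1) & (kCap - 1);
    while (!full[i].load(std::memory_order_acquire)) { }  // underflow check
    int64_t v = slots[i];
    full[i].store(false, std::memory_order_release);      // clear the bit
    return v;
}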
追星践月 2024-07-27 15:56:56

Just for completeness: there's a well-tested lock-free circular buffer in OtlContainers, but it is written in Delphi (TOmniBaseBoundedQueue is the circular buffer and TOmniBaseBoundedStack is a bounded stack). There's also an unbounded queue in the same unit (TOmniBaseQueue). The unbounded queue is described in Dynamic lock-free queue – doing it right. The initial implementation of the bounded queue (circular buffer) was described in A lock-free queue, finally!, but the code has been updated since then.

眼睛会笑 2024-07-27 15:56:56

You may try lfqueue

It is simple to use, and it is a circular, lock-free design.

#include <assert.h>
#include <stdlib.h>
#include "lfqueue.h"   /* assumed header name, per the lfqueue repository */

int main(void) {
    int *ret;
    int *int_data;
    int i = 0;

    lfqueue_t results;
    lfqueue_init(&results);

    /** Wrap this scope in multithreaded testing **/
    int_data = (int*) malloc(sizeof(int));
    assert(int_data != NULL);
    *int_data = i++;
    /* Enqueue: retry until the element is accepted */
    while (lfqueue_enq(&results, int_data) != 1) ;

    /* Dequeue: spin until an element is available */
    while ((ret = lfqueue_deq(&results)) == NULL) ;

    // printf("%d\n", *(int*) ret);
    free(ret);
    /** End **/

    lfqueue_clear(&results);
    return 0;
}
我乃一代侩神 2024-07-27 15:56:56

There are situations where you don't need locking to prevent a race condition, especially when you have only one producer and one consumer.

Consider this paragraph from LDD3:

When carefully implemented, a circular buffer requires no locking in the absence of multiple producers or consumers. The producer is the only thread that is allowed to modify the write index and the array location it points to. As long as the writer stores a new value into the buffer before updating the write index, the reader will always see a consistent view. The reader, in turn, is the only thread that can access the read index and the value it points to. With a bit of care to ensure that the two pointers do not overrun each other, the producer and the consumer can access the buffer concurrently with no race conditions.
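
In C++11 terms, "stores a new value into the buffer before updating the write index" maps to a release store paired with an acquire load. A sketch of just that ordering (the fullness check against the read index is omitted for brevity):

#include <atomic>
#include <cstdint>

int64_t buffer[1024];
std::atomic<uint64_t> write_index(0);

// Producer: the release store guarantees the slot write becomes visible
// no later than the new index value.
void publish(int64_t v) {
    uint64_t w = write_index.load(std::memory_order_relaxed);
    buffer[w % 1024] = v;
    write_index.store(w + 1, std::memory_order_release);
}

// Consumer: the acquire load pairs with the release store, so seeing the
// new index implies seeing the slot contents. The read position is
// consumer-private in the single-consumer case.
bool try_read(uint64_t& r, int64_t& out) {
    if (r == write_index.load(std::memory_order_acquire)) return false;
    out = buffer[r % 1024];
    ++r;
    return true;
}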

掐死时间 2024-07-27 15:56:56

If you take as a prerequisite that the buffer will never become full, consider using this lock-free algorithm:

capacity must be a power of 2
buffer = new T[capacity] ~ on different cache line
mask = capacity - 1
write_index ~ on different cache line
read_index ~ on different cache line

enqueue:
    write_i = write_index.fetch_add(1) & mask
    buffer[write_i] = element ~ release store

dequeue:
    read_i = read_index.fetch_add(1) & mask
    element
    while ((element = buffer[read_i] ~ acquire load) == NULL) {
        spin loop
    }
    buffer[read_i] = NULL ~ relaxed store
    return element
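
A C++11 rendering of that pseudocode, as I read it; assumptions: the element type is a pointer so nullptr can serve as the "empty" sentinel, the never-full prerequisite holds, and the per-variable cache-line placement noted above is left to the reader (e.g. alignment/padding per member).

#include <atomic>
#include <cstddef>

// Sketch: MPMC ring that assumes the buffer never fills. fetch_add hands
// out slot tickets; a nullptr slot means "not yet produced".
template <typename T>              // T must be a pointer type
class never_full_ring {
    static const size_t capacity = 1024;    // must be a power of 2
    static const size_t mask = capacity - 1;
    std::atomic<T> buffer[capacity];
    std::atomic<size_t> write_index;
    std::atomic<size_t> read_index;
public:
    never_full_ring() : write_index(0), read_index(0) {
        for (size_t i = 0; i < capacity; ++i) buffer[i].store(nullptr);
    }

    void enqueue(T element) {
        size_t write_i = write_index.fetch_add(1) & mask;
        buffer[write_i].store(element, std::memory_order_release);
    }

    T dequeue() {
        size_t read_i = read_index.fetch_add(1) & mask;
        T element;
        while ((element = buffer[read_i].load(std::memory_order_acquire)) == nullptr) {
            // spin loop
        }
        buffer[read_i].store(nullptr, std::memory_order_relaxed);
        return element;
    }
};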