多生产者多消费者无锁(甚至无等待)队列

发布于 2024-11-08 23:07:45 字数 99 浏览 6 评论 0原文

我正在寻找有关如何将 MP/MC 队列编写为无锁甚至无等待的文档。我正在使用.Net 4.0。发现了很多C++代码,但我对内存模型不是很熟悉,所以移植到C#时很有可能会引入一些bug。

I'm searching for documentation on how to write MP/MC queue to be lock-free or even wait-free. I'm using .Net 4.0. Found a lot of C++ code, but I'm not very familiar with memory models, so there is a big chance I will introduce some bugs while porting to C#.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

把时间冻结 2024-11-15 23:07:45

作为一个可供考虑的选项,有一种有界算法多生产者多消费者队列,作者:Dmitry Vyukov。我已将该算法移植到 .NET,您可以在 github 上找到源代码。速度非常快。

入队算法:

public bool TryEnqueue(object item)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var pos = _enqueuePos; // fetch the current position where to enqueue the item
        var index = pos & _bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence wasn't touched by other producers
        // and we can increment the enqueue position
        if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
        {
            // write the item we want to enqueue
            Volatile.Write(ref buffer[index].Element, item);
            // bump the sequence
            buffer[index].Sequence = pos + 1;
            return true;
        }

        // If the queue is full we cannot enqueue and just return false
        if (cell.Sequence < pos)
        {
            return false;
        }

        // repeat the process if other producer managed to enqueue before us
    } while (true);
}

出队算法:

public bool TryDequeue(out object result)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var bufferMask = _bufferMask; // prefetch the buffer mask
        var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
        var index = pos & bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence was changed by a producer and wasn't changed by other consumers
        // and we can increment the dequeue position
        if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
        {
            // read the item
            result = Volatile.Read(ref cell.Element);
            // update for the next round of the buffer
            buffer[index] = new Cell(pos + bufferMask + 1, null);
            return true;
        }

        // If the queue is empty return false
        if (cell.Sequence < pos + 1)
        {
            result = default(object);
            return false;
        }

        // repeat the process if other consumer managed to dequeue before us
    } while (true);
}

As an option to consider, there's an algorithm of the bounded Multiple Producer Multiple Consumer queue by Dmitry Vyukov. I've ported the algorithm to .NET, you can find the sources on github. It's very fast.

The enqueue algorithm:

public bool TryEnqueue(object item)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var pos = _enqueuePos; // fetch the current position where to enqueue the item
        var index = pos & _bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence wasn't touched by other producers
        // and we can increment the enqueue position
        if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
        {
            // write the item we want to enqueue
            Volatile.Write(ref buffer[index].Element, item);
            // bump the sequence
            buffer[index].Sequence = pos + 1;
            return true;
        }

        // If the queue is full we cannot enqueue and just return false
        if (cell.Sequence < pos)
        {
            return false;
        }

        // repeat the process if other producer managed to enqueue before us
    } while (true);
}

The dequeue algorithm:

public bool TryDequeue(out object result)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var bufferMask = _bufferMask; // prefetch the buffer mask
        var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
        var index = pos & bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence was changed by a producer and wasn't changed by other consumers
        // and we can increment the dequeue position
        if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
        {
            // read the item
            result = Volatile.Read(ref cell.Element);
            // update for the next round of the buffer
            buffer[index] = new Cell(pos + bufferMask + 1, null);
            return true;
        }

        // If the queue is empty return false
        if (cell.Sequence < pos + 1)
        {
            result = default(object);
            return false;
        }

        // repeat the process if other consumer managed to dequeue before us
    } while (true);
}
傲影 2024-11-15 23:07:45

为什么你认为你需要无锁队列?您是否尝试过使用 ConcurrentQueue,可能包含在 BlockingCollection

编写多线程代码很困难。编写无锁代码更加困难,除非确实必要,否则您不应该自己编写。

Why do you think you need lock-free queue? Have you tried using ConcurrentQueue<T>, possibly enclosed within a BlockingCollection<T>?

Writing multi-threaded code is hard. Writing lock-free code is even harder and you shouldn't do it yourself unless you really have to.

木緿 2024-11-15 23:07:45

我的第一个选择是使用 ConcurrentQueue,但您可以将数据存储抽象到接口后面,以便可以轻松更改实现。然后对典型场景进行基准测试,看看在哪里遇到问题。请记住:过早的优化是万恶之源。设计您的系统,使其不依赖于实现,而是依赖于合同,然后您可以根据需要优化您的实现。

我用 ILSpy 查看了 ConcurrentQueue ,乍一看似乎是一个无锁实现 - 很有可能它正是您正在寻找的东西。

My first go would be with ConcurrentQueue<T> but you can abstract your data store away behind an interface so you can easily change implementations. Then benchmark typical scenarios and see where you run into problems. Remember: Premature optimzation is the root of all evil. Design your system so it's not tied to an implementation but to a contract and then you can optimize your implementations all you want.

I had a look at ConcurrentQueue<T> with ILSpy and seems to be a lock free implementation at first glance - so good chance it's exactly what your are looking for.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文