线程安全无锁互 ByteArray 队列

发布于 2024-08-28 12:50:35 字数 321 浏览 8 评论 0 原文

应该传输字节流,并且有一个生产者线程和一个消费者线程。 大多数时候,生产者的速度高于消费者,并且我需要足够的缓冲数据来保证应用程序的 QoS。 我读到了我的问题,并且有诸如共享缓冲区、PipeStream .NET 类之类的解决方案... 此类将在服务器上实例化多次,因此我需要优化的解决方案。 使用 ByteArray 队列是个好主意吗?

如果是,我将使用优化算法来猜测队列大小和每个 ByteArray 容量,理论上它适合我的情况。

如果不是,我最好的方法是什么?

请告诉我是否有 C# 或 VB 中 ByteArray 队列的良好无锁线程安全实现。

提前致谢

A byte stream should be transferred and there is one producer thread and a consumer one.
Speed of producer is higher than consumer most of the time, and I need enough buffered data for QoS of my application.
I read about my problem and there are solutions like shared buffer, PipeStream .NET class ...
This class is going to be instantiated many times on server so I need and optimized solution.
Is it good idea to use a Queue of ByteArray ?

If yes, I'll use an optimization algorithm to guess the Queue size and each ByteArray capacity and theoretically it fits my case.

If no, I what's the best approach ?

Please let me know if there's a good lock free thread safe implementation of ByteArray Queue in C# or VB.

Thanks in advance

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

_蜘蛛 2024-09-04 12:50:35

如果您不是逐字节地生成和消耗字节,而是按块工作,您可能会获得更多的加速。在这种情况下,代码的“无锁性”可能根本不重要——事实上,传统的锁定解决方案可能更可取。我会尝试演示一下。

C# 中给出了无锁、单个 生产者、单个 消费者、有界 队列。 (清单 A)
没有深奥的互锁操作,甚至没有显式的内存屏障。可以这么说,乍一看它是尽可能快且无锁的。不是吗?
现在让我们将其与 Marc Gravell 给出的锁定解决方案进行比较< /a>.

我们将使用双 CPU 机器,核心之间没有共享的 L3 缓存。
我们预计最多 2 倍加速。 2 倍加速确实意味着无锁解决方案在理论范围内表现理想。
为了为无锁代码创造一个理想的环境,我们甚至可以使用 此处
测试的结果代码在(清单 B)中。

它正在生产约。一个线程上 10MBytes,而另一线程上则消耗它。
队列大小固定为 32KBytes。如果已满,生产者就会等待。
在我的机器上典型的测试运行如下所示:

LockFreeByteQueue:799ms
字节队列:1843ms

无锁队列更快。哇,速度快了 2 倍多!这是值得夸耀的事情。 :)
让我们看看发生了什么。
Marc 的锁定队列就是这样做的。它锁住了。它对每个字节都执行此操作。

我们真的需要锁定每个字节并逐字节推送数据吗?它肯定会以块的形式到达网络(例如一些大约 1k 的数据包)。即使它确实从内部源逐字节到达,生产者也可以轻松地将其打包成漂亮的块。
让我们这样做 - 不要逐字节地生成和消费,而是分块工作并向微基准添加另外两个测试(清单 C,只需将其插入基准主体中)。
现在,典型的运行如下所示:

LockFreePageQueue:33ms
页面队列:25ms

现在,它们实际上都比原始无锁代码快 20 倍 - Marc 的添加分块的解决方案实际上比带有分块的无锁代码更快
我们没有采用会导致 2 倍加速的无锁结构,而是尝试了另一种解决方案,该解决方案与锁定配合得很好,并导致了 20 倍(!)的加速。
许多问题的关键并不在于避免锁定,而在于避免共享和最小化锁定。在上述情况下,我们可以在字节复制期间避免共享。
我们可以在大部分时间处理私有结构,然后将单个指针排入队列,从而将共享空间和时间缩小到将单个指针插入队列中。

清单 A,无锁、单生产者、单消费者队列:

public class BoundedSingleProducerSingleConsumerQueue<T>
{
    T[] queue;
    volatile int tail;
    volatile int head;

    public BoundedSingleProducerSingleConsumerQueue(int capacity)
    {
        queue = new T[capacity + 1];
        tail = head = 0;
    }

    public bool TryEnqueue(T item)
    {
        int newtail = (tail + 1) % queue.Length;
        if (newtail == head) return false;
        queue[tail] = item;
        tail = newtail;
        return true;
    }

    public bool TryDequeue(out T item)
    {
        item = default(T);
        if (head == tail) return false;
        item = queue[head];
        queue[head] = default(T);
        head = (head + 1) % queue.Length;
        return true;
    }
}

清单 B,微基准:

class Program
{
    static void Main(string[] args)
    {
        for (int numtrials = 3; numtrials > 0; --numtrials)
        {
            using (ProcessorAffinity.BeginAffinity(0))
            {
                int pagesize = 1024 * 10;
                int numpages = 1024;
                int totalbytes = pagesize * numpages;

                BoundedSingleProducerSingleConsumerQueue<byte> lockFreeByteQueue = new BoundedSingleProducerSingleConsumerQueue<byte>(1024 * 32);
                Stopwatch sw = new Stopwatch();
                sw.Start();
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    using (ProcessorAffinity.BeginAffinity(1))
                    {
                        for (int i = 0; i < totalbytes; i++)
                        {
                            while (!lockFreeByteQueue.TryEnqueue((byte)(i & 0xFF))) ;
                        }
                    }
                });
                for (int i = 0; i < totalbytes; i++)
                {
                    byte tmp;
                    while (!lockFreeByteQueue.TryDequeue(out tmp)) ;
                }
                sw.Stop();
                Console.WriteLine("LockFreeByteQueue: {0}ms", sw.ElapsedMilliseconds);


                SizeQueue<byte> byteQueue = new SizeQueue<byte>(1024 * 32);
                sw.Reset();
                sw.Start();
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    using (ProcessorAffinity.BeginAffinity(1))
                    {
                        for (int i = 0; i < totalbytes; i++)
                        {
                            byteQueue.Enqueue((byte)(i & 0xFF));
                        }
                    }
                });

                for (int i = 0; i < totalbytes; i++)
                {
                    byte tmp = byteQueue.Dequeue();
                }
                sw.Stop();
                Console.WriteLine("ByteQueue: {0}ms", sw.ElapsedMilliseconds);

                Console.ReadKey();
            }
        }
    }
}

清单 C,分块测试:

BoundedSingleProducerSingleConsumerQueue<byte[]> lockfreePageQueue = new BoundedSingleProducerSingleConsumerQueue<byte[]>(32);
sw.Reset();
sw.Start();
ThreadPool.QueueUserWorkItem(delegate(object state)
{
    using (ProcessorAffinity.BeginAffinity(1))
    {
        for (int i = 0; i < numpages; i++)
        {
            byte[] page = new byte[pagesize];
            for (int j = 0; j < pagesize; j++)
            {
                page[j] = (byte)(i & 0xFF);
            }
            while (!lockfreePageQueue.TryEnqueue(page)) ;
        }
    }
});
for (int i = 0; i < numpages; i++)
{
    byte[] page;
    while (!lockfreePageQueue.TryDequeue(out page)) ;
    for (int j = 0; j < pagesize; j++)
    {
        byte tmp = page[j];
    }
}
sw.Stop();
Console.WriteLine("LockFreePageQueue: {0}ms", sw.ElapsedMilliseconds);

SizeQueue<byte[]> pageQueue = new SizeQueue<byte[]>(32);

ThreadPool.QueueUserWorkItem(delegate(object state)
{
    using (ProcessorAffinity.BeginAffinity(1))
    {
        for (int i = 0; i < numpages; i++)
        {
            byte[] page = new byte[pagesize];
            for (int j = 0; j < pagesize; j++)
            {
                page[j] = (byte)(i & 0xFF);
            }
            pageQueue.Enqueue(page);
        }
    }
});
sw.Reset();
sw.Start();
for (int i = 0; i < numpages; i++)
{
    byte[] page = pageQueue.Dequeue();
    for (int j = 0; j < pagesize; j++)
    {
        byte tmp = page[j];
    }
}
sw.Stop();
Console.WriteLine("PageQueue: {0}ms", sw.ElapsedMilliseconds);

You will probably gain much more speedup if instead of producing and consuming byte-by-byte, you work in chunks. In that case, the "lock-freeness" of the code would probably not matter at all - in fact, the traditional, locking solution might be preferable. I'll try to demonstrate.

A lock-free, single producer, single consumer, bounded queue is given in C#. (Listing A)
There are no esoteric interlocked operations, even no explicit memory barriers. Let's say that, at first glance it is as fast and as lock-free as it gets. Isn't it?
Now let's compare it with a locking solution that Marc Gravell has given, here.

We will use a dual CPU machine that has no shared L3 cache between the cores.
We expect at most 2x speedup. A 2x speedup would indeed mean that the lock-free solution performs ideally, at theoretic bounds.
In order to make an ideal environment for the lock-free code, we will even set the CPU affinity of the producer and the consumer thread, by using the utility class from here.
The resulting code of the test is in (Listing B).

It is producing ca. 10MBytes on one thread while consuming it on another.
The queue size is fixed at 32KBytes. If it is full, the producer waits.
A typical run of the test on my machine looks like this:

LockFreeByteQueue: 799ms
ByteQueue: 1843ms

The lock-free queue is faster. Wow, it is more than 2x as fast! That is something to brag about. :)
Let's look at what is happening.
Marc's locking queue does just that. It locks. It does this for every byte.

Do we really need to lock for every byte and push the data byte by byte? It most assuredly arrives in chunks on the network (like some ca. 1k packets). Even if it really arrives byte by byte from an internal source, the producer could easily package it into nice chunks.
Let's just do that - instead of producing and consuming byte-by-byte, let's work in chunks and add two other tests to the micro-benchmark (Listing C, just insert it into the benchmark body).
Now a typical run looks like this:

LockFreePageQueue: 33ms
PageQueue: 25ms

Now, both of them are actually 20x faster than the original lock-free code - Marc's solution with the added chunking is actually faster now than the lock-free code with chunking!
Instead of going with a lock-free structure that would result in a 2x speedup, we attempted an another solution that works just fine with locking and resulted in a 20x(!) speedup.
The key to many problems is not so much avoiding locking - it is much more about avoiding sharing and minimizing locking. In the above case, we can avoid sharing for the duration of byte-copying.
We can work on a private structure for most of the time and then enqueue a single pointer, thereby shrinking shared space and time to a single insertion of a single pointer into a queue.

Listing A, a lock-free, single producer, single consumer queue:

public class BoundedSingleProducerSingleConsumerQueue<T>
{
    T[] queue;
    volatile int tail;
    volatile int head;

    public BoundedSingleProducerSingleConsumerQueue(int capacity)
    {
        queue = new T[capacity + 1];
        tail = head = 0;
    }

    public bool TryEnqueue(T item)
    {
        int newtail = (tail + 1) % queue.Length;
        if (newtail == head) return false;
        queue[tail] = item;
        tail = newtail;
        return true;
    }

    public bool TryDequeue(out T item)
    {
        item = default(T);
        if (head == tail) return false;
        item = queue[head];
        queue[head] = default(T);
        head = (head + 1) % queue.Length;
        return true;
    }
}

Listing B, a micro-benchmark:

class Program
{
    static void Main(string[] args)
    {
        for (int numtrials = 3; numtrials > 0; --numtrials)
        {
            using (ProcessorAffinity.BeginAffinity(0))
            {
                int pagesize = 1024 * 10;
                int numpages = 1024;
                int totalbytes = pagesize * numpages;

                BoundedSingleProducerSingleConsumerQueue<byte> lockFreeByteQueue = new BoundedSingleProducerSingleConsumerQueue<byte>(1024 * 32);
                Stopwatch sw = new Stopwatch();
                sw.Start();
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    using (ProcessorAffinity.BeginAffinity(1))
                    {
                        for (int i = 0; i < totalbytes; i++)
                        {
                            while (!lockFreeByteQueue.TryEnqueue((byte)(i & 0xFF))) ;
                        }
                    }
                });
                for (int i = 0; i < totalbytes; i++)
                {
                    byte tmp;
                    while (!lockFreeByteQueue.TryDequeue(out tmp)) ;
                }
                sw.Stop();
                Console.WriteLine("LockFreeByteQueue: {0}ms", sw.ElapsedMilliseconds);


                SizeQueue<byte> byteQueue = new SizeQueue<byte>(1024 * 32);
                sw.Reset();
                sw.Start();
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    using (ProcessorAffinity.BeginAffinity(1))
                    {
                        for (int i = 0; i < totalbytes; i++)
                        {
                            byteQueue.Enqueue((byte)(i & 0xFF));
                        }
                    }
                });

                for (int i = 0; i < totalbytes; i++)
                {
                    byte tmp = byteQueue.Dequeue();
                }
                sw.Stop();
                Console.WriteLine("ByteQueue: {0}ms", sw.ElapsedMilliseconds);

                Console.ReadKey();
            }
        }
    }
}

Listing C, chunked tests:

BoundedSingleProducerSingleConsumerQueue<byte[]> lockfreePageQueue = new BoundedSingleProducerSingleConsumerQueue<byte[]>(32);
sw.Reset();
sw.Start();
ThreadPool.QueueUserWorkItem(delegate(object state)
{
    using (ProcessorAffinity.BeginAffinity(1))
    {
        for (int i = 0; i < numpages; i++)
        {
            byte[] page = new byte[pagesize];
            for (int j = 0; j < pagesize; j++)
            {
                page[j] = (byte)(i & 0xFF);
            }
            while (!lockfreePageQueue.TryEnqueue(page)) ;
        }
    }
});
for (int i = 0; i < numpages; i++)
{
    byte[] page;
    while (!lockfreePageQueue.TryDequeue(out page)) ;
    for (int j = 0; j < pagesize; j++)
    {
        byte tmp = page[j];
    }
}
sw.Stop();
Console.WriteLine("LockFreePageQueue: {0}ms", sw.ElapsedMilliseconds);

SizeQueue<byte[]> pageQueue = new SizeQueue<byte[]>(32);

ThreadPool.QueueUserWorkItem(delegate(object state)
{
    using (ProcessorAffinity.BeginAffinity(1))
    {
        for (int i = 0; i < numpages; i++)
        {
            byte[] page = new byte[pagesize];
            for (int j = 0; j < pagesize; j++)
            {
                page[j] = (byte)(i & 0xFF);
            }
            pageQueue.Enqueue(page);
        }
    }
});
sw.Reset();
sw.Start();
for (int i = 0; i < numpages; i++)
{
    byte[] page = pageQueue.Dequeue();
    for (int j = 0; j < pagesize; j++)
    {
        byte tmp = page[j];
    }
}
sw.Stop();
Console.WriteLine("PageQueue: {0}ms", sw.ElapsedMilliseconds);
々眼睛长脚气 2024-09-04 12:50:35

在 .NET 4 中,有 System.Collections.Concurrent.Queue ,它与这些东西一样是无锁的(同时仍然是通用的)。

In .NET 4 there is System.Collections.Concurrent.Queue<T> which is as lock free as these things can be (while still being general).

朮生 2024-09-04 12:50:35

Dobbs 博士用 C++ 实现了一个无锁队列,您可以相对轻松地采用 C# 。当只有一个生产者(可以有任意数量的消费者)时,它就可以工作。

基本思想是使用双向链表作为底层结构以及可移动的头和尾引用。当生成一个项目时,它会被添加到末尾,并且列表开头和当前“头”之间的所有内容都会被删除。吃东西时,尝试抬起头;如果它击中了尾部,则失败,如果没有击中,则成功并返回新元素。特定的操作顺序使其本质上是线程安全的。

然而,在这里使用这种“无锁”设计有两个主要问题:

  1. 没有办法强制队列大小的上限,如果你的生产者比你的消费者更快,这可能是一个严重的问题;

  2. 按照设计,如果没有生成任何内容,Consume 方法必定无法检索元素。这意味着您需要为消费者实现您自己的锁定,并且这种锁定总是忙等待(这比性能范围内的锁定差得多)或定时等待(这会进一步减慢消费者的速度)。

出于这些原因,我建议您认真考虑是否真的需要无锁结构。很多人来到这个网站时认为它会比使用锁定的等效结构“更快”,但对于大多数应用程序来说,实际差异可以忽略不计,因此通常不值得增加复杂性,并且在某些情况下它实际上可以执行更差,因为等待状态(或可警报等待)比忙等待便宜得多。

多核机器和内存屏障的需要使得有效的无锁线程变得更加复杂;在正常操作下,您仍然可能会出现乱序执行,而在 .NET 中,抖动可以进一步决定重新排序指令,因此您可能需要在代码中添加 易失性 变量和 Thread.MemoryBarrier 调用,这可能再次导致无锁版本比基本同步版本成本更高。

首先使用普通的旧同步生产者-消费者队列,然后分析您的应用程序以确定它是否可以满足您的性能要求怎么样? Joseph Albahari 的网站 提供了一个出色、高效的 PC 队列实现。或者,正如 Richard 提到的,如果您使用 .NET 4.0 框架,那么您可以简单地使用 ConcurrentQueue 或更可能 阻止集合

首先测试 - 负载测试同步队列,这很容易实现 - 并观察锁定实际花费了多少时间。不是等待,无论如何你都必须这样做,而是在锁发出信号后实际获取释放锁。如果它超过你程序执行时间的 1%,我会感到非常惊讶;但如果是这样,那么那么开始考虑无锁实现 - 并确保您也对它们进行分析,以确保它们实际上性能更好。

Dr. Dobbs implemented a lock-free queue in C++, which you could relatively easily adopt to C#. It works when there is exactly one producer (there can be any number of consumers).

The basic idea is to use a doubly-linked list as the underlying structure along with a movable head and tail reference. When an item is produced, it gets added to the end, and everything between the beginning of the list and the current "head" is removed. To consume, attempt to move the head up; if it hits the tail, fail, if it doesn't, succeed and return the new element. The particular order of operations makes it inherently thread-safe.

However, there are two major problems with using such a "lock-free" design here:

  1. There is no way to enforce an upper bound to the queue size, which might be a serious problem if your producer is faster than your consumer;

  2. By design, the Consume method must simply fail to retrieve an element if nothing has been produced. That means you need to implement your own locking for the consumer, and such locking is invariably either busy-waiting (which is much worse than locking in the performance spectrum) or timed waits (which slows down your consumer even further).

For these reasons, I'd recommend that you seriously consider whether or not you really need a lock-free structure. A lot of people come to this site thinking that it's going to be "faster" than an equivalent structure using locking, but the practical difference for most applications is so negligible that it's normally not worth the added complexity, and in some cases it can actually perform worse, because wait states (or alertable waits) are much cheaper than busy-waiting.

Multicore machines and the need for memory barriers make effective lock-free threading even more complicated; under normal operation you can still get out-of-order execution, and in .NET the jitter can further decide to reorder instructions, so you'd probably need to pepper the code with volatile variables and Thread.MemoryBarrier calls, which again might contribute toward making the lock-free version costlier than the basic synchronized version.

How about using a plain old synchronized producer-consumer queue first, and profiling your application to determine whether or not it can meet your performance requirements? There's a great, efficient P-C queue implementation over at Joseph Albahari's site. Or, as Richard mentions, if you are using the .NET 4.0 framework then you can simply use ConcurrentQueue or more likely BlockingCollection.

Test first - load test the synchronized queue, which is easy to implement - and watch how much time is actually spent locking. Not waiting, which you'd have to do anyway, but on actually acquiring and releasing the locks after they become signaled. If it's more than 1% of your program's execution time, I would be very surprised; but if so, then start looking at lock-free implementations - and make sure you profile those too, to make sure that they're actually performing better.

悲喜皆因你 2024-09-04 12:50:35

节流在这里很重要,从它的声音来看,此 中的 BoundedBuffer 类杂志文章符合要求。 .NET 4.0 中将提供类似的类,如 BlockingCollection 类。调整缓冲区大小仍然取决于您。

Throttling is important here, by the sound of it, the BoundedBuffer class in this magazine article fits the bill. A similar class will be available in .NET 4.0 as the BlockingCollection class. Tuning the buffer size is still up to you.

深海不蓝 2024-09-04 12:50:35

Julian M Bucknall 用 C# 编写了一个

Julian M Bucknall has written one in C#.

音盲 2024-09-04 12:50:35

最重要的部分是共享对象的设计。在我的场景中,读取器和写入器可以独立使用单独的缓冲区(大数据块),然后只应同步访问共享 FIFO 对象(如队列)。这样可以最大限度地减少锁定时间,并且线程可以并行完成作业。通过.NETframewok 4.0实现这个概念变得很容易:

System.Collections.Concurrent 命名空间中有一个 ConcurrentQueue(Of T) 类,而 arrayByte 是在我的场景中用作队列类型的好类型。命名空间中还有其他线程安全集合。

http://msdn.microsoft.com/en-us/库/system.collections.concurrent.aspx

Most important part is the design of the shared object. In my scenario reader and writer can use separate buffers (big data chunks) independently and then, only accessing a shared FIFO object like a queue should be synchronized. This way lock time is minimized and threads can complete the job in parallel. And with .NET framewok 4.0 implementation of this concept made easy :

There's a ConcurrentQueue(Of T) Class in System.Collections.Concurrent namespace and arrayByte is a good type to use as queue type for my scenario. There are other thread-safe collections in the namespace.

http://msdn.microsoft.com/en-us/library/system.collections.concurrent.aspx

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文