BlockingCollection(T) 性能

发布于 2024-09-05 19:22:18 字数 1866 浏览 0 评论 0原文

在我的公司有一段时间,我们使用了一个自制的 ObjectPool 实现,它提供了对其内容的阻止访问。它非常简单:一个Queue、一个要锁定的对象,以及一个AutoResetEvent,用于在发生“借用”线程时向“借用”线程发出信号。项目已添加。

该类的核心实际上是这两个方法:

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

我们一直在使用这个方法和其他一些集合类,而不是 System.Collections.Concurrent 提供的那些方法,因为我们使用的是 .NET 3.5,而不是 4.0。但最近我们发现,由于我们使用响应式扩展,我们实际上< em>确实有可供我们使用的Concurrent命名空间(在System.Threading.dll中)。

当然,我认为自从 BlockingCollection 是 Concurrent 命名空间中的核心类之一,它可能会提供比我或我的队友编写的任何内容更好的性能。

因此,我尝试编写一个工作非常简单的新实现:

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

令我惊讶的是,根据一些简单的测试(从多个线程借用/返回池几千次),我们最初的实现显着优于BlockingCollectionT>在性能方面。它们似乎都可以正常工作;只是我们原来的实现似乎要快得多。

我的问题:

  1. 为什么会这样?可能是因为 BlockingCollection 提供了更大的灵活性(我理解它是通过包装 IProducerConsumerCollection< T>),这必然会带来性能开销吗?
  2. 这是否只是对 BlockingCollection 类的完全误导性使用?
  3. 如果这是 BlockingCollection 的正确使用,我是否使用不当?例如,Take/Add 方法是否过于简单,是否有一种性能更好的方法来获得相同的功能?

除非有人对第三个问题有一些见解,否则我们现在似乎将坚持原来的实现。

For a while at my company we've used a home-grown ObjectPool<T> implementation that provides blocking access to its contents. It's pretty straightforward: a Queue<T>, an object to lock on, and an AutoResetEvent to signal to a "borrowing" thread when an item is added.

The meat of the class is really these two methods:

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

We have been using this and a few other collection classes instead of those provided by System.Collections.Concurrent because we are using .NET 3.5, not 4.0. But recently we discovered that since we are using Reactive Extensions, we actually do have the Concurrent namespace available to us (in System.Threading.dll).

Naturally, I figured that since BlockingCollection<T> is one of the core classes in the Concurrent namespace, it would probably offer better performance than anything I or my teammates wrote.

So I tried writing a new implementation that works very simply:

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

To my surprise, according to some simple tests (borrowing/returning to the pool a few thousand times from multiple threads), our original implementation significantly beats BlockingCollection<T> in terms of performance. They both appear to work correctly; it's just that our original implementation seems to be much faster.

My question:

  1. Why would this be? Is it perhaps because BlockingCollection<T> offers greater flexibility (I understand it works by wrapping an IProducerConsumerCollection<T>), which necessarily introduces performance overhead?
  2. Is this just a flat-out misguided use of the BlockingCollection<T> class?
  3. If this is an appropriate use of BlockingCollection<T>, am I just not using properly? For example, is the Take/Add approach overly simplistic, and there's a far better-performing way to get the same functionality?

Unless anyone has some insight to offer in response to that third question, it looks like we'll be sticking with our original implementation for now.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

预谋 2024-09-12 19:22:18

这里有一些潜在的可能性。

首先,响应式扩展中的 BlockingCollection 是向后移植的,与 .NET 4 最终版本并不完全相同。如果此向后移植的性能与 .NET 4 RTM 不同,我不会感到惊讶(尽管我没有具体介绍此集合)。大多数 TPL 在 .NET 4 中的性能比在 .NET 3.5 向后移植中的性能更好。

话虽如此,如果您有一个生产者线程和一个消费者线程,我怀疑您的实现将优于 BlockingCollection。对于一个生产者和一个消费者,你的锁对总体性能的影响会更小,并且重置事件是消费者端等待的一种非常有效的手段。

然而,BlockingCollection 的设计目的是允许许多生产者线程很好地“排队”数据。这对于您的实现来说效果不佳,因为锁定争用很快就会开始出现问题。

话虽这么说,我还想在这里指出一个误解:

...它可能会提供比我或我的队友编写的任何内容更好的性能。

这通常是不正确的。框架集合类通常执行非常好,但对于给定场景通常不是性能最佳的选项。话虽如此,它们往往表现良好,同时非常灵活和强大。它们往往能够很好地扩展。 “自制”集合类在特定场景中通常优于框架集合,但在专门设计的场景之外使用时往往会出现问题。我怀疑这是其中一种情况。

There are a couple of potential possibilities, here.

First, BlockingCollection<T> in the Reactive Extensions is a backport, and not exactly the same as the .NET 4 final version. I wouldn't be surprised if the performance of this backport differs from .NET 4 RTM (though I haven't profiled this collection, specifically). Much of the TPL performs better in .NET 4 than in the .NET 3.5 backport.

That being said, I'd suspect your implementation will out-perform BlockingCollection<T> if you have a single producer thread and a single consumer thread. With one producer and one consumer, your lock is going to have a smaller impact on the total performance, and the reset event is a very effective means of waiting on the consumer side.

However, BlockingCollection<T> is designed to allow many producer threads to "enqueue" data very well. This will not perform well with your implementation, as the locking contention will start to become problematic fairly quickly.

That being said, I'd also like to point out one misconception here:

...it would probably offer better performance than anything I or my teammates wrote.

This is often not true. The framework collection classes typically perform very well, but are often not the most performant option for a given scenario. That being said, they tend to perform well while being very flexible and very robust. They often tend to scale very well. "Home-written" collection classes often outperform framework collections in specific scenarios, but tend to be problematic when used in scenarios outside of the one for which they were specifically designed. I suspect this is one of those situations.

无边思念无边月 2024-09-12 19:22:18

我在 .Net 4 中尝试使用 BlockingCollection 来对抗 ConurrentQueue/AutoResetEvent 组合(类似于 OP 的解决方案,但无锁),而后一个组合如此对于我的用例来说,速度要快得多,所以我放弃了 BlockingCollection。不幸的是,这已经是一年前的事了,我找不到基准测试结果。

使用单独的 AutoResetEvent 不会使事情变得更加复杂。事实上,人们甚至可以将其一劳永逸地抽象为 BlockingCollectionSlim...

BlockingCollection 内部也依赖于 ConcurrentQueue,但是 使用细长信号量取消标记<进行了一些额外的处理/em>,它会产生额外的功能,但要付出一定的代价,即使不使用也是如此。还应该注意的是,BlockingCollection 并未与 ConcurrentQueue 联姻,但也可以与 IProducerConsumerCollection 的其他实现者一起使用。


一个无限制的、非常简单的 BlockingCollectionSlim 实现:

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);

I tried BlockingCollection against a ConurrentQueue/AutoResetEvent combo (similar to OP's solution, but lockless) in .Net 4, and the latter combo was so much faster for my use case, that I ditched BlockingCollection. Unfortunately this was almost a year ago and I couldn't find the benchmark results.

Using a separate AutoResetEvent doesn't make things too much more complicated. In fact, one could even abstract it away, once and for all, into a BlockingCollectionSlim....

BlockingCollection internally relies on a ConcurrentQueue as well, but does some additional juggling with slim semaphores and cancellation tokens, which yields additional features, but at a cost, even when not used. It should also be noted that BlockingCollection is not married to ConcurrentQueue, but can be used with other implementors of IProducerConsumerCollection instead as well.


An unbounded, pretty bare bones BlockingCollectionSlim implementation:

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);
染柒℉ 2024-09-12 19:22:18

我在 .Net 4.7.2 中遇到了与 BlockingCollection 相同的性能问题,并找到了这篇文章。我的案例是多个生产者-多个消费者,特别是从多个源读取小数据块,并且应该由多个过滤器处理。使用了多个 (Env.ProcessorCount) BlockingCollections,最终性能分析器告诉我 BlockingCollection.GetConsumingEnumerable.MoveNext() 占用的 CPU 时间比实际过滤更多!

谢谢,@Eugene别列索夫斯基,给你的代码。仅供参考:在我的环境中,它几乎比 BlockingCollection 慢两倍。因此,这是我的 SpinLocked BlockingCollection:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

对于性能关键型代码,我建议避免 readonly 字段修饰符。它添加了对 IL 中每个字段访问的检查。使用以下测试代码

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

在具有 2 个内核且启用 HT 的 Core i5-3210M 上,我得到以下输出:

BlockingCollection     0.0006ms
BlockingCollectionSlim 0.0010ms (Eugene Beresovsky implementation)
BlockingCollectionSpin 0.0003ms

因此,SpinLocked 版本比 .Net BlockingCollection 快两倍。但是,我建议只使用它!如果您真的更喜欢性能而不是代码简单性(和可维护性)。

I came across the same performance issues with BlockingCollection in .Net 4.7.2 and found this post. My case is MultipleProducers-MultipleConsumers, in particular small data chunks are read from many sources and should be processed by many filters. Several (Env.ProcessorCount) BlockingCollections were used and I ended up with a performance profiler telling me that BlockingCollection.GetConsumingEnumerable.MoveNext() eats more CPU time than actual filtering!

Thank you, @Eugene Beresovsky, for your code. FYI: On my environment it was almost twice slower than BlockingCollection. So, here is my SpinLocked BlockingCollection:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

And for performance-critical code I would suggest to avoid readonly field modifier. It adds a check on every field access in the IL. With the following test code

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

On Core i5-3210M with 2 cores and HT enabled I've got the following output:

BlockingCollection     0.0006ms
BlockingCollectionSlim 0.0010ms (Eugene Beresovsky implementation)
BlockingCollectionSpin 0.0003ms

So, SpinLocked version is twice faster than .Net BlockingCollection. But, I would suggest to use it only! if you really prefer performance against code simplicity (and maintainability).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文