Parallel.ForEach loop using BlockingCollection.GetConsumingEnumerable

Posted on 2024-11-19 00:29:10

Why does a Parallel.ForEach loop exit with an OperationCanceledException when its source is GetConsumingEnumerable?

//outside the function
static BlockingCollection<double> _collection = new BlockingCollection<double>();
    
    
var t = Task.Factory.StartNew(Producer);            
Parallel.ForEach(_collection.GetConsumingEnumerable(),
    item => Console.WriteLine("Processed {0}", item));
Console.WriteLine("FINISHED processing");


public static void Producer()
{
     var data = Enumerable.Range(1, 1000);
     foreach (var i in data)
     {
        _collection.Add(i);
        Console.WriteLine("Added {0}",i);
     }
                    
     Console.WriteLine("Finished adding");
     _collection.CompleteAdding();
}

Comments (2)

悲欢浪云 2024-11-26 00:29:10

Using Parallel.ForEach with BlockingCollection is somewhat problematic, as I found out recently. It can be made to work, but it needs a little extra effort.

Stephen Toub has an excellent blog post on it, and if you download the "Parallel Extensions Extras" project (also available on NuGet) you'll find some code ready to help you.
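As a rough illustration of the kind of helper meant here, the sketch below wraps a BlockingCollection<T> in a custom Partitioner<T> so that every worker pulls items straight from GetConsumingEnumerable, with no chunk buffering in between. This is written from my understanding of the approach, not copied from the project: the class name ConsumingPartitioner is hypothetical, and the real helper may differ in its details.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (the name is an assumption, not the project's API):
// a partitioner whose partitions all consume from the same BlockingCollection<T>.
sealed class ConsumingPartitioner<T> : Partitioner<T>
{
    private readonly BlockingCollection<T> _collection;

    public ConsumingPartitioner(BlockingCollection<T> collection)
    {
        _collection = collection ?? throw new ArgumentNullException(nameof(collection));
    }

    // Parallel.ForEach requires dynamic partitioning support from a custom partitioner.
    public override bool SupportsDynamicPartitions => true;

    // Static partitioning: hand out one consuming enumerator per requested partition.
    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));

        IEnumerable<T> dynamicPartitions = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount)
            .Select(_ => dynamicPartitions.GetEnumerator())
            .ToArray();
    }

    // Each enumerator obtained from this sequence takes items one at a time,
    // and ends once CompleteAdding has been called and the collection is empty.
    public override IEnumerable<T> GetDynamicPartitions()
    {
        return _collection.GetConsumingEnumerable();
    }
}
```

With a class like this in place, the loop from the question would be written as Parallel.ForEach(new ConsumingPartitioner<double>(_collection), item => Console.WriteLine("Processed {0}", item)); instead of passing the consuming enumerable directly.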

因为看清所以看轻 2024-11-26 00:29:10

Using Parallel.ForEach with a BlockingCollection<T> as its source requires two specific adjustments:

  1. Use the EnumerablePartitionerOptions.NoBuffering option.
  2. Set MaxDegreeOfParallelism to a value other than -1.

Example of correct usage:

Partitioner<Item> partitioner = Partitioner.Create(collection.GetConsumingEnumerable(),
    EnumerablePartitionerOptions.NoBuffering);

ParallelOptions options = new()
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(partitioner, options, item =>
{
    //...
});

Explanation:

  1. EnumerablePartitionerOptions.NoBuffering is required because otherwise Parallel.ForEach will not process each consumed item immediately. Instead it puts the item in a small buffer and waits until the buffer reaches an arbitrary size before processing all the items in it. This behavior introduces undesirable latency, and can even cause deadlocks in some advanced scenarios.
  2. Configuring MaxDegreeOfParallelism is required to keep ThreadPool usage under control. Otherwise Parallel.ForEach will keep asking for more and more threads, prompting the ThreadPool to create new threads at a rate of one per second, even while the BlockingCollection<T> is completely empty and the parallel loop is idle! For an experimental demonstration of this strange behavior, see this question.
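
Putting the two adjustments together with the producer from the question gives something like the following console program. It is only a minimal sketch: the Program class and the processed counter are additions made here for illustration, and the console output from the original code is replaced by a simple count so the result is easy to check.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static readonly BlockingCollection<double> _collection = new BlockingCollection<double>();

    // Same producer as in the question: add 1..1000, then signal completion.
    static void Producer()
    {
        foreach (var i in Enumerable.Range(1, 1000))
        {
            _collection.Add(i);
        }
        _collection.CompleteAdding();
    }

    static void Main()
    {
        Task producer = Task.Run(Producer);

        // Adjustment 1: no chunk buffering, so each item is handed to a worker as soon as it is taken.
        var partitioner = Partitioner.Create(
            _collection.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        // Adjustment 2: an explicit upper bound on the number of concurrent workers.
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        int processed = 0; // illustrative counter, not part of the original code
        Parallel.ForEach(partitioner, options, item =>
        {
            Interlocked.Increment(ref processed);
        });

        producer.Wait();
        Console.WriteLine("Processed {0} items", processed);
    }
}
```

The loop returns only after CompleteAdding has been called and the collection has been drained, so the final line runs once the producer is done.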