Parallel.ForEach loop using BlockingCollection.GetConsumingEnumerable
Why does the Parallel.ForEach loop exit with an OperationCanceledException when using GetConsumingEnumerable?
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    // Shared queue: the producer fills it, Parallel.ForEach drains it.
    static BlockingCollection<double> _collection = new BlockingCollection<double>();

    static void Main()
    {
        var t = Task.Factory.StartNew(Producer);
        Parallel.ForEach(_collection.GetConsumingEnumerable(),
            item => Console.WriteLine("Processed {0}", item));
        Console.WriteLine("FINISHED processing");
    }

    public static void Producer()
    {
        var data = Enumerable.Range(1, 1000);
        foreach (var i in data)
        {
            _collection.Add(i);
            Console.WriteLine("Added {0}", i);
        }
        Console.WriteLine("Finished adding");
        // Signals consumers that no more items will arrive.
        _collection.CompleteAdding();
    }
}
Using Parallel.ForEach with BlockingCollection is somewhat problematic, as I found out recently. It can be made to work, but it needs a little extra effort. Stephen Toub has an excellent blog post on it, and if you download the "Parallel Extension Extras" project (also available on NuGet) you'll find some code ready to help you.
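The "extra effort" is usually a custom partitioner that hands items to workers one at a time. Below is a minimal sketch of that idea. The names ConsumingPartitioner, ConsumingPartitionerDemo and Run are hypothetical, chosen for illustration, and are not guaranteed to match what the Extras project ships.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: exposes a BlockingCollection<T> as a Partitioner<T>
// so that Parallel.ForEach takes items directly from the collection.
public sealed class ConsumingPartitioner<T> : Partitioner<T>
{
    private readonly BlockingCollection<T> _collection;

    public ConsumingPartitioner(BlockingCollection<T> collection)
    {
        _collection = collection ?? throw new ArgumentNullException(nameof(collection));
    }

    // Parallel.ForEach only accepts partitioners that support dynamic partitions.
    public override bool SupportsDynamicPartitions => true;

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        IEnumerable<T> source = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount)
                         .Select(_ => source.GetEnumerator())
                         .ToArray();
    }

    // Every enumerator created from this sequence takes from the same collection.
    public override IEnumerable<T> GetDynamicPartitions()
    {
        return _collection.GetConsumingEnumerable();
    }
}

public static class ConsumingPartitionerDemo
{
    // Adds `count` items on a background task and returns how many were processed.
    public static int Run(int count)
    {
        using var collection = new BlockingCollection<double>();
        var producer = Task.Run(() =>
        {
            for (int i = 1; i <= count; i++) collection.Add(i);
            collection.CompleteAdding();
        });

        int processed = 0;
        Parallel.ForEach(new ConsumingPartitioner<double>(collection),
            item => Interlocked.Increment(ref processed));
        producer.Wait();
        return processed;
    }
}
```

Calling ConsumingPartitionerDemo.Run(1000) from a Main method should process all 1000 items and return once the producer has called CompleteAdding.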
Using Parallel.ForEach with a BlockingCollection<T> as the source requires two specific adjustments:
1. Partition the source with the EnumerablePartitionerOptions.NoBuffering option.
2. Set MaxDegreeOfParallelism to a value other than -1.
Example of correct usage:
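A minimal sketch of the two adjustments, not necessarily the exact snippet the answer had in mind. The class name NoBufferingDemo, the Run method, the element type double, and the choice of Environment.ProcessorCount as the cap are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class NoBufferingDemo
{
    // Adds `count` items on a background task and returns how many were processed.
    public static int Run(int count)
    {
        using var collection = new BlockingCollection<double>();
        var producer = Task.Run(() =>
        {
            for (int i = 1; i <= count; i++) collection.Add(i);
            collection.CompleteAdding();
        });

        // Adjustment 1: ask the partitioner not to buffer items into chunks.
        Partitioner<double> partitioner = Partitioner.Create(
            collection.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        // Adjustment 2: cap the number of concurrent workers
        // (assumed cap: one per logical processor).
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        int processed = 0;
        Parallel.ForEach(partitioner, options,
            item => Interlocked.Increment(ref processed));
        producer.Wait();
        return processed;
    }
}
```

The loop returns only after CompleteAdding has been called and the collection has been drained, so the returned count should equal the number of items added.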
Explanation:
1. EnumerablePartitionerOptions.NoBuffering is required because otherwise Parallel.ForEach will not process each consumed item immediately. Instead it will put the item in a small buffer, and will wait until the buffer reaches an arbitrary size before processing all the items in the buffer. This behavior introduces undesirable latency, and can even cause deadlocks in some advanced scenarios.
2. MaxDegreeOfParallelism is required in order to keep the ThreadPool usage under control. Otherwise Parallel.ForEach will keep asking for more and more threads, prompting the ThreadPool to create new threads at a rate of one new thread per second, even while the BlockingCollection<T> is completely empty and the parallel loop is idle! For an experimental demonstration of this strange behavior, see this question.