AsParallel() and internal buffer size
How to limit the amount of items that AsParallel() reads upfront and puts in its internal buffer?
Here is an example:
    int returnedCounter;

    IEnumerable<int> Enum()
    {
        while (true)
            yield return Interlocked.Increment(ref returnedCounter);
    }

    [TestMethod]
    public void TestMethod1()
    {
        foreach (var i in Enum().AsParallel().Select(a => a))
        {
            Thread.Sleep(3000);
            break;
        }
        Console.WriteLine(returnedCounter);
    }
I consume one item, sleep, then stop the enumeration. It prints 526400 on my machine. In my real project each item allocates thousands of kilobytes, so AsParallel() reading that many items upfront results in very bad memory consumption and wasted CPU.
Adding WithMergeOptions(ParallelMergeOptions.NotBuffered) helps a bit: it prints 4544. But that is still too much for me.
Waiting in Enum() freezes the loop in the main thread.
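For reference, the NotBuffered mitigation mentioned in the question can be wired in like this (a self-contained sketch: the MSTest scaffolding is dropped and the 3-second sleep shortened, otherwise as in the question):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

var returnedCounter = 0;

// Infinite source, as in the question.
IEnumerable<int> Enum()
{
    while (true)
        yield return Interlocked.Increment(ref returnedCounter);
}

// NotBuffered asks PLINQ to hand each result to the consumer as soon
// as it is produced instead of accumulating output buffers first.
foreach (var i in Enum().AsParallel()
                        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                        .Select(a => a))
{
    Thread.Sleep(200); // stands in for the paused consumer
    break;
}

// Still well above 1: the worker threads keep pulling from the source
// while the consumer sleeps, just with smaller merge buffers.
Console.WriteLine(returnedCounter);
```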
Another question about Partitioners!
In your case, you will have to find / write a Partitioner that only takes one item at a time.
Here's an article on Custom Partitioners.
UPDATE:
I just remembered where I saw a SingleItemPartitioner implementation: it's in the ParallelExtensionsExtras project, in "Samples for Parallel Programming with the .NET Framework".
I've also just read your test code. I probably should have done that the first time!
This code:

    Enum().AsParallel().Select(a => a)

means: take Enum() and enumerate it as fast as possible, in parallel, and return a new IEnumerable<int>. So your foreach isn't pulling items from Enum() - it's pulling items from a new IEnumerable<int> created by the LINQ statement.
Also, your foreach runs on the main thread, so the work on each item is single-threaded. If you want to run in parallel, but only yield an item when it is required, try:
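The snippet the answer refers to after "try:" did not survive in this copy. Here is a sketch in its spirit, using Partitioner.Create with EnumerablePartitionerOptions.NoBuffering as a built-in stand-in for the ParallelExtensionsExtras SingleItemPartitioner (available since .NET 4.5):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

var returnedCounter = 0;

// Infinite source, as in the question.
IEnumerable<int> Enum()
{
    while (true)
        yield return Interlocked.Increment(ref returnedCounter);
}

// NoBuffering makes the partitioner hand items out one at a time
// instead of grabbing ever-growing chunks from the source.
var oneAtATime = Partitioner.Create(Enum(), EnumerablePartitionerOptions.NoBuffering);

foreach (var i in oneAtATime.AsParallel()
                            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                            .Select(a => a))
{
    Thread.Sleep(200); // slow consumer, as in the question
    break;
}

// Far fewer items read ahead than with the default chunk partitioner.
Console.WriteLine(returnedCounter);
```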
Found a workaround.
First, let me clarify the original question. I need a pausable pipeline that works on an infinite sequence. The pipeline is:
1. Enum()
2. AsParallel().Select(a => a)
3. the foreach body

Step 3 may pause the pipeline; that is emulated by Sleep(). The problem is that step 2 fetches too many elements ahead while the pipeline is paused. PLINQ must have some internal queue. The queue size cannot be configured explicitly, though it depends on ParallelMergeOptions: ParallelMergeOptions.NotBuffered lowers the queue size, but it is still too big for me.
My workaround is to track how many items are being processed, stop parallel processing when the limit is reached, and restart it when the pipeline is resumed.