AsParallel() and internal buffer size

Published 2024-12-19 19:02:56

How to limit the amount of items that AsParallel() reads upfront and puts in its internal buffer?

Here is an example:

int returnedCounter;

IEnumerable<int> Enum()
{
    while (true)
        yield return Interlocked.Increment(ref returnedCounter);
}

[TestMethod]
public void TestMethod1()
{
    foreach (var i in Enum().AsParallel().Select(a => a))
    {
        Thread.Sleep(3000);
        break;
    }
    Console.WriteLine(returnedCounter);
}

I consume 1 item, sleep, then stop enumerating. It prints 526400 on my machine. In my real project each item allocates thousands of kilobytes. AsParallel() reads a lot of items upfront, which results in very high memory consumption and wasted CPU.

Putting WithMergeOptions(ParallelMergeOptions.NotBuffered) helps a bit: it prints 4544. But that is still too many items for me.

Waiting inside Enum() freezes the loop in the main thread.


Answers (2)

有木有妳兜一样 2024-12-26 19:02:56

Another question about Partitioners!

In your case, you will have to find / write a Partitioner that only takes one item at a time.

Here's an article on Custom Partitioners


UPDATE:

I just remembered where I saw a SingleItemPartitioner implementation: it's in the ParallelExtensionsExtras project, in the Samples for Parallel Programming with the .NET Framework.

I've also just read your test code. I probably should have done that the first time!

This code:

Enum().AsParallel().Select(a => a)

means: take Enum() and enumerate it as fast as possible, in parallel, and return a new IEnumerable<int>.

So your foreach isn't pulling items from Enum() - it's pulling items from a new IEnumerable<int> created by the LINQ statement.

Also, your foreach runs on the main thread, so the work on each item is single-threaded.

If you want to run in parallel, but only yield an item when it is required, try:

Parallel.ForEach( SingleItemPartitioner.Create( Enum() ), ( i, state ) =>
    {
        Thread.Sleep( 3000 );
        state.Break();
    } );
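As an aside (not part of the original answer): if pulling in ParallelExtensionsExtras is undesirable, .NET Framework 4.5 and later ship a built-in way to get one-item-at-a-time partitioning, the Partitioner.Create overload that accepts EnumerablePartitionerOptions.NoBuffering. A minimal sketch; the class name, the 100 ms delay, and the counter are illustrative only, and state.Stop() is used instead of Break() purely for simplicity on an infinite sequence:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class NoBufferingDemo
{
    public static int Produced; // counts items pulled from the source

    static IEnumerable<int> Source() // infinite input sequence
    {
        while (true)
            yield return Interlocked.Increment(ref Produced);
    }

    public static void Run()
    {
        // NoBuffering makes each worker take exactly one item at a time
        // from the source instead of grabbing chunks in advance.
        var partitioner = Partitioner.Create(Source(), EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(partitioner, (item, state) =>
        {
            Thread.Sleep(100);  // simulate per-item work
            state.Stop();       // stop as soon as any item completes
        });

        // Roughly one item per worker thread was read ahead - not thousands.
        Console.WriteLine("items pulled from source: " + Produced);
    }
}
```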
旧时模样 2024-12-26 19:02:56

Found a workaround.

First, let me clarify the original question. I need a pausable pipeline that works on an infinite sequence. The pipeline is:

  1. read from sequence synchronously: Enum()
  2. process items in parallel: AsParallel().Select(a => a)
  3. proceed with synchronous processing: foreach body

Step 3 may pause the pipeline. That is emulated by Sleep(). The problem is that step 2 fetches too many elements ahead while the pipeline is paused.
PLinq must have some internal queue. The queue size cannot be configured explicitly, although it does depend on ParallelMergeOptions. ParallelMergeOptions.NotBuffered lowers the queue size, but it is still too big for me.

My workaround is to track how many items are being processed, stop parallel processing when the limit is reached, and restart it when the pipeline resumes.

int sourceCounter;

IEnumerable<int> SourceEnum() // infinite input sequence
{
    while (true)
        yield return Interlocked.Increment(ref sourceCounter);
}

[TestMethod]
public void PlainPLinq_PausedConsumptionTest()
{
    sourceCounter = 0;
    foreach (var i in SourceEnum().AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Select(a => a))
    {
        Thread.Sleep(3000);
        break;
    }
    Console.WriteLine("fetched from source sequence: {0}", sourceCounter); // prints 4544 on my machine
}

[TestMethod]
public void MyParallelSelect_NormalConsumptionTest()
{
    sourceCounter = 0;
    foreach (var i in MyParallelSelect(SourceEnum(), 64, a => a))
    {
        if (sourceCounter > 1000000)
            break;
    }
    Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}

[TestMethod]
public void MyParallelSelect_PausedConsumptionTest()
{
    sourceCounter = 0;
    foreach (var i in MyParallelSelect(SourceEnum(), 64, a => a))
    {
        Thread.Sleep(3000);
        break;
    }
    Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}

class DataHolder<D> // reference type to store class or struct D
{
    public D Data;
}

static IEnumerable<DataHolder<T>> FetchSourceItems<T>(IEnumerator<T> sourceEnumerator, DataHolder<int> itemsBeingProcessed, int queueSize)
{
    for (; ; )
    {
        var holder = new DataHolder<T>();
        if (Interlocked.Increment(ref itemsBeingProcessed.Data) > queueSize)
        {
// enough items are already being processed - stop feeding parallel processing
            Interlocked.Decrement(ref itemsBeingProcessed.Data);
            yield break;
        }
        if (sourceEnumerator.MoveNext())
        {
            holder.Data = sourceEnumerator.Current;
            yield return holder;
        }
        else
        {
            yield return null; // return null DataHolder to indicate EOF
            yield break;
        }
    }
}

IEnumerable<OutT> MyParallelSelect<T, OutT>(IEnumerable<T> source, int queueSize, Func<T, OutT> selector)
{
    var itemsBeingProcessed = new DataHolder<int>();
    using (var sourceEnumerator = source.GetEnumerator())
    {
        for (;;) // restart parallel processing
        {
            foreach (var outData in FetchSourceItems(sourceEnumerator, itemsBeingProcessed, queueSize).AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Select(
                inData => inData != null ? new DataHolder<OutT> { Data = selector(inData.Data) } : null))
            {
                Interlocked.Decrement(ref itemsBeingProcessed.Data);
                if (outData == null)
                    yield break; // EOF reached
                yield return outData.Data;
            }
        }
    }
}
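For completeness, a note not in the original answer: later framework versions offer a ready-made bounded parallel stage in the System.Threading.Tasks.Dataflow NuGet package. A TransformBlock with BoundedCapacity caps the number of in-flight items, so the feeder waits instead of racing ahead while the consumer is paused. A hedged sketch under that assumption; the capacity of 64 mirrors the queueSize above, and the identity selector stands in for real per-item work:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

static class DataflowDemo
{
    public static int Produced; // counts items pulled from the source

    public static void Run()
    {
        var block = new TransformBlock<int, int>(
            x => x, // stand-in for the real per-item work
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 64,      // at most 64 items in flight
                MaxDegreeOfParallelism = 4
            });

        // Feeder: SendAsync waits (without blocking a thread) while the block is full.
        var feeder = Task.Run(async () =>
        {
            while (await block.SendAsync(Interlocked.Increment(ref Produced)))
            {
            }
        });

        var first = block.Receive(); // consume one result
        Thread.Sleep(3000);          // the paused consumer from the tests above
        block.Complete();            // feeder's next SendAsync returns false
        feeder.Wait();

        // Stays near BoundedCapacity rather than in the thousands.
        Console.WriteLine("fetched from source sequence: " + Produced);
    }
}
```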