Is it possible to change parallelOptions.MaxDegreeOfParallelism during execution of a Parallel.ForEach?

Posted on 2024-09-19 00:26:28

I am running a multi-threaded loop:

protected ParallelOptions parallelOptions = new ParallelOptions();

parallelOptions.MaxDegreeOfParallelism = 2;
Parallel.ForEach(items, parallelOptions, item =>
{
    // Loop code here
});

I want to change parallelOptions.MaxDegreeOfParallelism while the parallel loop is executing, to reduce or increase the number of threads.

parallelOptions.MaxDegreeOfParallelism = 5;

It doesn't seem to increase the threads. Does anyone have any ideas?

Comments (3)

荒芜了季节 2024-09-26 00:26:28

The issue with even trying to do this is that it's a hard problem. For starters, how do you even observe CPU and disk utilization reliably? Sampling CPU infrequently will give a poor picture of what's actually going on, and sampling disk utilization is even harder. Secondly, what is the granularity of your tasks, and how quickly can you actually change the number that are running? Thirdly, things change rapidly over time, so you need to apply some kind of filtering to your observations. Fourthly, the ideal number of threads will depend on the CPU that the code is actually running on. Fifthly, if you allocate too many threads you'll be thrashing between them instead of doing useful work.
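To make the "filtering" point concrete, here is a minimal sketch of one common smoothing technique, an exponential moving average. This is only an illustration and not something the answer prescribes: the `Smooth` helper, the `alpha` value, and the sample readings are all made up.

```csharp
using System;
using System.Collections.Generic;

// Exponential moving average: each new sample moves the estimate by a
// fraction alpha of the difference. alpha is a hypothetical tuning knob.
static double Smooth(double previous, double sample, double alpha)
    => previous + alpha * (sample - previous);

double[] cpuSamples = { 10, 95, 12, 90, 15, 88 }; // made-up noisy readings (%)
double smoothed = cpuSamples[0];
var history = new List<double> { smoothed };
foreach (double sample in cpuSamples[1..])
{
    smoothed = Smooth(smoothed, sample, alpha: 0.25);
    history.Add(smoothed);
}
Console.WriteLine(string.Join(", ", history.ConvertAll(v => v.ToString("F1"))));
```

A smaller `alpha` reacts more slowly but is less jumpy, which is the trade-off any controller that adjusts the thread count would have to tune.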

See http://msdn.microsoft.com/en-us/magazine/ff960958.aspx for a discussion on how the Thread Pool in .NET handles the complex task of deciding how many threads to use.

You could also use reflector and take a look at the code that TPL uses to allocate threads and to avoid unnecessary context switching - it's complex and that's not even taking disk access into account!

You could instead try executing the tasks on a lower-priority thread (creating your own TaskScheduler that runs threads with a below-normal priority is actually quite easy). That at least will ensure that you can run at up to 100% CPU without impacting the rest of the system. Messing with thread priorities is in itself fraught with problems, but if this is a purely background task it can be straightforward and might help.
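As a rough illustration of the low-priority idea, the sketch below uses dedicated below-normal-priority threads draining a work queue. Note that this is a simpler stand-in and not the custom TaskScheduler the answer mentions; the queue, the worker count, and the work items are hypothetical. Thread priority may also be ignored on some platforms.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Work queue drained by dedicated below-normal-priority threads.
// (A stand-in for a custom TaskScheduler; all names here are illustrative.)
var work = new BlockingCollection<Action>();
int workerCount = 2;
var workers = new Thread[workerCount];
for (int i = 0; i < workerCount; i++)
{
    workers[i] = new Thread(() =>
    {
        foreach (Action action in work.GetConsumingEnumerable()) action();
    })
    {
        IsBackground = true,
        Priority = ThreadPriority.BelowNormal
    };
    workers[i].Start();
}

int completed = 0;
for (int n = 0; n < 10; n++)
    work.Add(() => Interlocked.Increment(ref completed));

work.CompleteAdding();                  // no more items; workers exit when drained
foreach (Thread t in workers) t.Join();
Console.WriteLine($"Completed {completed} items");
```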

Often, though, disk utilization is the real culprit when it comes to other applications suffering at the hands of one greedy application. Windows can allocate CPU fairly between applications with ease, but when relatively slow disk access is involved it's quite another matter. Instead of trying to dynamically adjust how many threads you have running, you may instead need to simply throttle your application so that it doesn't access the disk too often. That's something you can do without changing how many threads are active.
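One possible way to throttle disk access independently of the thread count is to guard only the I/O section with a semaphore. The following is a minimal sketch under that assumption; the limits (8 workers, 2 concurrent disk sections) are arbitrary, and `Thread.Sleep` stands in for the real disk access.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// At most 2 workers may be inside the "disk" section at once, although the
// loop itself may use up to 8 workers. Both numbers are arbitrary here.
var diskGate = new SemaphoreSlim(2);
int insideDisk = 0, peakInsideDisk = 0, processed = 0;
object peakLock = new object();

Parallel.ForEach(
    Enumerable.Range(0, 40),
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    item =>
    {
        diskGate.Wait();
        try
        {
            int now = Interlocked.Increment(ref insideDisk);
            lock (peakLock) peakInsideDisk = Math.Max(peakInsideDisk, now);
            Thread.Sleep(5);            // placeholder for the real disk access
        }
        finally
        {
            Interlocked.Decrement(ref insideDisk);
            diskGate.Release();
        }
        Interlocked.Increment(ref processed); // CPU-only work would go here
    });

Console.WriteLine($"Processed {processed}, peak concurrent disk sections: {peakInsideDisk}");
```

The loop keeps its own degree of parallelism; only the guarded section is limited, so CPU-bound parts of the body are unaffected.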

You could also look at SetPriorityClass as a way to inform the OS that your process is less important than other applications running on the system; see "How can I/O priority of a process be increased?" for more information. But that assumes your whole process is less important, not just this part of it.

梦年海沫深 2024-09-26 00:26:28

I wouldn't expect it to be possible to change the degree of parallelism after you've called ForEach. As I understand it, ForEach is going to determine how many threads it can create, create that many partitions, and create the threads to operate on those partitions. There's no point at which it can say, "Oh, wait, he changed our resource allocation, let me re-partition the array and re-allocate threads."
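One way to check this empirically is to record the peak number of concurrently running loop bodies while changing the option mid-loop. This is a minimal sketch; the item count, the sleep duration, and the point at which the option is changed are arbitrary choices. If the reasoning above holds, the observed peak should stay at the initial limit of 2.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
int running = 0, peak = 0, processed = 0;
object gate = new object();

Parallel.ForEach(Enumerable.Range(0, 40), options, item =>
{
    int now = Interlocked.Increment(ref running);
    lock (gate) peak = Math.Max(peak, now);

    // Try to raise the limit while the loop is already running.
    if (item == 5) options.MaxDegreeOfParallelism = 5;

    Thread.Sleep(10);                   // placeholder for real work
    Interlocked.Decrement(ref running);
    Interlocked.Increment(ref processed);
});

Console.WriteLine($"Processed {processed} items, peak concurrency observed: {peak}");
```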

春夜浅 2024-09-26 00:26:28

Here is a variant of the .NET 6 Parallel.ForEachAsync API that allows the degree of parallelism to be configured dynamically. It shares the same parameters and behavior with the native API, except that it takes a derived version of the ParallelOptions as argument (DynamicParallelOptions). This class has an extra property, DegreeOfParallelism. Changing this property results in a rapid adaptation of the currently active degree of parallelism.

This implementation is based on the idea of throttling the source sequence of the Parallel.ForEachAsync API. The API itself is configured with the maximum expected value for the DegreeOfParallelism. The actual parallelism is limited by denying the loop free access to the source elements: an element is propagated forward every time another element has been processed. The throttling itself is performed with an unbounded SemaphoreSlim. Changing the maximum degree of parallelism is performed by calling the Release/WaitAsync methods of the semaphore.
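The semaphore resizing can be seen in isolation in this minimal sketch. It only illustrates the Release/WaitAsync mechanics described above, with arbitrary numbers; it is not part of the implementation itself.

```csharp
using System;
using System.Threading;

// No maxCount is passed, so the semaphore can grow beyond its initial count.
var throttler = new SemaphoreSlim(2);

// Raising the limit from 2 to 5: hand out three extra permits.
throttler.Release(3);
int afterIncrease = throttler.CurrentCount;

// Lowering the limit from 5 to 3: take two permits and never give them back.
// WaitAsync is not awaited on purpose; if no permit is free right now, the
// pending wait absorbs the next one that is released.
for (int i = 0; i < 2; i++) _ = throttler.WaitAsync();
int afterDecrease = throttler.CurrentCount;

Console.WriteLine($"{afterIncrease} -> {afterDecrease}");
```

Because no permits are held in this sketch, both adjustments take effect immediately. In a running loop, a decrease only takes effect as in-flight items finish and release their permits.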

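using System.Collections.Generic;
using System.Runtime.CompilerServices; // for [EnumeratorCancellation]
using System.Threading;
using System.Threading.Tasks;

/// <summary>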
/// Executes a parallel foreach operation on an asynchronous sequence, enforcing
/// a degree of parallelism that can be dynamically changed during the execution.
/// </summary>
public static Task DynamicParallelForEachAsync<TSource>(
    IAsyncEnumerable<TSource> source,
    DynamicParallelOptions options,
    Func<TSource, CancellationToken, ValueTask> body)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(options);
    ArgumentNullException.ThrowIfNull(body);

    SemaphoreSlim throttler = new(options.DegreeOfParallelism);
    options.DegreeOfParallelismChangedDelta += Options_ChangedDelta;
    void Options_ChangedDelta(object sender, int delta)
    {
        if (delta > 0)
            throttler.Release(delta);
        else
            for (int i = delta; i < 0; i++) throttler.WaitAsync();
    }

    async IAsyncEnumerable<TSource> GetThrottledSource(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        IAsyncEnumerator<TSource> enumerator = source.GetAsyncEnumerator(
            cancellationToken);
        await using (enumerator.ConfigureAwait(false))
        {
            while (true)
            {
                await throttler.WaitAsync().ConfigureAwait(false);
                if (!await enumerator.MoveNextAsync().ConfigureAwait(false)) break;
                yield return enumerator.Current;
            }
        }
    }

    return Parallel.ForEachAsync(GetThrottledSource(), options, async (item, ct) =>
    {
        try { await body(item, ct).ConfigureAwait(false); }
        finally { throttler.Release(); }
    }).ContinueWith(t =>
    {
        options.DegreeOfParallelismChangedDelta -= Options_ChangedDelta;
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

/// <summary>
/// Stores options that configure the DynamicParallelForEachAsync method.
/// </summary>
public class DynamicParallelOptions : ParallelOptions
{
    private int _degreeOfParallelism;

    public event EventHandler<int> DegreeOfParallelismChangedDelta;

    public DynamicParallelOptions(int maxDegreeOfParallelism)
    {
        // The native Parallel.ForEachAsync will see the base.MaxDegreeOfParallelism.
        base.MaxDegreeOfParallelism = maxDegreeOfParallelism;
        _degreeOfParallelism = Environment.ProcessorCount;
    }

    public int DegreeOfParallelism
    {
        get { return _degreeOfParallelism; }
        set
        {
            if (value < 1) throw new ArgumentOutOfRangeException();
            if (value == _degreeOfParallelism) return;
            int delta = value - _degreeOfParallelism;
            DegreeOfParallelismChangedDelta?.Invoke(this, delta);
            _degreeOfParallelism = value;
        }
    }
}

The DynamicParallelOptions.DegreeOfParallelism property is not thread-safe. It is assumed that controlling the degree of parallelism will be performed by a single thread, or at least that the operations will be synchronized.

Usage example, featuring a Channel<T> as the source of the parallel loop:

Channel<int> channel = Channel.CreateUnbounded<int>();
DynamicParallelOptions options = new(maxDegreeOfParallelism: 50)
{
    DegreeOfParallelism = 2
};

// Start the loop without awaiting it yet, so that the code below can run
Task loop = DynamicParallelForEachAsync(
    channel.Reader.ReadAllAsync(), options, async (item, ct) =>
    {
        Console.WriteLine($"Processing #{item}");
        await Task.Delay(1000, ct); // Simulate an I/O-bound operation
    });
// Push values to the channel from any thread
channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);
channel.Writer.TryWrite(3);
channel.Writer.Complete();
// Set the DegreeOfParallelism to a positive value at any time from a single thread
options.DegreeOfParallelism = 5;
// Wait for the loop to process all the values
await loop;

Some overloads with synchronous source, or synchronous body:

public static Task DynamicParallelForEachAsync<TSource>(
    IEnumerable<TSource> source,
    DynamicParallelOptions options,
    Func<TSource, CancellationToken, ValueTask> body)
{
    ArgumentNullException.ThrowIfNull(source);

    #pragma warning disable CS1998
    async IAsyncEnumerable<TSource> GetSource()
    { foreach (TSource item in source) yield return item; }
    #pragma warning restore CS1998

    return DynamicParallelForEachAsync(GetSource(), options, body);
}

public static void DynamicParallelForEach<TSource>(
    IEnumerable<TSource> source,
    DynamicParallelOptions options,
    Action<TSource, CancellationToken> body)
{
    ArgumentNullException.ThrowIfNull(body);
    DynamicParallelForEachAsync(source, options, (item, ct) =>
    {
        body(item, ct); return ValueTask.CompletedTask;
    }).Wait();
}

The default value of the DegreeOfParallelism is Environment.ProcessorCount. Setting the DegreeOfParallelism to a value larger than the MaxDegreeOfParallelism is allowed, although it doesn't have any effect. The MaxDegreeOfParallelism represents the hard upper limit for the effective degree of parallelism. It is recommended to configure the MaxDegreeOfParallelism with a reasonable value. Setting it too high, for example to Int32.MaxValue, might increase the memory and CPU overhead of the whole operation, especially if the source contains a large number of elements.
