Parallel.ForEach with a PriorityQueue in .NET 6

I'm trying to run Parallel.ForEach on my PriorityQueue, but I am getting the following error:

Severity Code Description Project File Line Suppression State
Error CS0411 The type arguments for method 'Parallel.ForEach<TSource>(OrderablePartitioner<TSource>, ParallelOptions, Action<TSource, ParallelLoopState, long>)' cannot be inferred from the usage. Try specifying the type arguments explicitly. TPL_POC.PL

I know how to execute Parallel.ForEach with an IEnumerable or a List, but I'm having no luck with the following.

private void ProcessTasksParallely()
{
    PriorityQueue<string, int> activeTasksPriority = new PriorityQueue<string, int>();
    foreach (var task in this.tasks)
    {
        activeTasksPriority.Enqueue(task.Task, task.Id);
    }
    Console.WriteLine("Processing");

    var options = new ParallelOptions { MaxDegreeOfParallelism = (Environment.ProcessorCount / 2) * 10 };

    Parallel.ForEach(activeTasksPriority.TryDequeue(out string t, out int priority),
        options,
        (t, priority) =>
        {
            Console.WriteLine($" task {priority}, task = {t}, thread = {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(100);
        });
}

I am trying this because I need to process tasks in parallel, but according to the priority with which they were scheduled.

Comments (2)

荒人说梦 2025-01-21 04:47:32

The PriorityQueue<TElement, TPriority> class does not offer a way to consume it as an IEnumerable out of the box. It only has an UnorderedItems property, which is not what you want. This property yields the contents of the queue without consuming them, and in no particular order. It is easy though to implement a custom GetConsumingEnumerable method for the PriorityQueue<TElement, TPriority> class, like this:

/// <summary>
/// Gets an enumerable sequence that consumes the elements of the queue
/// in an ordered manner.
/// </summary>
public static IEnumerable<(TElement Element, TPriority Priority)>
    GetConsumingEnumerable<TElement, TPriority>(
    this PriorityQueue<TElement, TPriority> source)
{
    while (source.TryDequeue(out TElement element, out TPriority priority))
    {
        yield return (element, priority);
    }
}

Usage example:

var partitioner = Partitioner.Create(activeTasksPriority.GetConsumingEnumerable(),
    EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(partitioner, options, entry =>
{
    var (t, priority) = entry;
    Console.WriteLine($"Priority: {priority}, Task: {t}");
    Thread.Sleep(100);
});

The Partitioner.Create + NoBuffering combination prevents Parallel.ForEach from consuming elements ahead of time and storing them in a buffer before it is ready to process them.

Note: This answer deals with the simple scenario presented in the question, where the PriorityQueue<E,P> is fully populated before starting the parallel loop. If you want to add more items to the queue while the loop is running, you can't use a PriorityQueue<E,P> directly, for two reasons:

  1. It's not a thread-safe collection.
  2. It doesn't have blocking capabilities, so the loop might complete prematurely before all items are processed.

If you are dealing with such a scenario, you could take a look at this question: Concurrent collection with priority.
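
For completeness, below is a minimal sketch (not from the original answer) of a thread-safe, blocking wrapper around PriorityQueue<TElement, TPriority> for that producer/consumer scenario. The BlockingPriorityQueue type and all of its members are illustrative assumptions rather than an existing API:

using System.Collections.Generic;
using System.Threading;

public class BlockingPriorityQueue<TElement, TPriority>
{
    private readonly PriorityQueue<TElement, TPriority> _queue = new();
    private bool _completed;

    public void Enqueue(TElement element, TPriority priority)
    {
        lock (_queue)
        {
            _queue.Enqueue(element, priority);
            Monitor.Pulse(_queue); // wake up one waiting consumer
        }
    }

    public void CompleteAdding()
    {
        lock (_queue)
        {
            _completed = true;
            Monitor.PulseAll(_queue); // wake up all waiting consumers so they can exit
        }
    }

    // Blocks until an item is available; returns false once adding has completed
    // and the queue has been drained.
    public bool TryTake(out TElement element, out TPriority priority)
    {
        lock (_queue)
        {
            while (true)
            {
                if (_queue.TryDequeue(out element, out priority)) return true;
                if (_completed) return false;
                Monitor.Wait(_queue); // releases the lock while waiting
            }
        }
    }

    public IEnumerable<(TElement Element, TPriority Priority)> GetConsumingEnumerable()
    {
        while (TryTake(out TElement element, out TPriority priority))
            yield return (element, priority);
    }
}

Such a wrapper could feed Parallel.ForEach through a NoBuffering partitioner in the same way as the extension method above, while still accepting new items from other threads until CompleteAdding is called.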

好听的两个字的网名 2025-01-21 04:47:32

If you want to implement priority in a pub/sub scenario, both Parallel.ForEach and PriorityQueue<T> are bad choices.

  • Parallel.ForEach is built for data parallelism - processing a ton of in-memory data by partitioning it and using roughly one worker task per core to process each partition with minimal synchronization. A PriorityQueue isn't needed here - if you want a specific order you can impose it using e.g. PLINQ and OrderBy.
  • Priorities inevitably change the perceived order of items and the queue state, which is a big no-no for concurrency.
  • Priorities can get inverted. All worker tasks may be busy processing low-priority items while a new high-priority item is waiting. Worse, the default partitioner used by Parallel.ForEach buffers items. This means that a new high-priority item may have to wait for multiple low-priority items. You'd have to use Partitioner.Create with an option to disable buffering.

In high-throughput networking and messaging, priority processing is performed through multiple queues not a single priority queue. Higher-priority queues get more resources or are processed before lower priority queues.

One queue per priority class

This is how highly scalable messaging systems work, because this approach doesn't require any synchronization to determine which item to process next.

One way to implement this strategy would be to use multiple ActionBlock instances, each with a different number of worker tasks:

// Requires the System.Threading.Tasks.Dataflow NuGet package:
// using System.Threading.Tasks.Dataflow;

ActionBlock<string> _highQueue, _midQueue, _lowQueue;

async Task ProcessMessage(string msg) {...}

ExecutionDataflowBlockOptions WithDop(int dop) => new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = dop
};

void BuildQueues()
{
    // Higher-priority queues get more worker tasks.
    _highQueue = new ActionBlock<string>(ProcessMessage, WithDop(4));
    _midQueue = new ActionBlock<string>(ProcessMessage, WithDop(2));
    _lowQueue = new ActionBlock<string>(ProcessMessage, WithDop(1));
}

public void Process(string msg, int priority)
{
    // Route the message to the block that matches its priority.
    var queue = priority switch
    {
        0 => _highQueue,
        1 => _midQueue,
        _ => _lowQueue
    };
    queue.Post(msg);
}

async Task Complete()
{
    // Signal that no more messages will be posted, then wait for all blocks to drain.
    _highQueue.Complete();
    _midQueue.Complete();
    _lowQueue.Complete();
    await Task.WhenAll(
        _highQueue.Completion,
        _midQueue.Completion,
        _lowQueue.Completion);
}

In this case, Process uses pattern matching to route the message to the appropriate ActionBlock.
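
A brief usage sketch (assuming the members above belong to some hosting class and are called from inside it; the message strings and priority values are made up for illustration):

BuildQueues();

Process("urgent message", priority: 0);      // routed to _highQueue (4 workers)
Process("normal message", priority: 1);      // routed to _midQueue (2 workers)
Process("background cleanup", priority: 5);  // routed to _lowQueue (1 worker)

await Complete(); // no more messages; wait for all three blocks to drain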
