Parallel.ForEach with a priority queue in .NET 6
I'm trying to run Parallel.ForEach on my Priority Queue but I am getting the following error:

Severity Code Description Project File Line Suppression State
Error CS0411 The type arguments for method 'Parallel.ForEach<TSource>(OrderablePartitioner<TSource>, ParallelOptions, Action<TSource, ParallelLoopState, long>)' cannot be inferred from the usage. Try specifying the type arguments explicitly. TPL_POC.PL

I know how to execute Parallel.ForEach with an IEnumerable and a List, but I have no luck with the following.
private void ProcessTasksParallely()
{
    PriorityQueue<string, int> activeTasksPriority = new PriorityQueue<string, int>();
    foreach (var task in this.tasks)
    {
        activeTasksPriority.Enqueue(task.Task, task.Id);
    }
    Console.WriteLine("Processing");
    var options = new ParallelOptions { MaxDegreeOfParallelism = (Environment.ProcessorCount / 2) * 10 };
    Parallel.ForEach(activeTasksPriority.TryDequeue(out string t, out int priority),
        options,
        (t, priority) =>
        {
            Console.WriteLine($" task {priority}, task = {t}, thread = {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(100);
        });
}
I am trying this because I need to process tasks in parallel, but according to the priority with which they were scheduled.
2 Answers
The PriorityQueue<TElement, TPriority> class does not offer a way to consume it as an IEnumerable out of the box. It only has an UnorderedItems property, which is not what you want. This property yields the contents of the queue without consuming them, and in no particular order. It is easy though to implement a custom GetConsumingEnumerable method for the PriorityQueue<TElement, TPriority> class, like this:
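(A minimal sketch; the exact implementation may differ, and it assumes the queue is drained only by the consuming loop.)

using System.Collections.Generic;

public static class PriorityQueueExtensions
{
    // Dequeues the items one by one, yielding each element together with its priority,
    // until the queue is empty. Assumes no other thread mutates the queue in the meantime.
    public static IEnumerable<(TElement Element, TPriority Priority)> GetConsumingEnumerable<TElement, TPriority>(
        this PriorityQueue<TElement, TPriority> source)
    {
        while (source.TryDequeue(out TElement element, out TPriority priority))
        {
            yield return (element, priority);
        }
    }
}

Usage example, adapted from the question's activeTasksPriority and options (Partitioner.Create and EnumerablePartitionerOptions.NoBuffering live in System.Collections.Concurrent):

var partitioner = Partitioner.Create(
    activeTasksPriority.GetConsumingEnumerable(),
    EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(partitioner, options, entry =>
{
    Console.WriteLine($"priority = {entry.Priority}, task = {entry.Element}, thread = {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(100);
});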
The intention of the Partitioner.Create + NoBuffering is to prevent the Parallel.ForEach from consuming elements in advance and storing them into a buffer before it's ready to process them.

Note: This answer deals with the simple scenario presented in the question, where the PriorityQueue<E,P> is fully populated before starting the parallel loop. In case you want to add more items to the queue while the loop is running, you can't use a PriorityQueue<E,P> directly, for two reasons. If you are dealing with such a scenario, you could take a look at this question: Concurrent collection with priority.
If you want to implement priority in a pub/sub scenario, both Parallel.ForEach and PriorityQueue<T> are bad choices.

Parallel.ForEach is built for data parallelism - processing a ton of in-memory data by partitioning it and using roughly one worker task per core to process each partition with minimal synchronization. A PriorityQueue isn't needed here - if you want a specific order you can impose it using e.g. PLINQ and OrderBy.

Parallel.ForEach buffers items. This means that a new high-priority item may have to wait for multiple low-priority items. You'd have to use Partitioner.Create with an option to disable buffering.

In high-throughput networking and messaging, priority processing is performed through multiple queues, not a single priority queue. Higher-priority queues get more resources or are processed before lower-priority queues.
One queue per priority class
This is how highly scalable messaging systems work, because it doesn't require any synchronization to determine which item to process next.
One way to implement this strategy would be to use multiple ActionBlock instances, each with a different number of worker tasks:
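A minimal sketch of this approach, assuming a simple Message record with an integer priority; the record shape, the priority threshold, and the worker counts are illustrative assumptions, and ActionBlock comes from the System.Threading.Tasks.Dataflow package:

using System;
using System.Threading.Tasks.Dataflow;

public record Message(int Priority, string Payload);

public class PriorityProcessor
{
    // One ActionBlock per priority class; the higher-priority block gets more worker tasks.
    private readonly ActionBlock<Message> _highPriority;
    private readonly ActionBlock<Message> _lowPriority;

    public PriorityProcessor()
    {
        _highPriority = CreateBlock(workers: 4);
        _lowPriority = CreateBlock(workers: 1);
    }

    private static ActionBlock<Message> CreateBlock(int workers) =>
        new(msg => Console.WriteLine(
                $"priority = {msg.Priority}, payload = {msg.Payload}, thread = {Environment.CurrentManagedThreadId}"),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = workers });

    // Routes a message to the appropriate block based on its priority.
    public void Process(Message message)
    {
        var block = message switch
        {
            { Priority: <= 1 } => _highPriority,   // assumption: smaller value = more urgent
            _ => _lowPriority
        };
        block.Post(message);
    }
}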
In this case, Process uses pattern matching to route the message to the appropriate ActionBlock.