C# 中的并行管道

发布于 2024-10-10 20:04:19 字数 764 浏览 6 评论 0原文

我想在 C# 中创建并行管道。我已经声明了一个名为 IOperation 的接口:

public interface IOperation<Tin, Tout>
{
    BlockingCollection<Tout> BlockingCollection(IEnumerable<Tin> input);
}

现在我想编写一个类,它并行执行多个这些操作。 我对此表示:

public class Pipeline : IPipeline
{
    private List<IOperation<Object, Object>> operations = new List<IOperation<Object, Object>>();
    private List<BlockingCollection<Object>> buffers = new List<BlockingCollection<Object>>();
    public void Register(IOperation<Object, Object> operation)
    {
        operations.Add(operation);
    }

    public void Execute()
    {

    }
}

但我没有找到任何解决方案来保存操作和操作之间的缓冲区,因为它们都有不同的通用类型。有人有想法吗?

I want to create a parallel pipeline in C#. I have declaered an Interface named IOperation:

public interface IOperation<Tin, Tout>
{
    BlockingCollection<Tout> BlockingCollection(IEnumerable<Tin> input);
}

Now i want to write a class, which executes multiple of these operations parallel.
I bagan with this:

public class Pipeline : IPipeline
{
    private List<IOperation<Object, Object>> operations = new List<IOperation<Object, Object>>();
    private List<BlockingCollection<Object>> buffers = new List<BlockingCollection<Object>>();
    public void Register(IOperation<Object, Object> operation)
    {
        operations.Add(operation);
    }

    public void Execute()
    {

    }
}

But i don't find any solution to save the operations and the buffers between the operations, because they all have different generic types. Does anyone have an idea?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

单身情人 2024-10-17 20:04:19

您是否考虑过使用 TPL 中的 Parallel.ForEach?
任务并行库 (TPL) 是 .NET 4 中的一组公共类型和 API。

Have you considered using Parallel.ForEach from the TPL?
The Task Parallel Library (TPL) is a set of public types and APIs in .NET 4.

浮云落日 2024-10-17 20:04:19

目前还不清楚您的 Pipeline 是如何工作的。为什么要传递 BlockingCollections?为什么使用泛型,然后将 object 作为类型?

考虑使用一个使用 Action 类型的委托加载的 Pipeline,然后使用任务并行库创建并行执行这些操作的任务。

public void Register(Action operation)
    {
        operations.Add(operation);
    }

public void Execute()
    {
        foreach (var action in operations)
          Task.StartNew(operation);
    }

但这并不是真正的“管道”,它只是一组并行执行的操作。

管道通常会具有带有输入类型和输出类型的管道步骤。您可以通过创建类似 PipelineStep 之类的内容来处理此问题,并构造传入 Func 操作的每个管道步骤。在内部,每个管道步骤都可以使用输入 IEnumerable 并生成输出 IEnumerable,并且可以使用 Task 或更简单地使用并行 foreach 循环来完成此操作。

或者,您也许可以使用 TPL 的 Task.ContinueWith 方法将任务从输入到输出链接在一起。

It's not very clear how your Pipeline is meant to work. Why are you passing around BlockingCollections? Why are you using generics but then putting object in as the type?

Consider instead having a Pipeline that you load with deleggates of type Action and then use the task parallel library to create Tasks that execute those actions in parallel.

public void Register(Action operation)
    {
        operations.Add(operation);
    }

public void Execute()
    {
        foreach (var action in operations)
          Task.StartNew(operation);
    }

But that's not really a 'pipeline', it's just a bundle of operations that execute in parallel.

A pipeline would more normally have pipeline steps with an input type and an output type. You could handle this by creating something like PipelineStep<T,U> and you'd construct each pipeline step passing in a Func operation. Internally, each pipeline step could consume an input IEnumerable and produce an output IEnumerable and it could do this using Task or more simply using a parallel foreach loop.

Alternatively you could perhaps use the TPL's Task.ContinueWith method to chain the Tasks together from input to output.

不即不离 2024-10-17 20:04:19

Microsoft 有类似的东西 - TPL Dataflow 允许您在管道,对它们的缓冲和并行化方式进行细粒度控制。

与您的解决方案不同,它使用完全异步推送设计。它不使用 BlockingCollection(阻塞拉设计),如果您有很深的管道,速度会显着加快。

Microsoft has something exactly like this -- TPL Dataflow lets you define blocks in a pipeline, with fine-grained controls on how they are buffered and parallelized.

Unlike your solution, it uses a fully asynchronous push design. It does not use a BlockingCollection (a blocking pull design), and will be significantly faster for it if you have a deep pipeline.

还给你自由 2024-10-17 20:04:19

http://msdn.microsoft.com/en-us/ 有一篇很好的文章Library/ff963548.aspx 关于使用 BlockingCollection 的并行管道。

基本上每个步骤都应该有一个 BlockingCollection 类型的输出队列。它从上一步的输出队列中获取项目,并在处理完成后将它们添加到输出中。

There's a good article at http://msdn.microsoft.com/en-us/library/ff963548.aspx about parallel pipelines with BlockingCollection.

Basically each step should have an output queue of type BlockingCollection. It takes in items from the previous step's output queue, and adds them to it's output once done processing.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文