We are using BlockingCollection to implement the producer-consumer pattern in a real-time application:
    BlockingCollection<T> collection = new BlockingCollection<T>();
    CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    // Starting up consumer
    Task.Run(() => consumer(this.cancellationTokenSource.Token));
    …
    void Producer(T item)
    {
        collection.Add(item);
    }
    …
    void consumer(CancellationToken token)
    {
        while (true)
        {
            // Blocks until an item is available; throws OperationCanceledException on cancellation
            var item = this.collection.Take(token);
            process(item);
        }
    }
To be sure, this is a very simplified version of the actual production code.
Sometimes, when the application is under heavy load, we observe that the consuming part lags behind the producing part. The application logic is very complex: it involves interaction with other applications over the network, as well as with SQL databases. Delays could be occurring in many places; they could occur in the calls to process(), which might in principle explain why the consuming part can be slow.
All the above considerations aside, is there something inherent in using BlockingCollection that could explain this phenomenon? Are there more efficient options in .NET to implement the producer-consumer pattern?
Comments (2)
First of all, BlockingCollection isn't the best choice for producer/consumer scenarios. There are at least two better options (Dataflow, Channels) and the choice depends on the actual application scenario - which is missing from the question.
It's also possible to create a producer/consumer pipeline without a buffer, by using async streams and IAsyncEnumerable.
Async Streams
In this case, the producer can be an async iterator. The consumer will receive the IAsyncEnumerable and iterate over it until it completes. It could also produce its own IAsyncEnumerable output, which can be passed to the next method in the pipeline:
The producer can be:
And the consumer:
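A minimal sketch of such a producer and consumer could look like the following. The names ProduceAsync and ConsumeAsync, the int payload and the Task.Delay call are assumptions made for illustration, not code from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Producer (hypothetical): an async iterator. The next item is only produced
// when the consumer asks for it, so nothing is buffered in between.
async IAsyncEnumerable<int> ProduceAsync(int count)
{
    for (var i = 0; i < count; i++)
    {
        await Task.Delay(10); // stands in for real asynchronous work
        yield return i;
    }
}

// Consumer (hypothetical): iterates the stream until the producer completes.
async Task<int> ConsumeAsync(IAsyncEnumerable<int> source)
{
    var sum = 0;
    await foreach (var item in source)
    {
        sum += item; // stands in for process(item)
    }
    return sum;
}

var total = await ConsumeAsync(ProduceAsync(5));
Console.WriteLine(total);
```

Because the consumer takes an IAsyncEnumerable, it could just as well return one itself and become a middle stage of a longer pipeline.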
There's no buffering in this case, and the producer can't emit a new message until the consumer consumes the current one. The consumer can be parallelized with Parallel.ForEachAsync. Finally, the System.Linq.Async package provides LINQ operators for async streams, allowing us to write e.g.:
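As a rough illustration of LINQ over an async stream, the sketch below filters and projects the items of a hypothetical ProduceAsync iterator. It assumes the System.Linq.Async NuGet package is referenced, which supplies Where, Select and ToListAsync for IAsyncEnumerable in the System.Linq namespace.

```csharp
using System;
using System.Collections.Generic;
using System.Linq; // async LINQ operators come from the System.Linq.Async package (assumed to be referenced)
using System.Threading.Tasks;

// Hypothetical producer, used only to have something to query.
async IAsyncEnumerable<int> ProduceAsync(int count)
{
    for (var i = 0; i < count; i++)
    {
        await Task.Yield(); // stands in for real asynchronous work
        yield return i;
    }
}

// Keep the even numbers, square them, and collect the results.
List<int> squaresOfEvens = await ProduceAsync(10)
    .Where(n => n % 2 == 0)
    .Select(n => n * n)
    .ToListAsync();

Console.WriteLine(string.Join(", ", squaresOfEvens));
```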
Dataflow - ActionBlock
Dataflow blocks can be used to construct entire processing pipelines, with each block receiving a message (data) from the previous one, processing it and passing it to the next block. Most blocks have input and, where appropriate, output buffers. Each block uses a single worker task by default but can be configured to use more. The application code doesn't have to handle the tasks, though.
In the simplest case, a single ActionBlock can process messages posted to it by one or more producers, acting as a consumer:
In this example the block uses 4 worker tasks and will block if more than 5 items are waiting in its input buffer, beyond those currently being processed.
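A configuration along those lines might be sketched as follows. The int payload, the loop of 20 items and the Task.Delay call are placeholders chosen for illustration; only the two option values come from the description above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var processed = 0;

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4, // up to 4 worker tasks
    BoundedCapacity = 5         // bounded input buffer, so a slow consumer pushes back on producers
};

// Consumer: the block runs this delegate for every message sent to it.
var consumer = new ActionBlock<int>(async item =>
{
    await Task.Delay(10);                 // stands in for process(item)
    Interlocked.Increment(ref processed); // several workers may run at once
}, options);

// Producer: SendAsync waits asynchronously while the block is at capacity.
for (var i = 0; i < 20; i++)
{
    await consumer.SendAsync(i);
}

consumer.Complete();       // signal that no more messages will arrive
await consumer.Completion; // wait until every queued message is processed
Console.WriteLine(processed);
```

Using SendAsync rather than Post matters with a bounded block: Post returns false immediately when the block is full instead of waiting.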
BufferBlock as a producer/consumer queue
A BufferBlock is an inactive block that's used as a buffer by other blocks. It can be used as an asynchronous producer/consumer collection, as shown in How to: Implement a producer-consumer dataflow pattern. In this case, the code needs to receive messages explicitly. Threading is up to the developer.
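As a sketch of that usage, the code below has a producer write to a BufferBlock and a consumer pull from it in an explicit loop. The method names and the int payload are assumptions for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var queue = new BufferBlock<int>();

// Consumer (hypothetical): has to pull the messages out of the buffer itself.
async Task<List<int>> ConsumeAsync(ISourceBlock<int> source)
{
    var received = new List<int>();
    // OutputAvailableAsync returns false once the block is completed and empty.
    while (await source.OutputAvailableAsync())
    {
        received.Add(await source.ReceiveAsync());
    }
    return received;
}

// Producer (hypothetical): sends its items, then marks the block as complete.
async Task ProduceAsync(ITargetBlock<int> target, int count)
{
    for (var i = 0; i < count; i++)
    {
        await target.SendAsync(i);
    }
    target.Complete();
}

var consumerTask = ConsumeAsync(queue); // starts consuming right away
await ProduceAsync(queue, 5);
var items = await consumerTask;
Console.WriteLine(string.Join(", ", items));
```

The OutputAvailableAsync/ReceiveAsync pair is safe here because there is a single consumer; with several competing consumers, TryReceive would be the safer way to take an item.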
Parallelized consumer
In .NET 6 the consumer can be simplified by using await foreach and ReceiveAllAsync, and the messages can be processed concurrently using Parallel.ForEachAsync.
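Both variants could be sketched like this. CreateCompletedQueue is a hypothetical helper that pre-fills a BufferBlock so the example terminates; in a real application the producer would run concurrently with the consumer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: a queue that already holds `count` items and is marked complete.
BufferBlock<int> CreateCompletedQueue(int count)
{
    var queue = new BufferBlock<int>();
    for (var i = 0; i < count; i++) queue.Post(i);
    queue.Complete();
    return queue;
}

// Sequential consumer: ReceiveAllAsync exposes the block as an async stream.
var sequentialResults = new List<int>();
await foreach (var message in CreateCompletedQueue(5).ReceiveAllAsync())
{
    sequentialResults.Add(message);
}

// Concurrent consumer: Parallel.ForEachAsync runs the body on several workers.
var concurrentResults = new ConcurrentBag<int>();
await Parallel.ForEachAsync(CreateCompletedQueue(10).ReceiveAllAsync(), async (msg, token) =>
{
    await Task.Delay(10, token); // stands in for process(msg)
    concurrentResults.Add(msg);  // thread-safe collection, since workers overlap
});

Console.WriteLine($"{sequentialResults.Count} sequential, {concurrentResults.Count} concurrent");
```

The degree of parallelism can be limited by passing a ParallelOptions instance with MaxDegreeOfParallelism set.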
By default, Parallel.ForEachAsync will use as many worker tasks as there are cores.
Channels
Channels are similar to Go's channels. They are built specifically for producer/consumer scenarios and allow creating pipelines at a lower level than the Dataflow library. If the Dataflow library was built today, it would be built on top of Channels.
A channel can't be accessed directly, only through its Reader or Writer interfaces. This is intentional, and allows easy pipelining of methods. A very common pattern is for a producer method to create a channel it owns and return only a ChannelReader. Consuming methods accept that reader as input. This way, the producer can control the channel's lifetime without worrying whether other producers are writing to it.
With channels, a producer would look like this:
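A minimal sketch of that pattern, assuming an unbounded channel of int and a hypothetical Produce method, might be:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Producer (hypothetical): creates and owns the channel, and exposes only its reader.
ChannelReader<int> Produce(int count, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<int>();
    var writer = channel.Writer;

    _ = Task.Run(async () =>
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, token);
        }
    }, token)
    // Completes the channel when the worker ends; a non-null exception is passed on to readers.
    .ContinueWith(t => writer.TryComplete(t.Exception));

    return channel.Reader;
}

// Reading until the channel completes.
var sum = 0;
await foreach (var item in Produce(5).ReadAllAsync())
{
    sum += item;
}
Console.WriteLine(sum);
```

Channel.CreateBounded could be used instead when the producer should wait for a slow consumer rather than let the queue grow.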
The unusual .ContinueWith(t=>writer.TryComplete(t.Exception)); is used to signal completion to the writer. This will signal readers to complete as well. This way completion propagates from one method to the next. Any exceptions are propagated as well.
writer.TryComplete(t.Exception) doesn't block or perform any significant work, so it doesn't matter what thread it executes on. This means there's no need to use await on the worker task, which would complicate the code by rethrowing any exceptions.
A consuming method only needs the ChannelReader as source. A method may read from one channel and publish new data to another using the producer pattern:
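As a sketch, a middle stage that reads integers from one channel and publishes strings to a channel of its own could look like this. Produce and Format are hypothetical names, and the formatting step merely stands in for real processing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// First stage (hypothetical): owns its channel and returns only the reader.
ChannelReader<int> Produce(int count, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<int>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        for (var i = 0; i < count; i++) await writer.WriteAsync(i, token);
    }, token)
    .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}

// Middle stage (hypothetical): consumes one reader and returns a reader for its own output.
ChannelReader<string> Format(ChannelReader<int> input, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<string>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        // The loop ends when the upstream channel completes.
        await foreach (var item in input.ReadAllAsync(token))
        {
            await writer.WriteAsync($"item-{item}", token);
        }
    }, token)
    // Propagates completion, and any error, to the next stage.
    .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}

// Final consumer: only needs the last reader in the chain.
var output = new List<string>();
await foreach (var line in Format(Produce(3)).ReadAllAsync())
{
    output.Add(line);
}
Console.WriteLine(string.Join(", ", output));
```

Since every stage takes a reader and returns a reader, stages compose by simple nesting, as in Format(Produce(3)).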
You could look at using the Dataflow library. I'm not sure if it is more performant than a BlockingCollection. As others have said, there is no guarantee that you can consume faster than you produce, so it is always possible to fall behind.