I'm working on a project with a background service that consumes messages from a RabbitMQ queue.
I have a background service that uses a background task queue, like this and here, to process tasks in parallel.
I would like to buffer the consumed messages, group them by id, and send them to another RabbitMQ queue after a specified interval or once the buffer reaches a given size.
I already posted a question here, and the solution would be to implement Channel<T>.
But a worker can't produce and consume in the same thread.
So I thought about using 2 workers: one that consumes messages from RabbitMQ and stores them in the Channel (write), and another that reads the Channel and groups items to send them to another queue.
Is this the best way to achieve this?
Things are far simpler. And there's no leak, unless you seriously misuse Channels.
A BackgroundService or an IHostedService isn't a worker or a Task, it's a class created by the Host when it starts. The host calls StartAsync on it when the application starts, and StopAsync at the end. The BackgroundService base class simplifies things a bit, so you can put all your code in an overridden ExecuteAsync without having to handle Start and Stop. You can start as many tasks as you want inside StartAsync or ExecuteAsync. You don't need to create multiple background/hosted services.

In a hosted service, StartAsync(CancellationToken) can create your client, connect to RabbitMQ, and register an event handler that posts to whatever you use to process the messages. .NET offers multiple ways to construct processing pipelines for these streams of events.
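A minimal sketch of that shape, assuming the RabbitMQ.Client (6.x) package; the queue name "input" and the Channel<byte[]> hand-off are illustrative assumptions, not details from the answer:

```csharp
// Sketch only: assumes RabbitMQ.Client 6.x; "input" and the
// Channel<byte[]> wiring are placeholders for your own setup.
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class RabbitConsumerService : IHostedService
{
    private readonly ChannelWriter<byte[]> _writer;
    private IConnection? _connection;
    private IModel? _model;

    public RabbitConsumerService(Channel<byte[]> channel) => _writer = channel.Writer;

    public Task StartAsync(CancellationToken cancellationToken)
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        _connection = factory.CreateConnection();
        _model = _connection.CreateModel();

        var consumer = new EventingBasicConsumer(_model);
        consumer.Received += (sender, ea) =>
        {
            // Hand the message off to the processing pipeline
            // (a Channel<T> here, but it could be an Rx Subject, a
            // Dataflow block, etc.).
            _writer.TryWrite(ea.Body.ToArray());
        };
        _model.BasicConsume(queue: "input", autoAck: true, consumer: consumer);
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _writer.TryComplete();
        _model?.Dispose();
        _connection?.Dispose();
        return Task.CompletedTask;
    }
}
```

Note that both the connection and the event handler are created in StartAsync, so no extra worker is needed for the consuming side.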
With Rx.NET, grouping by key and buffering by count and timeout could be done with a query similar to this one, borrowed from this question. Since you care about the grouping key (the ID), the query could be adjusted to:
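The adjusted query isn't reproduced here, but a hedged sketch of the commonly used shape (a hypothetical Message type with an Id property, and placeholder 5-second/10-item limits) might look like:

```csharp
// Sketch only: Message, and the 5-second / 10-item limits, are
// placeholder assumptions, not values from the linked question.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public record Message(int Id, string Payload);

public static class GroupingQuery
{
    // Emits each ID's messages as a batch, closed after at most
    // 10 items or 5 seconds, whichever comes first.
    public static IObservable<IList<Message>> GroupAndBuffer(
        IObservable<Message> source) =>
        source
            .GroupByUntil(
                m => m.Id,
                g => g.Buffer(TimeSpan.FromSeconds(5), 10)) // closes the group
            .SelectMany(g => g.ToList()); // a new group opens on the next message with that ID
}
```

The duration selector (g.Buffer(...)) closes each group when it first emits, so a fresh group is created the next time a message with the same ID arrives.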
A gotcha with Rx.NET is that, unlike the other options, it's optimized for event stream processing, so it's single-threaded by default! You can specify that the observable, the subscriptions and even individual steps will run on different threads though. .SubscribeOn(Scheduler.ThreadPool) will run the subscription on thread pool threads.

Creating an Observable from an event is already supported, but someone asked for explicit Rx.NET support in RabbitMQ and even provided a source. Unfortunately, the request was rejected because some of the commenters wanted Rx.NET operators, others wanted Dataflow operators, so it was decided not to include a specific implementation.
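For instance, a minimal sketch (Observable.Range stands in for the real message observable; ThreadPoolScheduler.Instance is the current spelling of the older Scheduler.ThreadPool property):

```csharp
// Sketch only: demonstrates moving Rx work off the subscribing thread.
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main()
    {
        using var done = new ManualResetEventSlim();

        Observable.Range(1, 3)
            .SubscribeOn(ThreadPoolScheduler.Instance) // subscription runs on a pool thread
            .ObserveOn(ThreadPoolScheduler.Instance)   // callbacks run on pool threads too
            .Subscribe(
                x => Console.WriteLine($"{x} on thread {Environment.CurrentManagedThreadId}"),
                () => done.Set());

        done.Wait(); // keep Main alive until the sequence completes
    }
}
```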
Using the implementation in that issue, you could create an Rx.NET subscription in StartAsync and dispose it in StopAsync:
It seems that you are searching for a LINQ operator like this one:
You could then use it to consume a Channel<Message> in groups of messages with the same Id like this:

The problem is that the GroupByUntil operator for asynchronous enumerable sequences does not exist. The System.Linq.Async package contains functionality that is nowhere near this level of sophistication. That's something that you'd expect to find in the System.Reactive package instead. This is the project that reached nearly 100% maturity, before the whole repository stopped evolving for unknown reasons a couple of years ago. Currently a GroupByUntil operator does exist for observable sequences (System.Reactive), with this signature:

It is tempting to use this operator, in combination with the ToObservable / ToAsyncEnumerable converters (System.Linq.Async), in order to implement the desirable operator, but there is a problem. Let's first see the implementation, and talk about the problem later:

The problem is that the enumeration of the source sequence is not driven by the enumeration of the resulting sequence. Instead, the source sequence is enumerated by a background consumer at the maximum speed possible, and the consumed messages are buffered in a hidden queue. You have no control regarding the size of this queue, and you have no way to know its current size (at least not directly). This is what happens with an enumerable sequence when you attach the ToObservable converter to it.

How big of a problem is this? It depends on the total number of messages contained in the source sequence, on the frequency at which these messages are emitted, and on the amount of work that the consumer of the resulting sequence has to do after consuming each group. In the extreme case of an infinite sequence that emits at a faster pace than the pace of the consumer, the hidden queue will grow larger and larger, the latency between consuming a message from the source and processing it will get longer and longer, and eventually, sooner or later, the application will crash with an OutOfMemoryException.

Unfortunately, implementing the GroupByUntil operator properly is not trivial. You can see here an example of what it takes to implement a proper LINQ operator for IAsyncEnumerable<T> sequences (a Buffer operator in that case). I won't attempt to implement GroupByUntil here, because it's a major undertaking and you might not need it. Maybe your scenario is not affected too much by the shortcomings of the ToObservable / ToAsyncEnumerable implementation. In case it is, you might try to implement it yourself, and if you get stuck you could post a new question about your troubles, and we might be able to help.
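For reference, a hedged sketch of the ToObservable / GroupByUntil / ToAsyncEnumerable composition described above (not the answer's exact code; the operator name and the timeSpan/count parameters are assumptions), which inherits the hidden-queue problem just discussed:

```csharp
// Sketch only: one plausible shape for the adapter. Requires the
// System.Reactive and System.Linq.Async packages, and inherits the
// hidden unbounded queue created by ToObservable.
using System;
using System.Collections.Generic;
using System.Linq;           // System.Linq.Async extensions
using System.Reactive.Linq;  // Rx operators

public static class AsyncGroupByUntil
{
    public static IAsyncEnumerable<IList<TSource>> GroupByUntil<TSource, TKey>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        TimeSpan timeSpan,
        int count)
    {
        return source
            .ToObservable() // starts a background consumer with an unbounded hidden queue
            .GroupByUntil(keySelector, g => g.Buffer(timeSpan, count))
            .SelectMany(g => g.ToList())
            .ToAsyncEnumerable();
    }
}
```

The ToObservable call is exactly where the backpressure is lost: the source is drained as fast as possible regardless of how quickly the resulting IAsyncEnumerable is consumed.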