Using two background services to produce and consume a Channel to buffer messages

Posted on 2025-02-13 06:18:39

I'm working on a project with a background service that consumes messages from a RabbitMQ queue.

I have a background service that uses a background task queue, like this and here, to process tasks in parallel.

I would like to buffer the consumed messages, group them by ID, and send them to another RabbitMQ queue after a specified interval or once the buffer reaches a specified size.

I already posted a question here, and the answer was that the solution is to implement a Channel<T>.

But a worker can't produce and consume on the same thread.

So I thought about using two workers: one that consumes messages from RabbitMQ and writes them to the Channel, and another that reads from the Channel and groups the items to send them to another queue.

Is this the best way to achieve this?
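A minimal sketch of the two-worker design described above, with the RabbitMQ side replaced by a plain loop. Messages are modeled as hypothetical (int Id, string Body) tuples. One task only writes to the Channel<T> and another only reads from it, so they never need to share a thread; calling Complete() on the writer is what lets the reader's loop end.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical message shape: (Id, Body). In the real service it would come from RabbitMQ.
var channel = Channel.CreateUnbounded<(int Id, string Body)>();
var received = new List<(int Id, string Body)>();

// Producer task: stands in for the worker that consumes RabbitMQ and writes to the channel.
Task producer = Task.Run(async () =>
{
    for (int i = 0; i < 10; i++)
    {
        await channel.Writer.WriteAsync((i % 3, $"message {i}"));
    }
    // Signal that nothing more will be written, so the reader's loop can finish.
    channel.Writer.Complete();
});

// Consumer task: stands in for the worker that reads the channel and groups/forwards items.
Task consumer = Task.Run(async () =>
{
    await foreach (var message in channel.Reader.ReadAllAsync())
    {
        received.Add(message); // only this task touches 'received' until it completes
    }
});

await Task.WhenAll(producer, consumer);
Console.WriteLine($"Consumed {received.Count} messages");
```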

唔猫 2025-02-20 06:18:39

Things are far simpler. And there's no leak, unless you seriously misuse Channels.

A BackgroundService or an IHostedService isn't a worker or a Task; it's a class the host creates when it starts. The host calls StartAsync on it when the application starts, and StopAsync when it shuts down. The BackgroundService base class simplifies things a bit, so you can put all your code in an overridden ExecuteAsync without having to handle Start and Stop. You can start as many tasks as you want inside StartAsync or ExecuteAsync. You don't need to create multiple background/hosted services.
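To illustrate that a single service can own both loops, the method below has the shape of an ExecuteAsync override but is written as a plain local function, so it runs without the Microsoft.Extensions.Hosting package. RunAsync, the counter standing in for RabbitMQ deliveries, and the 200 ms stop timeout are all hypothetical. When the stopping token fires, the producer completes the channel and the consumer drains whatever is left before the method returns.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var processed = new ConcurrentQueue<int>();

// Hypothetical stand-in for BackgroundService.ExecuteAsync: one method starts and awaits both loops.
async Task RunAsync(CancellationToken stoppingToken)
{
    var channel = Channel.CreateBounded<int>(100);

    Task produce = Task.Run(async () =>
    {
        try
        {
            for (int n = 0; ; n++)
            {
                // Stand-in for a message arriving from RabbitMQ.
                await channel.Writer.WriteAsync(n, stoppingToken);
                await Task.Delay(10, stoppingToken);
            }
        }
        catch (OperationCanceledException)
        {
            // The host is stopping.
        }
        finally
        {
            // No more writes: lets the consumer drain the channel and finish.
            channel.Writer.Complete();
        }
    });

    Task consume = Task.Run(async () =>
    {
        // No token on purpose: keep reading until the producer completes the channel.
        await foreach (int item in channel.Reader.ReadAllAsync())
        {
            processed.Enqueue(item);
        }
    });

    await Task.WhenAll(produce, consume);
}

using var stop = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
await RunAsync(stop.Token);
Console.WriteLine($"Processed {processed.Count} messages before stopping");
```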

In a hosted service, StartAsync(CancellationToken) can create your client, connect to RabbitMQ, and register an event handler that posts each message to whatever you use to process the messages. .NET offers multiple ways to process such streams of events; the RabbitMQ part could look like this:

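EventingBasicConsumer? _consumer;

void OnReceived(object? sender, BasicDeliverEventArgs ea)
{
    // Copy the payload: in RabbitMQ.Client 6.x the body memory may be reused
    // once the handler returns
    var body = ea.Body.ToArray();
    // Process that message, or post it to whatever processes the messages
}

public Task StartAsync(CancellationToken cancellationToken)
{
    // StartRabbitAndCreateConsumer is a placeholder for connecting to RabbitMQ,
    // creating the consumer and calling BasicConsume
    _consumer = StartRabbitAndCreateConsumer();
    _consumer.Received += OnReceived;
    return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
    if (_consumer is not null)
    {
        // EventingBasicConsumer isn't IDisposable; just stop listening here
        _consumer.Received -= OnReceived;
    }
    return Task.CompletedTask;
}

public void Dispose()
{
    // Close the RabbitMQ model and connection
    CleanupRabbitMqEtc();
}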
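One way to fill in the "process that message" part is to make the handler do nothing except post the payload to a channel, which a separate loop drains. In this sketch a plain EventHandler<byte[]> delegate stands in for consumer.Received and byte arrays stand in for BasicDeliverEventArgs, so it runs without RabbitMQ.Client; those stand-ins are assumptions, not the client's API.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<byte[]>(new UnboundedChannelOptions { SingleReader = true });

// Stand-in for consumer.Received; the real handler would receive BasicDeliverEventArgs.
EventHandler<byte[]>? received = null;

// The handler does no processing itself: it only hands the copied payload to the channel.
void OnReceived(object? sender, byte[] body)
{
    if (!channel.Writer.TryWrite(body))
    {
        // Only happens after the writer was completed (the service is stopping).
        Console.WriteLine("Dropped a message after shutdown");
    }
}

received += OnReceived;

// Processing loop that would run inside ExecuteAsync.
var bodies = new List<string>();
Task processing = Task.Run(async () =>
{
    await foreach (byte[] body in channel.Reader.ReadAllAsync())
    {
        bodies.Add(Encoding.UTF8.GetString(body));
    }
});

// Simulate three deliveries.
foreach (var text in new[] { "a", "b", "c" })
{
    received?.Invoke(null, Encoding.UTF8.GetBytes(text));
}

// StopAsync equivalent: unsubscribe, then complete the writer so the loop drains and exits.
received -= OnReceived;
channel.Writer.Complete();
await processing;
Console.WriteLine(string.Join(",", bodies)); // a,b,c
```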

There are many ways to construct processing pipelines for these events:

  • Channels are "just" a thread-safe, asynchronous (i.e., non-blocking), order-preserving, multi-writer, multi-reader, bounded or unbounded queue, with configurable overflow behavior (e.g., discard the oldest or newest message). They're quite similar to Go's channels and are used in similar ways to construct pipelines of processing tasks. You have to write the code that reads from a channel, processes the message and writes to the next one, typically using tasks.
  • The Dataflow library is a higher-level library that provides blocks that incorporate both input/output buffers and processing. Blocks use worker tasks to process messages from their input buffers and publish them to their output buffers. Blocks can be linked to construct a processing pipeline or mesh.
  • Reactive Extensions are specifically built for event stream processing, using LINQ-like operators over IObservable<> sources the same way LINQ works over IEnumerable. They can be used to create pipelines too, but unlike the other two they contain time-based operators as well as message-based ones.
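To illustrate the configurable overflow behavior mentioned for channels, here is a small sketch of a bounded channel with an arbitrary capacity of 3 that drops the oldest item when full. BoundedChannelFullMode also offers Wait (the default), DropNewest and DropWrite.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// A bounded channel that keeps only the 3 most recent items.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.DropOldest
});

for (int i = 1; i <= 5; i++)
{
    // With DropOldest, TryWrite succeeds even when full; the oldest item is discarded.
    bool accepted = channel.Writer.TryWrite(i);
    Console.WriteLine($"TryWrite({i}) -> {accepted}");
}

var remaining = new List<int>();
while (channel.Reader.TryRead(out int item))
{
    remaining.Add(item);
}
Console.WriteLine(string.Join(",", remaining)); // 3,4,5
```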

With Rx.NET, grouping by key and buffering by count and timeout could be done with a query similar to this one, borrowed from this question:

eventSource
    .GroupBy(e => e.GroupingKey)
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
    .Subscribe(list => SendToStorage(list));

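Since you care about the grouping key (the ID), the query could be adjusted to keep the key next to each batch:

eventSource
    .GroupBy(e => e.ID)
    .SelectMany(group => group
        .Buffer(TimeSpan.FromSeconds(60), 100)
        // A time-triggered buffer can be empty; don't send empty batches
        .Where(items => items.Count > 0)
        .Select(items => new { ID = group.Key, Items = items }))
    .Subscribe(batch => SendToQueue(batch.ID, batch.Items));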
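Ignoring the time limit, the shape of the result is easier to see with plain LINQ over an in-memory list: group by ID, then cut each group into batches of at most N items. This sketch assumes .NET 6 or later for Enumerable.Chunk and uses hypothetical (Id, Body) tuples with a batch size of 2 instead of 100. Unlike the Rx query, it only works on a finite sequence and produces nothing until the whole input has been read.

```csharp
using System;
using System.Linq;

// Hypothetical messages: (Id, Body).
var messages = new[]
{
    (Id: 1, Body: "a"), (Id: 2, Body: "b"), (Id: 1, Body: "c"),
    (Id: 1, Body: "d"), (Id: 2, Body: "e"),
};

// Group by ID, then cut each group into batches of at most 2 items,
// keeping the key next to each batch (like the anonymous { ID, Items } object above).
var batches = messages
    .GroupBy(m => m.Id)
    .SelectMany(g => g.Chunk(2).Select(chunk => (Id: g.Key, Items: chunk)))
    .ToList();

foreach (var batch in batches)
{
    Console.WriteLine($"{batch.Id}: {string.Join(",", batch.Items.Select(m => m.Body))}");
}
// 1: a,c
// 1: d
// 2: b,e
```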

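A gotcha with Rx.NET is that, unlike the other options, it is optimized for event stream processing, so it is single-threaded by default: each operator runs on the thread that delivers the notification, which for an event-based source like the RabbitMQ consumer is the thread that raises the event. You can specify that the observable, the subscriptions and even individual steps run on different threads, though. .SubscribeOn(scheduler) only moves the act of subscribing (attaching the event handler) to that scheduler, while .ObserveOn(ThreadPoolScheduler.Instance) moves the downstream notifications, and therefore the SendToQueue calls, to thread-pool threads:

eventSource
    .GroupBy(e => e.ID)
    .SelectMany(group => group
        .Buffer(TimeSpan.FromSeconds(60), 100)
        .Where(items => items.Count > 0)
        .Select(items => new { ID = group.Key, Items = items }))
    .ObserveOn(ThreadPoolScheduler.Instance)
    .Subscribe(batch => SendToQueue(batch.ID, batch.Items));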

Creating an Observable from an event is already supported, but someone asked for explicit Rx.NET support in RabbitMQ and even provided an implementation. Unfortunately, the request was rejected because some of the commenters wanted Rx.NET operators and others wanted Dataflow operators, so it was decided not to include a specific implementation.

Based on the implementation in that issue:

public IObservable<DisposableValue<T>> Receive<T>(string exchangeName, string routingKey)
{
    ArgumentNullException.ThrowIfNull(exchangeName);
    ArgumentNullException.ThrowIfNull(routingKey);

    var queueName = this.model.QueueDeclare().QueueName;
    this.model.QueueBind(queueName, exchangeName, routingKey);

    // Start consuming only when someone subscribes, and attach the handler before
    // BasicConsume so no delivery is raised while nobody is listening
    return Observable.Create<DisposableValue<T>>(observer =>
    {
        var consumer = new EventingBasicConsumer(this.model);
        EventHandler<BasicDeliverEventArgs> onReceived = (sender, ea) =>
            observer.OnNext(new DisposableValue<T>(
                Deserialize<T>(ea.Body),
                // Disposing the value acks the message
                () => this.model.BasicAck(ea.DeliveryTag, false)));

        consumer.Received += onReceived;
        var consumerTag = this.model.BasicConsume(queueName, false, consumer);

        // Disposing the subscription cancels the RabbitMQ consumer
        return Disposable.Create(() =>
        {
            this.model.BasicCancel(consumerTag);
            consumer.Received -= onReceived;
        });
    });
}

public class DisposableValue<T> : IDisposable
{
    private readonly Action disposeAction;

    public DisposableValue(T value, Action disposeAction)
    {
        // Validate the parameter, not the still-unassigned field
        this.disposeAction = disposeAction ??
            throw new ArgumentNullException(nameof(disposeAction));
        this.Value = value;
    }

    public T Value { get; }

    public void Dispose() => this.disposeAction();
}

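You could create an Rx.NET subscription in StartAsync and dispose of it in StopAsync:

IDisposable? _sub;

public Task StartAsync(CancellationToken cancellationToken)
{
    _sub = Receive<Foo>("Exchange", "RoutingKey")
            .GroupBy(e => e.Value.ID)
            .SelectMany(group => group
                .Buffer(TimeSpan.FromSeconds(60), 100)
                .Where(items => items.Count > 0)
                .Select(items => new { ID = group.Key, Items = items }))
            .ObserveOn(ThreadPoolScheduler.Instance)
            .Subscribe(batch =>
            {
                SendToQueue(batch.ID, batch.Items.Select(i => i.Value).ToList());
                // Disposing each DisposableValue acks its RabbitMQ message
                foreach (var item in batch.Items) item.Dispose();
            });
    return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
    _sub?.Dispose();
    return Task.CompletedTask;
}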
你又不是我 2025-02-20 06:18:39

It seems that you are searching for a LINQ operator like this one:

/// <summary>
/// Groups the elements of an asynchronous sequence according to a specified
/// key selector function and comparer. The groupTimeSpan and groupCount parameters
/// are used to control the lifetime of groups. A group is emitted when the
/// specified time-span has elapsed after receiving the first element of the group,
/// or when a group contains the specified number of elements. Multiple groups
/// with the same key can be emitted by the resulting sequence.
/// </summary>
public static IAsyncEnumerable<IGrouping<TKey, TSource>>
    GroupByUntil<TSource, TKey>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        TimeSpan groupTimeSpan,
        int groupCount);

You could then use it to consume a Channel<Message> in groups of messages with the same Id like this:

var groupedMessages = channel.Reader.ReadAllAsync()
    .GroupByUntil(msg => msg.Id, TimeSpan.FromSeconds(15), groupCount: 50);
await foreach (IGrouping<int, Message> messages in groupedMessages)
{
    // Process group of messages
}

The problem is that the GroupByUntil operator for asynchronous enumerable sequences does not exist. The System.Linq.Async package contains functionality that is nowhere near this level of sophistication. That's something that you'd expect to find in the System.Reactive package instead. This is the project that reached nearly 100% maturity before the whole repository stopped evolving for unknown reasons a couple of years ago. Currently, a GroupByUntil operator does exist for observable sequences (System.Reactive), with this signature:

// Groups the elements of an observable sequence according to a specified key
// selector function and comparer. A duration selector function is used to control
// the lifetime of groups. When a group expires, it receives an OnCompleted
// notification. When a new element with the same key value as a reclaimed group
// occurs, the group will be reborn with a new lifetime request.
public static IObservable<IGroupedObservable<TKey, TSource>>
    GroupByUntil<TSource, TKey, TDuration>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector,
    Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector);

It is tempting to use this operator, in combination with the ToObservable/ToAsyncEnumerable converters (System.Linq.Async), to implement the desired operator, but there is a problem. Let's first see the implementation, and talk about the problem later:

// Caution: Hidden queue
public static IAsyncEnumerable<IGrouping<TKey, TSource>>
    GroupByUntil<TSource, TKey>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        TimeSpan groupTimeSpan,
        int groupCount,
        IEqualityComparer<TKey> keyComparer = null)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(keySelector);
    if (groupTimeSpan < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(groupTimeSpan));
    if (groupCount < 1) throw new ArgumentOutOfRangeException(nameof(groupCount));
    keyComparer ??= EqualityComparer<TKey>.Default;
    return source
        .ToObservable()
        .GroupByUntil(keySelector, g => g.Skip(groupCount - 1).Select(_ => 0L)
            .Amb(Observable.Timer(groupTimeSpan)), keyComparer)
        // Each list holds items of a single key, so this yields exactly one IGrouping
        .SelectMany(g => g.ToList().SelectMany(list => list.GroupBy(_ => g.Key, keyComparer)))
        .ToAsyncEnumerable();
}

The problem is that the enumeration of the source sequence is not driven by the enumeration of the resulting sequence. Instead, the source sequence is enumerated by a background consumer at the maximum speed possible, and the consumed messages are buffered in a hidden queue. You have no control over the size of this queue, and you have no way to know its current size (at least not directly). This is what happens with an asynchronous enumerable sequence when you attach the ToObservable converter to it.

How big of a problem is this? It depends on the total number of messages contained in the source sequence, on the frequency that these messages are emitted, and on the amount of work that the consumer of the resulting sequence has to do after consuming each group. In the extreme case of an infinite sequence that emits at a faster pace than the pace of the consumer, the hidden queue will grow larger and larger, the latency between consuming a message from the source and processing it will get longer and longer, and eventually, sooner or later the application will crash with an OutOfMemoryException.
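For contrast, a bounded Channel<T> makes that queue visible and limited. In this sketch (the capacity of 3 is arbitrary) the writer cannot get ahead of the reader: once the channel is full, TryWrite returns false and WriteAsync does not complete until the reader takes an item. This is the backpressure that the ToObservable bridge loses.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity 3; FullMode.Wait (the default) makes writers wait instead of queueing more.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.Wait
});

for (int i = 1; i <= 3; i++)
{
    channel.Writer.TryWrite(i); // fills the channel
}

bool acceptedFourth = channel.Writer.TryWrite(4);      // no room, so this fails
ValueTask pendingWrite = channel.Writer.WriteAsync(4); // waits for room
bool completedWhileFull = pendingWrite.IsCompleted;

// The consumer takes one item, which makes room for the waiting writer.
channel.Reader.TryRead(out int first);
await pendingWrite.AsTask().WaitAsync(TimeSpan.FromSeconds(1));

Console.WriteLine($"{acceptedFourth} {completedWhileFull} {first} {channel.Reader.Count}");
```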

Unfortunately, implementing the GroupByUntil operator properly is not trivial. You can see here an example of what a proper implementation of a LINQ operator for IAsyncEnumerable<T> sequences takes (a Buffer operator in that case). I won't attempt to implement GroupByUntil here, because it's a major undertaking and you might not need it. Maybe your scenario is not affected much by the shortcomings of the ToObservable/ToAsyncEnumerable implementation. If it is, you could try to implement it yourself, and if you get stuck you could post a new question about your troubles, and we might be able to help.
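If the source is a ChannelReader<T> rather than an arbitrary IAsyncEnumerable<T>, a simpler pull-based approach is possible, because cancelling a wait on a channel does not take an item out of it. The following is a minimal sketch under that assumption, not a general GroupByUntil: ReadBatchesByKey is a hypothetical name, it emits a batch when a key reaches maxCount items or when that batch's first item is older than maxAge, and it only reads from the channel while the caller is asking for the next batch. Deadlines are checked between reads, so a batch can be emitted a little late while the caller is busy.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical operator (not a library API): groups channel items by key and emits a batch
// when it holds maxCount items or when its first item has waited maxAge.
async IAsyncEnumerable<(TKey Key, List<T> Items)> ReadBatchesByKey<T, TKey>(
    ChannelReader<T> reader, Func<T, TKey> keySelector, TimeSpan maxAge, int maxCount)
    where TKey : notnull
{
    var pending = new Dictionary<TKey, (List<T> Items, DateTime Started)>();

    while (true)
    {
        // 1. Emit every batch whose time limit has passed.
        DateTime now = DateTime.UtcNow;
        var expired = pending.Where(p => now - p.Value.Started >= maxAge)
                             .Select(p => p.Key).ToList();
        foreach (TKey key in expired)
        {
            List<T> items = pending[key].Items;
            pending.Remove(key);
            yield return (key, items);
        }

        // 2. Wait for data, but no longer than the oldest pending batch may live.
        bool hasData;
        if (pending.Count == 0)
        {
            hasData = await reader.WaitToReadAsync();
        }
        else
        {
            TimeSpan wait = pending.Values.Min(p => p.Started) + maxAge - DateTime.UtcNow;
            if (wait <= TimeSpan.Zero) continue;
            using var timeout = new CancellationTokenSource(wait);
            try
            {
                // Cancelling this wait does not take an item out of the channel.
                hasData = await reader.WaitToReadAsync(timeout.Token);
            }
            catch (OperationCanceledException)
            {
                continue; // deadline reached: loop back and flush the expired batch
            }
        }
        if (!hasData) break; // the writer completed the channel and it is empty

        // 3. Take one item and add it to its key's batch.
        if (!reader.TryRead(out T item)) continue;
        TKey itemKey = keySelector(item);
        if (!pending.TryGetValue(itemKey, out var current))
        {
            current = (new List<T>(), DateTime.UtcNow);
            pending[itemKey] = current;
        }
        current.Items.Add(item);
        if (current.Items.Count >= maxCount)
        {
            pending.Remove(itemKey);
            yield return (itemKey, current.Items);
        }
    }

    // The channel is completed: flush whatever is still pending.
    foreach (var entry in pending)
    {
        yield return (entry.Key, entry.Value.Items);
    }
}

// Usage: hypothetical (Id, Body) messages, batches of at most 2, time limit 200 ms.
var channel = Channel.CreateBounded<(int Id, string Body)>(100);
var batches = new List<(int Key, List<(int Id, string Body)> Items)>();

Task producer = Task.Run(async () =>
{
    // Five messages for ID 1: two full batches of 2, plus one left pending.
    for (int i = 0; i < 5; i++) await channel.Writer.WriteAsync((1, $"m{i}"));
    // A single message for ID 2: only the time limit can flush it.
    await channel.Writer.WriteAsync((2, "lonely"));
    await Task.Delay(500);
    channel.Writer.Complete();
});

await foreach (var batch in ReadBatchesByKey(channel.Reader, m => m.Id,
                   TimeSpan.FromMilliseconds(200), maxCount: 2))
{
    batches.Add(batch);
    Console.WriteLine($"{batch.Key}: {string.Join(",", batch.Items.Select(m => m.Body))}");
}
await producer;
```

Because batches are yielded one at a time, a slow consumer simply leaves messages in the channel; with a bounded channel that eventually makes the producer wait, instead of growing a hidden queue.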
