如何使用 BlockingCollection 通知生产者-消费者模式中的失败?

发布于 2025-01-15 10:05:11 字数 1666 浏览 3 评论 0 原文

我正在尝试创建一个生命周期过程,用于批量插入数据库批量插入的传入消息。 新消息一次传入 1 条,间隔不规则。 我的解决方案类似于使用 BlockingCollection 的生产者-消费者模式。 消息通过各种事件自由添加到 BlockingCollection 中,并按固定时间间隔(5 秒)从 BlockingCollection 中批量取出以进行数据库插入。

然而,当前的解决方案是“即发即忘”。如果批量插入因任何原因失败,我需要一种方法让处理器通知失败的原始源,因为源包含恢复和重试的逻辑。

我是否应该使用特定的模式来实现我想要实现的目标? 非常感谢任何建议或帮助!

        private BlockingCollection<Message> _messageCollection;

        public async Task<bool> InsertMessage(Message message)
        {
            if (!_messageCollection.TryAdd(message)) return false;

            // TODO: check message has been successfully processed, if not return false
            // return false;

            return true;
        }

        private void BulkInsertProcess()
        {
            Task consumerThread = Task.Factory.StartNew(async () =>
            {
                while (!_messageCollection.IsCompleted)
                {
                    List<Message> messages = new List<Message>();

                    for (int i = 0; i < 50; i++)
                    {
                        if (_messageCollection.Any())
                        {
                            messages.Add(_messageCollection.Take());
                        }
                        else
                        {
                            break;
                        }
                    }

                    bool insertResult = await _database.BulkInsertMessages(messages);

                    // TODO: check result and inform the consumer if insert failed

                    await Task.Delay(5000);
                }
            });
        }

I'm trying to create a lifetime process that batches incoming messages for DB bulk insert.
The new message is coming in 1 at a time, in an irregular interval.
My solution to this would be something like the producer-consumer pattern using BlockingCollection.
Messages are added freely into the BlockingCollection by various events and are taken out of BlockingCollection in bulk for DB insert in regular intervals, 5 seconds.

However, the current solution is fire-and-forget. If the bulk insert failed for any reason, I need a way for the processor to notify the original sources of the failure, because the source contains the logic to recover and retry.

Is there a specific pattern I should be using for what I'm trying to achieve?
Any suggestion or help is much appreciated!

        private BlockingCollection<Message> _messageCollection;

        public async Task<bool> InsertMessage(Message message)
        {
            if (!_messageCollection.TryAdd(message)) return false;

            // TODO: check message has been successfully processed, if not return false
            // return false;

            return true;
        }

        private void BulkInsertProcess()
        {
            Task consumerThread = Task.Factory.StartNew(async () =>
            {
                while (!_messageCollection.IsCompleted)
                {
                    List<Message> messages = new List<Message>();

                    for (int i = 0; i < 50; i++)
                    {
                        if (_messageCollection.Any())
                        {
                            messages.Add(_messageCollection.Take());
                        }
                        else
                        {
                            break;
                        }
                    }

                    bool insertResult = await _database.BulkInsertMessages(messages);

                    // TODO: check result and inform the consumer if insert failed

                    await Task.Delay(5000);
                }
            });
        }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

稍尽春風 2025-01-22 10:05:11

您必须以某种方式将每个 Message 与专用的 TaskCompletionSource。您可能希望使第二个成为第一个的属性:

public class Message
{
    public TaskCompletionSource<bool> TCS { get; } = new();
}

...或者使第一个成为第二个的属性:

private class Entry : TaskCompletionSource<bool>
{
    public Message Message { get; init; }
}

...或者创建一个包含两者的自定义类,或者使用 ValueTuple> 就像我一样已经在下面的示例中选择:

private BlockingCollection<(Message, TaskCompletionSource<bool>)> _queue;

public Task<bool> InsertMessage(Message message)
{
    var tcs = new TaskCompletionSource<bool>(
        TaskCreationOptions.RunContinuationsAsynchronously);
    if (!_queue.TryAdd((message, tcs)))
        return Task.FromResult(false);
    return tcs.Task;
}

private void BulkInsertProcess()
{
    Task consumerTask = Task.Run(async () =>
    {
        while (!_queue.IsCompleted)
        {
            var delayTask = Task.Delay(5000);

            var batch = new List<(Message, TaskCompletionSource<bool>)>();
            while (batch.Count < 50 && _queue.TryTake(out var entry))
                batch.Add(entry);

            if (batch.Count > 0)
            {
                var messages = batch.Select(e => e.Item1).ToList();
                bool insertResult = await _database.BulkInsertMessages(messages);

                foreach (var (message, tcs) in batch)
                    tcs.SetResult(insertResult);
            }
            await delayTask;
        }
    });
}

我对您的代码进行了一些改进,以使其工作更顺畅:

  1. Task.Run 而不是 Task.Factory.StartNew。前者理解异步委托。后面的则不会
  2. TryTake 而不是 任何AnyIEnumerable 接口上的扩展方法,这些不保证线程安全。它很可能是线程安全的,但使用 BlockingCollection 类的公共成员更安全、更高效。
  3. 在执行批量插入操作之前创建Task.Delay,然后await它。通过这种方式,您可以在后续批量插入操作之间获得稳定的间隔,这不依赖于操作本身的持续时间。

如果您在一批中收到 50 条消息,您可能会考虑完全跳过 await delayTask,因为这表明您的服务面临压力,并且消息正在队列中堆积。

You will have to associate somehow each Message with a dedicated TaskCompletionSource<bool>. You might want to make the second a property of the first:

public class Message
{
    public TaskCompletionSource<bool> TCS { get; } = new();
}

...or make the first a property of the second:

private class Entry : TaskCompletionSource<bool>
{
    public Message Message { get; init; }
}

...or create a custom class that contains both, or use a ValueTuple<Message, TaskCompletionSource<bool>> as I've chosen in the example below:

private BlockingCollection<(Message, TaskCompletionSource<bool>)> _queue;

public Task<bool> InsertMessage(Message message)
{
    var tcs = new TaskCompletionSource<bool>(
        TaskCreationOptions.RunContinuationsAsynchronously);
    if (!_queue.TryAdd((message, tcs)))
        return Task.FromResult(false);
    return tcs.Task;
}

private void BulkInsertProcess()
{
    Task consumerTask = Task.Run(async () =>
    {
        while (!_queue.IsCompleted)
        {
            var delayTask = Task.Delay(5000);

            var batch = new List<(Message, TaskCompletionSource<bool>)>();
            while (batch.Count < 50 && _queue.TryTake(out var entry))
                batch.Add(entry);

            if (batch.Count > 0)
            {
                var messages = batch.Select(e => e.Item1).ToList();
                bool insertResult = await _database.BulkInsertMessages(messages);

                foreach (var (message, tcs) in batch)
                    tcs.SetResult(insertResult);
            }
            await delayTask;
        }
    });
}

I made some improvements to your code, to make it work more smoothly:

  1. Task.Run instead of Task.Factory.StartNew. The former understands async delegates. The later doesn't.
  2. TryTake instead of Any. The Any is an extension method on the IEnumerable<T> interface, and these are not guaranteed to be thread-safe. Most probably it's thread-safe, but using a public member of the BlockingCollection<T> class is safer and more efficient.
  3. Create the Task.Delay before doing the bulk insert operation, and await it afterwards. This way you get a stable interval between subsequent bulk insert operations, that doesn't depend on the duration of the operations themselves.

In case you got 50 messages in one batch you might consider skipping the await delayTask altogether, because this indicates that your service is under pressure, and the messages are piling up in the queue.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文