如何使用 BlockingCollection 通知生产者-消费者模式中的失败?
我正在尝试创建一个生命周期过程,用于批量插入数据库批量插入的传入消息。
新消息一次传入 1 条,间隔不规则。
我的解决方案类似于使用 BlockingCollection 的生产者-消费者模式。
消息通过各种事件自由添加到 BlockingCollection
中,并按固定时间间隔(5 秒)从 BlockingCollection
中批量取出以进行数据库插入。
然而,当前的解决方案是“即发即忘”。如果批量插入因任何原因失败,我需要一种方法让处理器通知失败的原始源,因为源包含恢复和重试的逻辑。
我是否应该使用特定的模式来实现我想要实现的目标? 非常感谢任何建议或帮助!
private BlockingCollection<Message> _messageCollection;
public async Task<bool> InsertMessage(Message message)
{
if (!_messageCollection.TryAdd(message)) return false;
// TODO: check message has been successfully processed, if not return false
// return false;
return true;
}
private void BulkInsertProcess()
{
Task consumerThread = Task.Factory.StartNew(async () =>
{
while (!_messageCollection.IsCompleted)
{
List<Message> messages = new List<Message>();
for (int i = 0; i < 50; i++)
{
if (_messageCollection.Any())
{
messages.Add(_messageCollection.Take());
}
else
{
break;
}
}
bool insertResult = await _database.BulkInsertMessages(messages);
// TODO: check result and inform the consumer if insert failed
await Task.Delay(5000);
}
});
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您必须以某种方式将每个
Message
与专用的TaskCompletionSource
。您可能希望使第二个成为第一个的属性:...或者使第一个成为第二个的属性:
...或者创建一个包含两者的自定义类,或者使用
ValueTuple>
就像我一样已经在下面的示例中选择:我对您的代码进行了一些改进,以使其工作更顺畅:
Task.Run
而不是Task.Factory.StartNew
。前者理解异步委托。后面的则不会。TryTake
而不是任何
。Any
是IEnumerable
接口上的扩展方法,这些不保证线程安全。它很可能是线程安全的,但使用BlockingCollection
类的公共成员更安全、更高效。Task.Delay
,然后await
它。通过这种方式,您可以在后续批量插入操作之间获得稳定的间隔,这不依赖于操作本身的持续时间。如果您在一批中收到 50 条消息,您可能会考虑完全跳过
await delayTask
,因为这表明您的服务面临压力,并且消息正在队列中堆积。You will have to associate somehow each
Message
with a dedicatedTaskCompletionSource<bool>
. You might want to make the second a property of the first:...or make the first a property of the second:
...or create a custom class that contains both, or use a
ValueTuple<Message, TaskCompletionSource<bool>>
as I've chosen in the example below:I made some improvements to your code, to make it work more smoothly:
Task.Run
instead ofTask.Factory.StartNew
. The former understands async delegates. The later doesn't.TryTake
instead ofAny
. TheAny
is an extension method on theIEnumerable<T>
interface, and these are not guaranteed to be thread-safe. Most probably it's thread-safe, but using a public member of theBlockingCollection<T>
class is safer and more efficient.Task.Delay
before doing the bulk insert operation, andawait
it afterwards. This way you get a stable interval between subsequent bulk insert operations, that doesn't depend on the duration of the operations themselves.In case you got 50 messages in one batch you might consider skipping the
await delayTask
altogether, because this indicates that your service is under pressure, and the messages are piling up in the queue.