重新配置 NServiceBus 丢失消息
这是我在 StackOverflow 上的第一篇文章。
我试图将 NServiceBus 放在如下所示的接口后面:
interface IMessagingService
{
IObserver<T> RegisterPublication<T>() where T : IPublishedMessage;
IObservable<T> RegisterSubscription<T>() where T : IPublishedMessage;
}
为了支持此接口而不强制用户进行显式 Start() 调用,我需要在每次通过调用方法来调用 RegisterSubscription 时重新配置总线看起来像这样:
IBus ConfigureBus()
{
Configure config = BuildConfiguration();
return config.CreateBus().Start();
}
我无法复制和粘贴真实的代码,但希望您能看到我需要一个启动总线来在调用 RegisterSubscription 后侦听消息。我需要在后续调用中添加订阅者。
我遇到的问题是每次调用ConfigureBus() 似乎都会启动更多线程并导致消息丢失。我的处理程序实例没有被调用,但 NServiceBus 记录它已完成对每条消息的处理。 通过四次调用 RegisterSubscription,我丢失了大约 40% 的消息。
是否有一种安全的方法可以动态重新配置和重新启动 IBus,而不会导致此问题? IBus.Subscribe 似乎不起作用。
This is my first post on StackOverflow.
I am trying to put NServiceBus behind an interface that looks like the following:
interface IMessagingService
{
IObserver<T> RegisterPublication<T>() where T : IPublishedMessage;
IObservable<T> RegisterSubscription<T>() where T : IPublishedMessage;
}
In order to support this interface without forcing users to have an explictit Start() call, I need to reconfigure the bus each time RegisterSubscription is invoked with a call to a method that looks like this:
IBus ConfigureBus()
{
Configure config = BuildConfiguration();
return config.CreateBus().Start();
}
I can't copy and paste the real code, but hopefully you can see that I need to have a started bus to listen for messages once RegisterSubscription is invoked. I need to add subscribers on subsequent invocations.
The problem that I am running into is that each call to ConfigureBus() seems to be spinning up more threads and results in messages being lost. My handler instances are not invoked, yet NServiceBus logs that it finished handling each of the messages.
With four calls to RegisterSubscription, I lose about 40% of my messages.
Is there a safe way to reconfigure and restart the IBus on the fly without causing this problem? The IBus.Subscribe doesn't seem to work.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
@bsf 的回答
我找到了解决方案。
CreateBus()
返回一个IStarableBus
。如果我保留对此的静态引用并在下次调用 CreateBus() 之前将其处置,则一切似乎都工作正常。@bsf's answer
I found a solution.
CreateBus()
returns anIStarableBus
. If I keep a static reference to that and dispose it before my next call toCreateBus()
, everything seems to work fine.