NServiceBus - 如何为接收者订阅的每种消息类型获取单独的队列?
我有以下情况:
因此,接收者订阅两种事件:eventA 和 eventB。 NServiceBus 为接收者(Receiver)创建队列,并将 eventA 和 eventB 类型的消息放置到同一个队列中。问题是,我是否可以将 NServiceBus 配置为对接收者的每种类型的事件使用单独的队列(ReceiverEventA 和 ReceiverEventB)?或者我可以在单个进程中有两个接收器(并且每个接收器单独的队列)。 事实是,EventA 的处理时间比 EventB 长得多,而且它们是独立的 - 因此,如果它们位于单独的队列中,则可以同时处理它们。
更新:如果我采用这样的幼稚方法,接收器无法以空引用异常启动:
private static IBus GetBus<THandler, TEvent>()
{
var bus = Configure.With(new List<Type>
{
typeof(THandler),
typeof(TEvent),
typeof(CompletionMessage)
})
.Log4Net()
.DefaultBuilder()
.XmlSerializer()
.MsmqTransport()
.IsTransactional(true)
.PurgeOnStartup(false)
.UnicastBus()
.LoadMessageHandlers()
.ImpersonateSender(false);
bus.Configurer.ConfigureProperty<MsmqTransport>(x => x.InputQueue, "Queue" + typeof(THandler).Name);
return bus.CreateBus().Start();
}
[STAThread]
static void Main()
{
Busses = new List<IBus>
{
GetBus<ItemEventHandlerA, ItemEventA>(),
GetBus<ItemEventHandlerB, ItemEventB>()
};
Application.EnableVisualStyles();
Application.SetCompatibleTextRenderingDefault(false);
Application.Run(new TestForm());
}
异常堆栈跟踪是:
位于 C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs 中的 NServiceBusTest2.WinFormsReceiver.Program.GetBusTHandler、TEvent:第 57 行
在 C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs 中的 NServiceBusTest2.WinFormsReceiver.Program.Main() 处:第 26 行
在 System.AppDomain._nExecuteAssembly(RuntimeAssembly 程序集,String[] args)
在 System.AppDomain.ExecuteAssembly(字符串 assemblyFile,证据 assemblySecurity,字符串 [] args) 在 Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
在 System.Threading.ThreadHelper.ThreadStart_Context(对象状态)
在System.Threading.ExecutionContext.Run(ExecutionContextexecutionContext,ContextCallback回调,对象状态,布尔ignoreSyncCtx)
在 System.Threading.ExecutionContext.Run(ExecutionContextexecutionContext,ContextCallback 回调,对象状态)
在 System.Threading.ThreadHelper.ThreadStart()
I have a following situation:
So, receiver subscribes to two kind of events: eventA and eventB. NServiceBus creates queue for receiver (Receiver) and places messages of type eventA and eventB to to same queue. Question is, if I can configure NServiceBus to use separate queues (ReceiverEventA and ReceiverEventB) for each type of event for receiver? Or can I have two receivers in single process (and each receiver separate queue).
Thing is, that EventA takes much longer to process than EventB, and they are independent - so if they would be in separate queues, they could be processed concurrently.
Update: If i'm going with naive approach like this, receiver fails to start with null reference exception:
private static IBus GetBus<THandler, TEvent>()
{
var bus = Configure.With(new List<Type>
{
typeof(THandler),
typeof(TEvent),
typeof(CompletionMessage)
})
.Log4Net()
.DefaultBuilder()
.XmlSerializer()
.MsmqTransport()
.IsTransactional(true)
.PurgeOnStartup(false)
.UnicastBus()
.LoadMessageHandlers()
.ImpersonateSender(false);
bus.Configurer.ConfigureProperty<MsmqTransport>(x => x.InputQueue, "Queue" + typeof(THandler).Name);
return bus.CreateBus().Start();
}
[STAThread]
static void Main()
{
Busses = new List<IBus>
{
GetBus<ItemEventHandlerA, ItemEventA>(),
GetBus<ItemEventHandlerB, ItemEventB>()
};
Application.EnableVisualStyles();
Application.SetCompatibleTextRenderingDefault(false);
Application.Run(new TestForm());
}
Exception stack trace is:
at NServiceBusTest2.WinFormsReceiver.Program.GetBusTHandler,TEvent in C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs:line 57
at NServiceBusTest2.WinFormsReceiver.Program.Main() in C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs:line 26
at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args)
at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.ThreadHelper.ThreadStart()
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
我开始写一个更冗长的解释,解释为什么你不应该有两个单独的进程,以支持我对 @stephenl 发布的答案的评论。 NServiceBus 基本上为每个进程强制执行一个输入队列。
所以通常情况下,您会有两个独立的进程。 EventAService 将从 QForEventA 读取 EventA,在与 EventBService 不同的进程中,EventBService 将从 QForEventB 读取 EventB。
然后我更仔细地查看了您的示例代码,并意识到您正在使用 Windows 窗体应用程序。呃!现在我觉得自己有点傻。当然你只能有一个进程。想象一下,如果启动 Outlook 后您还必须启动 MailService.exe 才能实际接收邮件!
因此,问题实际上是 Windows 窗体应用程序中 EventA 和 EventB 的处理时间截然不同。我没有任何方法知道这项工作是什么,但这对于客户端应用程序来说有点奇怪。
大多数时候,它是一项需要进行大量处理的服务,并且客户端收到的任何消息都相当轻量 - 类似于“实体 X 已更改,因此下次您需要直接从数据库加载它”处理它只是从缓存中删除一些东西——当然不是一个长时间运行的过程。
但听起来无论出于什么原因,您的客户端中的处理需要更长的时间,这在 WinForms 应用程序中是令人担忧的,因为担心 UI 线程编组、阻塞 UI 线程等。
我建议您使用一个系统,在该系统中您不需要执行所有操作在 WinForms 应用程序的 NServiceBus 处理程序中进行处理,但将其封送到其他地方。将其作为工作项或类似的东西扔到线程池中。或者将长时间运行的项目放入队列中,并让后台线程以自己的速度处理这些项目。这样,NServiceBus 消息处理程序所做的就是“是的,收到消息了。非常感谢”。那么,是否一次处理一个 NServiceBus 消息并不重要。
更新:
在评论中,OP询问如果在NServiceBus完成接收工作后将工作扔到ThreadPool中会发生什么。当然,这就是这种方法的另一面 - NServiceBus 完成后,你就得靠自己了 - 如果它在 ThreadPool 中失败,那么你就可以创建自己的重试逻辑,或者只是捕获异常,警告 WinForms 应用程序的用户,然后让它消亡。
显然这是最佳的,但它引出了一个问题 - 您到底想在 WinForms 应用程序中完成什么类型的工作?如果 NServiceBus 提供的稳健性(自动重试和有害消息的错误队列)是这个难题的关键部分,那么为什么它首先出现在 WinForms 应用程序中呢?此逻辑可能需要卸载到 WinForms 应用程序外部的服务,其中为每种消息类型设置单独的队列(通过部署单独的服务)变得很容易,然后只有影响 UI 的部分才会发送回 WinForms 客户端。当发送给 UI 的消息仅影响 UI 时,处理它们几乎总是微不足道的,并且您不需要卸载到 ThreadPool 来跟上。
直接针对您在 GitHub Issue 中描述的情况,这听起来确实像这种情况每种消息类型的单独进程正是规定的解决方案的场景。我听说部署和管理这么多流程听起来令人难以承受,但我认为您会发现它并不像听起来那么糟糕。甚至还有一些优势 - 例如,如果您必须重新部署与 Amazon.com 的连接器,您只需重新部署该端点,而不会导致任何其他端点出现停机,也无需担心可能会向其他端点引入错误。
为了简化部署,希望您使用持续集成服务器,然后检查诸如 DropkicK 有助于编写部署脚本。就我个人而言,我最喜欢的部署工具是老式的 Robocopy。即使像 1) NET STOP ServiceName、2) ROBOCOPY、3) NET START ServiceName 这样简单的命令也非常有效。
I started writing a more drawn-out explanation of why you should NOT have two separate processes, in support of my comment to the answer @stephenl posted. NServiceBus basically enforces one input queue per process.
So normally, you would have two separate processes. EventAService would read EventA from QForEventA, in a separate process from EventBService which would read EventB from QForEventB.
Then I looked more carefully at your example code and realized you were in a Windows Forms app. Duh! Now I feel a bit foolish. Of course you can only have one process. Imagine if after launching Outlook you also had to launch MailService.exe to actually get mail!
So the problem is really that you have drastically different processing times for EventA and EventB within your Windows Forms app. I don't have any way of knowing what that work is, but this is a little bit odd for a client application.
Most of the time it's a service that has big processing to do, and any message received by a client is fairly lightweight - something along the lines of "Entity X has changed so next time you'll need to load it direct from the database" and processing it involves just dropping something out of cache - certainly not a long-running process.
But it sounds like for whatever reason the processing in YOUR client takes longer, which in a WinForms app is concerning because of concerns about UI thread marshalling, blocking the UI thread, etc.
I would suggest a system where you don't do all the processing in the NServiceBus handler in your WinForms app, but marshal it off somewhere else. Throw it over to the ThreadPool as a work item, or something like that. Or put the long-running items into a Queue and have a background thread crunch on those at its own speed. That way all the NServiceBus message handler does is "Yep, got the message. Thank you very much." Then it shouldn't really matter if the NServiceBus messages are processed one at a time.
Update:
In the comments, the OP asks what happens if you throw the work to the ThreadPool after NServiceBus finsihes receiving it. That is, of course, the flip side to this approach - after NServiceBus is done, you're on your own - if it fails in the ThreadPool, then it's up to you to create your own retry logic, or just catch the exception, alert the user of the WinForms app, and let it die.
Obviously that's optimal, but it begs the question - just exactly what sort of work are you trying to accomplish in the WinForms app? If the robustness that NServiceBus offers (the automatic retries and error queue for poison messages) is a critical piece of this puzzle, then why is it going on in a WinForms application in the first place? This logic probably needs to be offloaded to a service external to the WinForms application, where having separate queues for each message type (by deploying separate services) becomes easy, and then only the pieces affecting the UI are ever sent back to the WinForms client. When the messages to the UI only affect the UI, processing them is almost always trivial, and you won't have any need for offloading to the ThreadPool to keep up.
Speaking directly to the situation you described in the GitHub Issue, this really does sound like the kind of scenario where separate processes for each message type are exactly the prescribed solution. I hear that it sounds overwhelming to deploy and manage that many processes, but I think you'll find that it's not as bad as it sounds. There are even advantages - if you have to redeploy your connector with Amazon.com, for instance, you only have to redeploy THAT endpoint, with no downtime for any of the others, or any worries that bugs may have been introduced to the others.
To ease deployment, hopefully you're using a continuous integration server, and then check in to tools like DropkicK that help deployments to be scripted. Personally, my favorite deployment tool is good old Robocopy. Even something as simple as 1) NET STOP ServiceName, 2) ROBOCOPY, 3) NET START ServiceName is quite effective.
您不需要,您只需要为接收器中的每个事件创建一个处理程序。例如,
如果您想分隔队列,那么您需要将每个处理程序放入单独的端点中。这有道理吗?
我还认为你的图表需要一些工作。我确信这是一个标签问题,但我认为主机是实际的发布者,而不是客户端端点(您将其命名为发布者 A 和发布者 B)。
更新:这是一个简单的例子,但说明了我的意思 - https://github.com/ sliedig/sof9411638
我扩展了 NServiceBus 附带的全双工示例,以包含三个附加端点,其中两个端点分别订阅服务器发布的事件,一个端点处理两者。 HTH。
You don't need to, you just need to create a handler for each event in the receiver. For example
If you want to separate queues then you need to put each handler into a separate endpoint. Does that make sense?
Also I think your diagram needs a bit of work. Its a labelling thing I'm sure, but the way I see it is that Host is the actual publisher, not the client endpoints (which you've name Publisher A and Publisher B).
UPDATE: Its a simple example but illustrates what I mean - https://github.com/sliedig/sof9411638
I've extended the Full Duplex sample that comes with NServiceBus to include three additional endpoints, two of which subscribe to events being published by the server respectively and one endpoint that handles both. HTH.