NServiceBus:如何让订阅者订阅多种消息类型(每种消息类型来自不同的队列)

发布于 2024-10-06 02:53:19 字数 1029 浏览 1 评论 0原文

我希望能够让我的订阅者处理两个不同的消息流。我期望每种消息类型都有一个 MSMQ 队列,但我不知道如何在 .config 文件的 MsmqTransportConfig 部分中指定多个 InputQueue。

这是我的订户的无 XML 配置:

        Configure.With(new[] { typeof(EventMessage), typeof(EventMessageHandler), typeof(NServiceBus.Unicast.Transport.CompletionMessage) })
            .CustomConfigurationSource(new UserConfigurationSource()
               .Register(() => new MsmqTransportConfig { InputQueue = "Subscriber1InputQueue", ErrorQueue = "error", NumberOfWorkerThreads = 1, MaxRetries = 5 }))
            .DefaultBuilder()
            .XmlSerializer()
            .MsmqTransport()
              .IsTransactional(true)
          .UnicastBus()
              .DoNotAutoSubscribe()
              .LoadMessageHandlers()
          .CreateBus()
          .Start();

编辑:我似乎从不同的人那里得到了不同的答案。谢谢大家!我想我已经找到了问题的答案,那就是:使用 NServiceBus 的进程(无论是发布者还是订阅者)只能接收单个队列上的消息。对我来说,这是一个不必要的限制,不幸的是 NServiceBus 以这种方式工作。我不想有多个进程来接收消息,也不想让它们全部进入同一个队列。如果特定消息处理程序出现问题,我只想看到该特定消息类型的错误队列大小增长。我认为这可以更好地了解系统中发生的情况。

I would like to be able to have my subscriber handling two different streams of messages. I am expecting there will be a MSMQ queue for each message type, but I don't see how to specify more than one InputQueue in the MsmqTransportConfig section in my .config file.

Here is the no-XML configuration for my subscriber:

        Configure.With(new[] { typeof(EventMessage), typeof(EventMessageHandler), typeof(NServiceBus.Unicast.Transport.CompletionMessage) })
            .CustomConfigurationSource(new UserConfigurationSource()
               .Register(() => new MsmqTransportConfig { InputQueue = "Subscriber1InputQueue", ErrorQueue = "error", NumberOfWorkerThreads = 1, MaxRetries = 5 }))
            .DefaultBuilder()
            .XmlSerializer()
            .MsmqTransport()
              .IsTransactional(true)
          .UnicastBus()
              .DoNotAutoSubscribe()
              .LoadMessageHandlers()
          .CreateBus()
          .Start();

EDIT: I seem to be getting different answers from different folks. Thanks everyone! I think I have the answer to my question and that is: A process using NServiceBus (whether publisher or subscriber) can only receive messages on a SINGLE queue. To me, this is an unnecessary limitation, and it's unfortunate that NServiceBus works this way. I don't want to have multiple processes for receiving messages, and I don't want to have them all going to the same queue. If there is a problem with a particular message handler, I would like to see only the error queue for that particular message type grow in size. I think it would allow for better visibility into what's going on in the system.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

疏忽 2024-10-13 02:53:19

尚未使用 no-xml 配置,但使用配置文件,它看起来像:

<MsmqTransportConfig InputQueue="WorkerQueueForCurrentService" ErrorQueue="ErrorQueue" NumberOfWorkerThreads="1" MaxRetries="5"/>

<UnicastBusConfig>
    <MessageEndpointMappings>
        <add Messages="AssemblyName1" Endpoint="PublisherQueue1" />
        <add Messages="AssemblyName2.Message1, AssemblyName2" Endpoint="PublisherQueue2" />
        <add Messages="AssemblyName2.Message3, AssemblyName2" Endpoint="PublisherQueue2" />
    </MessageEndpointMappings>
</UnicastBusConfig>

因此当前服务的工作队列是“WorkerQueueForCurrentService”,并且它订阅在队列“PublisherQueue1”和“发布者队列2”。我提供了一个订阅整个消息程序集的示例(请参阅添加消息第 1 行)以及给定消息程序集中的特定消息(请参阅添加消息第 2 行和第 3 行)。

克里斯蒂安·克里斯滕森的回答不正确。输入队列与使用 nservicebus 的每个服务相关。无论是发布者还是订阅者。发布者在输入队列上接收订阅通知,订阅者将输入队列设置为发送给发布者的订阅通知的目标队列。

如果您想以编程方式订阅像 mrnye 所说的消息,您将需要一个消息端点映射。因此,如果您执行bus.subscribe,则nservicebus会查看他的messageendpointmappings并尝试提取发布此消息的发布者队列名称。

messageendpointmappings 用于以下两者:
- 查找哪些消息在哪里发布

- 发送消息的目标队列,您使用bus.send()

希望这可以解决一些问题:-)

haven't use a no-xml configuration yet, but with the config file it would look like:

<MsmqTransportConfig InputQueue="WorkerQueueForCurrentService" ErrorQueue="ErrorQueue" NumberOfWorkerThreads="1" MaxRetries="5"/>

<UnicastBusConfig>
    <MessageEndpointMappings>
        <add Messages="AssemblyName1" Endpoint="PublisherQueue1" />
        <add Messages="AssemblyName2.Message1, AssemblyName2" Endpoint="PublisherQueue2" />
        <add Messages="AssemblyName2.Message3, AssemblyName2" Endpoint="PublisherQueue2" />
    </MessageEndpointMappings>
</UnicastBusConfig>

so your worker queue for the current service is "WorkerQueueForCurrentService" and it subscribes to different messages that are published on the queues "PublisherQueue1" and "PublisherQueue2". i have included a sample for the subscription of a whole messageassembly (see add messages line 1) and for specific messages in a given messageassembly (see add messages line 2 and line 3).

Kristian kristensens answer is not correct. the input queue is relevant for every service that uses nservicebus. regardless of whether it's an publisher or an subscriber. a publisher receives subscription notices on the input queue and the subscriber sets the input queue as the destination queue for a subscription notice that is sent to the publisher.

if you want to programmatically subscribe to messages like mrnye says you would need a messageendpointmapping. so if you do bus.subscribe nservicebus looks into his messageendpointmappings and tries to extract the publisher queue name where this message gets published.

messageendpointmappings are used for both:
- the lookup of which messages gets published where
and
- the destination queue where messages are sent which you bus.send()

hope this clears some things up :-)

半仙 2024-10-13 02:53:19

在 NServiceBus 中,所有消息都通过单个队列。它是队列:进程之间的 1:1 映射。因此,使用 DoNotAutoSubscribe(),您只需使用 app.config 中的映射手动订阅所需的消息,

例如,要在配置后使用该函数进行订阅,

_Bus.Subscribe<SomeMessage>();

我不记得消息映射的语法了,抱歉

In NServiceBus, all messages come through a single queue. It is a 1:1 mapping between queues:processes. So with your DoNotAutoSubscribe(), you just manually subscribe to the messages you want, with the mappings in your app.config

e.g., to subscribe use the function after your config

_Bus.Subscribe<SomeMessage>();

I can't remember the syntax for the message mapping sorry

傲性难收 2024-10-13 02:53:19

查看发布/订阅配置。 InputQueue 是为发布者元素而不是订阅者指定的。后者将其感兴趣的消息添加到 UnicastBusConfig 下的 MessageEndpointMappings 中。如果您对两个不同的流感兴趣,只需在 MessageEndpointMappings 下添加元素即可。

Have a look at Publish/Subscribe Configuration. The InputQueue is specified for the Publisher element and not for the subscriber. The latter adds the messages it's interested in the MessageEndpointMappings under UnicastBusConfig. If you're interested in two different streams just have to add elements under MessageEndpointMappings.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文