订阅自己发布的消息的 NServiceBus 主机

发布于 2024-09-27 07:09:09 字数 1969 浏览 5 评论 0原文

NServiceBus 使用的版本:2.0.0.1145

问题:

是否可以配置 NServiceBus 主机,使其消费(订阅)自己发布的消息?

答案:

这似乎是可能的,但在以下配置中,在尝试将订阅插入订阅存储时,它给了我一个事务死锁异常。 当您使用 DbSubscriptionStorage 和超过 1 个“NumberOfWorkerThreads”时,就会发生这种情况。

错误:

无法执行命令:
INSERT INTO 订阅(SubscriberEndpoint、MessageType)值(@p0、@p1)
System.Data.SqlClinet.SqlException:
事务在锁资源上与另一个进程发生死锁,并被选为死锁受害者。重新运行事务。

之后,NServiceBus 尝试断开连接但失败,因为还有一个事务仍在进行中并抛出 UnhandledException。

如何重现:

这是我的 App.Config:

<!-- Publishing Configuration -->
<MsmqTransportConfig InputQueue="test_publisher_output" ErrorQueue="test_error" NumberOfWorkerThreads="3" MaxRetries="5" />

<!-- Subscription Configuration -->
<UnicastBusConfig DistributorControlAddress="" DistributorDataAddress="" ForwardReceivedMessagesTo="">
    <MessageEndpointMappings>
        <add Messages="MessageAssembly" Endpoint="test_publisher_output" />
    </MessageEndpointMappings>
</UnicastBusConfig>

我的总线配置:

var bus = Configure.With()
    .Log4Net()
    .StructureMapBuilder(container)
    .XmlSerializer()
    .MsmqTransport()
        .IsTransactional(true)
        .PurgeOnStartup(false)
    .DBSubcriptionStorage(subscriptionDbProperties, true)
    .Sagas()
    .NHibernateSagaPersister(sagaDbProperties, true)
    .UnicastBus()
        .ImpersonateSender(false)
        .LoadMessageHandlers(First<GridInterceptingMessageHandler>
            .Then<SagaMessageHandler>())
    .CreateBus()
    .Start();

这是我的订阅和 saga db 的 dbProperties:

connection.provider      NHibernate.Connection.DriverConnectionProvider
connection.driver_class  NHibernate.Driver.SqlClientDriver
dialect                  NHibernate.Dialect.MsSql2005Dialect

只要我不将 NumberOfWorkerThreads 增加到 1 以上,一切都会正常。一切都高于该值它会抛出上述错误。

我希望我没有忘记任何事情。提前感谢您的帮助。

Used Version of NServiceBus: 2.0.0.1145

Question:

Is it possible to configure a NServiceBus Host in such a way that, it consumes (subcribes to) his own published messages?

Answer:

It seems possible, but in the following Configuration it gives me a Transaction deadlocked Exception while trying to insert Subscriptions into the SubscriptionStorage.
It happens when you use DbSubscriptionStorage and more than 1 "NumberOfWorkerThreads".

Error:

Could not execute command:
INSERT INTO Subscription (SubscriberEndpoint, MessageType) VALUES (@p0, @p1)
System.Data.SqlClinet.SqlException:
Transaction was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

After that NServiceBus tries to disconnect but fails because there is a transaction still in progress and throws an UnhandledException.

How to reproduce:

Here is my App.Config:

<!-- Publishing Configuration -->
<MsmqTransportConfig InputQueue="test_publisher_output" ErrorQueue="test_error" NumberOfWorkerThreads="3" MaxRetries="5" />

<!-- Subscription Configuration -->
<UnicastBusConfig DistributorControlAddress="" DistributorDataAddress="" ForwardReceivedMessagesTo="">
    <MessageEndpointMappings>
        <add Messages="MessageAssembly" Endpoint="test_publisher_output" />
    </MessageEndpointMappings>
</UnicastBusConfig>

My Bus-Configuration:

var bus = Configure.With()
    .Log4Net()
    .StructureMapBuilder(container)
    .XmlSerializer()
    .MsmqTransport()
        .IsTransactional(true)
        .PurgeOnStartup(false)
    .DBSubcriptionStorage(subscriptionDbProperties, true)
    .Sagas()
    .NHibernateSagaPersister(sagaDbProperties, true)
    .UnicastBus()
        .ImpersonateSender(false)
        .LoadMessageHandlers(First<GridInterceptingMessageHandler>
            .Then<SagaMessageHandler>())
    .CreateBus()
    .Start();

and here are my dbProperties for both the subscription and the saga db:

connection.provider      NHibernate.Connection.DriverConnectionProvider
connection.driver_class  NHibernate.Driver.SqlClientDriver
dialect                  NHibernate.Dialect.MsSql2005Dialect

Everything works fine as long as i don't increase the NumberOfWorkerThreads above 1. Everything higher than that and it will throw the above stated errors.

I hope i haven't forgotten anything. Thanks for your help in advance.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

深府石板幽径 2024-10-04 07:09:09

如果您希望同一进程处理已发布的消息,最好在 Bus.Publish() 之后执行 Bus.SendLocal()。 SendLocal() 方法将在本地队列上放置一条消息,并且您的内部处理程序将拾取该消息并进行处理。这将消除你的僵局,但保持相同的语义。

If you want the same process to handle a published message it would be better to do a Bus.SendLocal() after Bus.Publish(). The SendLocal() method will place a message on the local queue and your internal handler will pick it up and process it. This will get rid of your deadlock yet keep the same semantics.

别在捏我脸啦 2024-10-04 07:09:09

我真的会考虑重新设计这个组件。如果您想要 nservicebus 为您提供的稳定性,并且您已经分解了组件,因此处理的每个部分都在单独的消息处理程序中,请将每个消息处理程序放入具有单独队列的单独可执行文件中。如果这是不可能的,那么您还没有真正获得 nservicebus 的稳定性,因为您被其他一些代码片段锁定,在这种情况下您应该直接调用所需的函数。

如果您只是测试它们全部从一个队列运行,那么在测试时也将它们分开。实际上没有理由订阅您自己的消息 - 如果可能的话,要么将处理程序拆分为单独的端点,如果不可能,则直接调用函数。

I would really consider a re-design of this component. If you want the stability nservicebus gives you, and you've already broken the component down so each part of the processing is in a separate message handler, put each message handler in a separate executable with a separate queue. If that isn't possible, well then you haven't really got the stability of nservicebus as you are locked down by some other pieces of code in which case you should just directly call the required functions.

If you are just testing them all running off one queue, then just split them up when you test as well. There really is no reason to subscribe to your own messages - either split the handlers into seperate endpoints if possible, and if not possible then call the functions directly.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文