使用 MassTransit 时的工作单位

发布于 2024-10-31 08:49:05 字数 228 浏览 5 评论 0原文

我正在寻找一种方法来连接消息处理管道,并在消费者完成处理某些消息后执行一些工作。 我的目的是打开一个新会话并启动一个事务(可以在 IoC 容器中完成),然后立即处理和处置它们。

在 NServiceBus 中,我将使用 IMessageModule 接口进行挂钩。有类似的东西吗?实际上,处理处理程序也可以做到这一点,但由于我使用 StructureMap 作为 ObjectBuilder,Release 方法什么也不做。

I'm looking for a way to hook on the message handling pipeline and do some work after a consumer finishes handling some message.
My intention is to open a new session and start a transaction(could be done the IoC Container) before handling and disposing them right after it.

In NServiceBus I would use the IMessageModule interface to hook in. does have anything similar to it? Actually disposing the the handler would also do it for, but as I'm using StructureMap as the ObjectBuilder, the Release method just does nothing.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

夏日落 2024-11-07 08:49:05

您可以注册一个拦截器,以便在每条消息被消费之前和之后调用。例如:

        LocalBus = ServiceBusConfigurator.New(x =>
            {
                x.ReceiveFrom("loopback://localhost/mt_client");

                x.BeforeConsumingMessage(() => { _before.Set(); });
                x.AfterConsumingMessage(() => { _after.Set(); });
            });

查看 MassTransit.Tests 项目中的 MessageInterceptor_Specs.cs 文件以进行工作单元测试。

You can register an interceptor to be called before and after each message is consumed. As an example:

        LocalBus = ServiceBusConfigurator.New(x =>
            {
                x.ReceiveFrom("loopback://localhost/mt_client");

                x.BeforeConsumingMessage(() => { _before.Set(); });
                x.AfterConsumingMessage(() => { _after.Set(); });
            });

Take a look at the MessageInterceptor_Specs.cs file in the MassTransit.Tests project for a working unit test.

意中人 2024-11-07 08:49:05

我也经历过同样的挑战,这就是我的应对方式。
我们有一个 IUnitOfWork
和一个 ITransaction:使用 Commit() 和 Rollback()
我们为 TransactionalOperation 添加了一个类:ITransaction 来添加对非数据库内容的事务行为支持。
IUnitOfWork.Commit() 迭代可能已添加到其中的 TransactionalOperations 列表。

现在将总线连接到我们的系统:
添加了 IBus 来包装外部总线
实施了 MassTransitBusGateway:IBus
然后将总线绑定到工作单元:
实现的UnitOfWorkBus:IBus(装饰器)
- 此装饰器对 Publish() 进行任何调用以了解工作单元并将其添加为 TransactionalOperation,以便其执行延迟到 UnitOfWork.Commit()

这样我们就可以抽象具体总线,从而避免添加 MassTransit 依赖项在几个项目上,(只有客户真正需要它)
并将事务行为添加到其操作中。

I went through the same challenge and this is how I went about it.
We had an IUnitOfWork
and an ITransaction: with Commit() and Rollback()
we added a class for TransactionalOperation : ITransaction to added support transactional behaviour to non-db stuff.
The IUnitOfWork.Commit() iterates through a list of TransactionalOperations that might have been added to it.

Now to tie the bus to our system:
Added an IBus to wrap external bus
Implemented a MassTransitBusGateway: IBus
then to tie the bus to unit of work:
Implemented UnitOfWorkBus: IBus (decorator)
- this decorator makes any calls to Publish() to be aware of the unit of work and add it as TransactionalOperation so that Its execution is delayed until the UnitOfWork.Commit()

This way we are abstracting the concrete bus, thus avoiding adding MassTransit dependencies on several projects, (only the clients really needs it)
and added transactional behavior to its operations.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文