Masstransit UseInMemoryOutbox 无法与 MultiBus Saga 配合使用?

发布于 2025-01-09 16:56:02 字数 695 浏览 0 评论 0原文

我实现了一个自定义活动来支持多总线传奇。 但我发现一个问题,即在状态传奇保存到 Redis 之前,我收到了来自 SecondBus 的已发布消息。 我认为 UseInMemoryOutbox() 应该处理这个问题,否则我错了。

public class TestActivity : Activity<SagaState, IFirstBusRequest>
{
    private readonly ISecondBus _secondBus;

    public TestActivity(ISecondBus secondBus)
    {
        _secondBus = secondBus;
    }


    public async Task Execute(BehaviorContext<SagaState, IFirstBusRequest> context, Behavior<SagaState, IFirstBusRequest> next)
    {
        var endpoint = await _secondBus.GetSendEndpoint(new Uri($"queue:second-bus-request"));
        await endpoint.Send(new { }); // send immediately
    }      
}

I implemented a custom activity for supporting multibus saga.
But I found an issue that is I received a published message from SecondBus before the state saga saves to Redis.
I think UseInMemoryOutbox() should handle this or i wrong.

public class TestActivity : Activity<SagaState, IFirstBusRequest>
{
    private readonly ISecondBus _secondBus;

    public TestActivity(ISecondBus secondBus)
    {
        _secondBus = secondBus;
    }


    public async Task Execute(BehaviorContext<SagaState, IFirstBusRequest> context, Behavior<SagaState, IFirstBusRequest> next)
    {
        var endpoint = await _secondBus.GetSendEndpoint(new Uri(
quot;queue:second-bus-request"));
        await endpoint.Send(new { }); // send immediately
    }      
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

最笨的告白 2025-01-16 16:56:02

正确,这是一个已知的限制,因为另一个总线实例是完全独立的总线,并且不是接收总线上发件箱的一部分。

如果您需要涉及发件箱,请考虑在第一条总线上生成消息,并在该总线上有一个使用者,仅将该消息转发到另一条总线。

public class ForwardMessageConsumer<T> :
    IConsumer<T>
    where T : class
{
    readonly ISecondBus _bus;

    public ForwardMessageConsumer(ISecondBus bus)
    {
        _bus = bus;
    }

    public async Task Consume(ConsumeContext<T> context)
    {
        var messagePipe = new ForwardMessagePipe<T>(context);

        await _bus.Publish(context.Message, messagePipe, context.CancellationToken);
    }
}

然后,您可以只拥有一个带有这些消息转发器的端点:

x.AddConsumer<ForwardMessageConsumer<A>>()
    .Endpoint(e => e.Name = "second-bus-forwarder");
x.AddConsumer<ForwardMessageConsumer<B>>()
    .Endpoint(e => e.Name = "second-bus-forwarder");

然后,ConfigureEndpoints 会将这些使用者置于接收端点上,该接收端点会将这些消息转发到其他总线实例。

端点配置是将多个消费者放在同一个队列上,以避免多个队列只转发消息。假设将从活动中使用Publish

Correct, it's a known limitation since the other bus instance is a completely separate bus, and isn't part of the outbox on the receiving bus.

If you need to involve the outbox, consider producing the message on the first bus, and have a consumer on that bus which only forwards that message to the other bus.

public class ForwardMessageConsumer<T> :
    IConsumer<T>
    where T : class
{
    readonly ISecondBus _bus;

    public ForwardMessageConsumer(ISecondBus bus)
    {
        _bus = bus;
    }

    public async Task Consume(ConsumeContext<T> context)
    {
        var messagePipe = new ForwardMessagePipe<T>(context);

        await _bus.Publish(context.Message, messagePipe, context.CancellationToken);
    }
}

You could then just have an endpoint with those message forwarders on it:

x.AddConsumer<ForwardMessageConsumer<A>>()
    .Endpoint(e => e.Name = "second-bus-forwarder");
x.AddConsumer<ForwardMessageConsumer<B>>()
    .Endpoint(e => e.Name = "second-bus-forwarder");

And then, ConfigureEndpoints will put those consumers on a receive endpoint that will forward those messages to the other bus instance.

The endpoint configuration is to put multiple consumers on the same queue, to avoid having multiple queues that just forward messages. It's assumed that Publish would be used from the activity.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文