如何为 MassTransit sagas 配置重试

发布于 2025-01-11 14:55:18 字数 11115 浏览 6 评论 0原文

我遇到了 MassTransit sagas 的并发问题。

我目前正在使用以下流程开发 POC:

  1. 一个线程生成 100 个事件,这些事件按顺序发布到 MassTransit。
  2. 当实例化 saga 时,它会向 MassTransit 发布另一个事件。
  3. 新事件由执行某些业务逻辑的消费者拾取,并将两个结果事件之一发布到 MassTransit。
  4. 步骤 3 中产生的事件会触发 saga 中的状态更改

我有时会在步骤 4 中遇到类似这样的异常 Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: 数据库操作预计会影响 1 行,但实际上影响了 0 行;自加载实体以来,数据可能已被修改或删除。并且状态更改不会被持久化。

这是业务逻辑代码:

// Message contracts for the POC. Each one extends CorrelatedBy<Guid>; the code below
// reads and sets a CorrelationId on them.

/// <summary>Published by Publisher; handled by MyStateMachine as InitialSagaEvent.</summary>
public interface IInitialSagaEvent : CorrelatedBy<Guid> { }
/// <summary>Published by the state machine; consumed by ExternalCheckRequestConsumer.</summary>
public interface IExternalCheckRequest : CorrelatedBy<Guid> { }
/// <summary>Published by ExternalCheckRequestConsumer when PerformCheck returns true.</summary>
public interface IExternalCheckOk : CorrelatedBy<Guid> { }
/// <summary>Published by ExternalCheckRequestConsumer when PerformCheck returns false.</summary>
public interface IExternalCheckNotOk : CorrelatedBy<Guid> { }

/// <summary>
/// Saga instance driven by <see cref="MyStateMachine"/>.
/// </summary>
public class MySaga : SagaStateMachineInstance
{
    /// <summary>Identifier of this saga instance; copied into the messages the saga publishes.</summary>
    public Guid CorrelationId { get; set; }
    /// <summary>Current state; bound via InstanceState in the state machine.</summary>
    public string CurrentState { get; set; }
    /// <summary>Mapped with IsRowVersion() in MySagaClassMap.</summary>
    public byte[] RowVersion { get; set; }
}

/// <summary>
/// State machine for <see cref="MySaga"/>: an initial event publishes an external check
/// request, and the check result moves the saga to CheckedOk or CheckedNotOk.
/// </summary>
public class MyStateMachine : MassTransitStateMachine<MySaga>
{
    public MyStateMachine()
    {
        // The current state is stored in MySaga.CurrentState.
        InstanceState(instance => instance.CurrentState);
        // In the initial state: publish an IExternalCheckRequest carrying the saga's
        // CorrelationId, then wait for the result.
        Initially(
            When(InitialSagaEvent)
                .ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck)
        );

        // While a check is outstanding, further initial events are ignored and the
        // check result selects the next state.
        During(AwaitingExternalCheck,
            Ignore(InitialSagaEvent),
            When(ExternalCheckOk)
                .TransitionTo(CheckedOk),
            When(ExternalCheckNotOk)
                .TransitionTo(CheckedNotOk)
        );

        // From either checked state, a new initial event requests another check.
        During(CheckedOk,
            When(InitialSagaEvent)
                .ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck)
        );

        During(CheckedNotOk,
            When(InitialSagaEvent)
                .ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck)
        );
    }
    // Events handled by this state machine.
    public Event<IInitialSagaEvent> InitialSagaEvent { get; private set; }
    public Event<IExternalCheckOk> ExternalCheckOk { get; private set; }
    public Event<IExternalCheckNotOk> ExternalCheckNotOk { get; private set; }
    // States this state machine can transition to.
    public State AwaitingExternalCheck { get; private set; }
    public State CheckedOk { get; private set; }
    public State CheckedNotOk { get; private set; }
}

/// <summary>
/// Consumes <see cref="IExternalCheckRequest"/>, runs the external check and publishes
/// either <see cref="IExternalCheckOk"/> or <see cref="IExternalCheckNotOk"/> with the
/// request's CorrelationId.
/// </summary>
public class ExternalCheckRequestConsumer : IConsumer<IExternalCheckRequest>
{
    private readonly IExternalChecker _externalChecker;

    public ExternalCheckRequestConsumer(IExternalChecker externalChecker) => _externalChecker = externalChecker;

    /// <summary>Performs the check for the incoming request and publishes the outcome.</summary>
    public async Task Consume(ConsumeContext<IExternalCheckRequest> context)
    {
        var request = context.Message;
        var token = context.CancellationToken;

        var passed = await _externalChecker.PerformCheck(request, token);
        var payload = new { request.CorrelationId };

        if (!passed)
        {
            await context.Publish<IExternalCheckNotOk>(payload, token);
            return;
        }

        await context.Publish<IExternalCheckOk>(payload, token);
    }
}

/// <summary>
/// Abstraction over the business check used by ExternalCheckRequestConsumer.
/// </summary>
public interface IExternalChecker
{
    /// <summary>Runs the check for <paramref name="request"/>.</summary>
    /// <returns>true publishes IExternalCheckOk in the consumer; false publishes IExternalCheckNotOk.</returns>
    Task<bool> PerformCheck(IExternalCheckRequest request, CancellationToken cancellationToken);
}

/// <summary>
/// Publishes <see cref="IInitialSagaEvent"/> messages through the injected publish endpoint.
/// </summary>
public class Publisher
{
    private readonly IPublishEndpoint _endpoint;

    public Publisher(IPublishEndpoint publishEndpoint) => _endpoint = publishEndpoint;

    /// <summary>Publishes one initial saga event carrying <paramref name="correlationId"/>.</summary>
    public async Task Publish(Guid correlationId, CancellationToken cancellationToken)
    {
        var message = new { CorrelationId = correlationId };
        await _endpoint.Publish<IInitialSagaEvent>(message, cancellationToken);
    }
}

这里是配置代码

/// <summary>
/// Saga DbContext that registers <see cref="MySagaClassMap"/> as its only saga mapping.
/// </summary>
public class MySagaDbContext : SagaDbContext
{
    public MySagaDbContext(DbContextOptions<MySagaDbContext> options) : base(options) { }

    /// <summary>Saga class maps exposed to the base SagaDbContext.</summary>
    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get
        {
            yield return new MySagaClassMap();
        }
    }
}

/// <summary>
/// Entity mapping for <see cref="MySaga"/>.
/// </summary>
public class MySagaClassMap : SagaClassMap<MySaga>
{
    protected override void Configure(EntityTypeBuilder<MySaga> entity, ModelBuilder model)
    {
        // State names are stored in a column limited to 128 characters.
        entity.Property(x => x.CurrentState).HasMaxLength(128);
        // RowVersion is the row-version property for this entity.
        entity.Property(x => x.RowVersion).IsRowVersion();
    }
}

/// <summary>
/// Consumer definition that adds a retry policy to the receive endpoint of
/// <see cref="ExternalCheckRequestConsumer"/>.
/// </summary>
/// <remarks>
/// NOTE(review): the answer on this page states that this consumer definition is not
/// used by the saga, and that UseMessageRetry should be used instead of UseRetry.
/// </remarks>
public class ExternalCheckRequestConsumerDefinition : ConsumerDefinition<ExternalCheckRequestConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<ExternalCheckRequestConsumer> consumerConfigurator) =>
        endpointConfigurator.UseRetry(r =>
        {
            // Only concurrency conflicts and duplicate-key failures are handled by the policy.
            r.Handle<DbUpdateConcurrencyException>();
            // This is the SQLServer error code for duplicate key
            r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
            // Interval policy configured with a count of 5 and a 100 ms interval.
            r.Interval(5, TimeSpan.FromMilliseconds(100));
        });
}

/// <summary>
/// POC entry point: registers the saga DbContext and MassTransit (ActiveMQ transport with
/// Artemis compatibility), starts the bus, then runs the POC.
/// </summary>
public class Program
{
    /// <summary>Builds the service provider, starts the bus and calls RunPoc.</summary>
    public static async Task Main(string[] args)
    {
        var services = new ServiceCollection();
        // SQL Server-backed saga DbContext; migrations are located in the DbContext's
        // assembly and tracked in a dedicated history table.
        services.AddDbContext<DbContext, MySagaDbContext>((provider, builder)
                => builder.UseSqlServer(connectionString, m =>
                {
                    m.MigrationsAssembly(typeof(MySagaDbContext).Assembly.GetName().Name);
                    m.MigrationsHistoryTable($"__EFMigrationsHistory_Sagas");
                }));
        services.AddMassTransit(configureMassTransit =>
        {
            configureMassTransit.AddConsumer<ExternalCheckRequestConsumer, ExternalCheckRequestConsumerDefinition>();
            // Saga state is persisted through MySagaDbContext in optimistic concurrency mode.
            configureMassTransit.AddSagaStateMachine<MyStateMachine, MySaga>()
                .EntityFrameworkRepository(r =>
                {
                    r.ConcurrencyMode = ConcurrencyMode.Optimistic;
                    r.ExistingDbContext<MySagaDbContext>();
                });
            configureMassTransit.SetEndpointNameFormatter(new DefaultEndpointNameFormatter(true));
            configureMassTransit.UsingActiveMq((context, config) =>
            {
                // NOTE(review): host and credentials are hard-coded; acceptable for a POC only.
                config.Host("artemis", 61616, configureHost =>
                {
                    configureHost.Username("admin");
                    configureHost.Password("admin");
                });

                config.UseInMemoryOutbox(); // ref https://masstransit-project.com/articles/outbox.html#the-in-memory-outbox
                config.EnableArtemisCompatibility();
                config.ConfigureEndpoints(context);
            });
        });
        var serviceProvider = services.BuildServiceProvider();
        var busControl = serviceProvider.GetRequiredService<IBusControl>();
        await busControl.StartAsync();
        await RunPoc(serviceProvider);
    }

    /// <summary>Placeholder for the POC driver; currently completes immediately.</summary>
    private static async Task RunPoc(IServiceProvider serviceProvider)
    {
        await Task.CompletedTask;
    }
    // Placeholder: the real connection string is not shown in the post.
    static string connectionString = string.Empty;
}

我的猜测是我需要在正确的点进入 UseRetry,所以我尝试使用 UseRetry 配置 AddSagaStateMachine,如下所示:

// Attempted fix from the question: configures UseRetry on the saga while registering it.
// NOTE(review): the question reports that this variant faults with the ArgumentException
// shown in the log; the answers on this page use UseMessageRetry here instead.
configureMassTransit.AddSagaStateMachine<MyStateMachine, MySaga>(
    configure =>
    {
        configure.UseRetry(r =>
        {
            r.Handle<DbUpdateConcurrencyException>();
            // This is the SQLServer error code for duplicate key
            r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
            r.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    })
    .EntityFrameworkRepository(r =>
    {
        r.ConcurrencyMode = ConcurrencyMode.Optimistic;
        r.ExistingDbContext<MySagaDbContext>();
    });

但是在 AddSagaStateMachine 中加入这个 UseRetry 之后什么都不起作用,我只是得到大量这样的异常:

fail: MassTransit.ReceiveTransport[0]
R - FAULT activemq://artemis:61616/XXXX
System.ArgumentException: THe message could not be retrieved: IInitialSagaEvent(Parameter 'context')
at MassTransit.Saga.Pipeline.Pipes.SagaMergePipe`2.Send(SagaConsumeContext`1 context)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.<> c__DisplayClass5_0`1.<< Send > b__1 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.<> c__DisplayClass8_0.<< WithinTransaction > g__Create | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory`1.<> c__DisplayClass6_0`1.<< Send > g__CreateScope | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory`1.<> c__DisplayClass6_0`1.<< Send > g__CreateScope | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)

我正在使用 .Net 6 并尝试过 MassTransit v 7.3.1 和 v 8.0.0-develop.391,但两者都具有相同的行为。

我尝试将消息定义为接口并将它们发布为匿名类和实际实现,并且还尝试将消息定义为类,但没有成功。

我希望我缺少一些小的配置细节,但我没有想法,所以非常感谢任何帮助。

I'm having a concurrency issue with MassTransit sagas.

I'm currently working on a POC with this flow:

  1. One thread produces 100 events that are published to MassTransit in sequence.
  2. When the saga is instantiated it publishes another event to MassTransit.
  3. The new event is picked up by a Consumer that performs some business logic and publishes one of two resulting events to MassTransit.
  4. The resulting events from step 3 trigger a state change in the saga.

I sometimes get exceptions like this Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded. in step 4, and the state change is not persisted.

Here is the business logic code:

// Message contracts for the POC. Each one extends CorrelatedBy<Guid>; the code below
// reads and sets a CorrelationId on them.

/// <summary>Published by Publisher; handled by MyStateMachine as InitialSagaEvent.</summary>
public interface IInitialSagaEvent : CorrelatedBy<Guid> { }
/// <summary>Published by the state machine; consumed by ExternalCheckRequestConsumer.</summary>
public interface IExternalCheckRequest : CorrelatedBy<Guid> { }
/// <summary>Published by ExternalCheckRequestConsumer when PerformCheck returns true.</summary>
public interface IExternalCheckOk : CorrelatedBy<Guid> { }
/// <summary>Published by ExternalCheckRequestConsumer when PerformCheck returns false.</summary>
public interface IExternalCheckNotOk : CorrelatedBy<Guid> { }

/// <summary>
/// Saga instance driven by <see cref="MyStateMachine"/>.
/// </summary>
public class MySaga : SagaStateMachineInstance
{
    /// <summary>Identifier of this saga instance; copied into the messages the saga publishes.</summary>
    public Guid CorrelationId { get; set; }
    /// <summary>Current state; bound via InstanceState in the state machine.</summary>
    public string CurrentState { get; set; }
    /// <summary>Mapped with IsRowVersion() in MySagaClassMap.</summary>
    public byte[] RowVersion { get; set; }
}

/// <summary>
/// State machine for <see cref="MySaga"/>: an initial event publishes an external check
/// request, and the check result moves the saga to CheckedOk or CheckedNotOk.
/// </summary>
public class MyStateMachine : MassTransitStateMachine<MySaga>
{
    public MyStateMachine()
    {
        // The current state is stored in MySaga.CurrentState.
        InstanceState(instance => instance.CurrentState);
        // In the initial state: publish an IExternalCheckRequest carrying the saga's
        // CorrelationId, then wait for the result.
        Initially(
            When(InitialSagaEvent)
                .ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck)
        );

        // While a check is outstanding, further initial events are ignored and the
        // check result selects the next state.
        During(AwaitingExternalCheck,
            Ignore(InitialSagaEvent),
            When(ExternalCheckOk)
                .TransitionTo(CheckedOk),
            When(ExternalCheckNotOk)
                .TransitionTo(CheckedNotOk)
        );

        // From either checked state, a new initial event requests another check.
        During(CheckedOk,
            When(InitialSagaEvent)
                .ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck)
        );

        During(CheckedNotOk,
            When(InitialSagaEvent)
                .ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck)
        );
    }
    // Events handled by this state machine.
    public Event<IInitialSagaEvent> InitialSagaEvent { get; private set; }
    public Event<IExternalCheckOk> ExternalCheckOk { get; private set; }
    public Event<IExternalCheckNotOk> ExternalCheckNotOk { get; private set; }
    // States this state machine can transition to.
    public State AwaitingExternalCheck { get; private set; }
    public State CheckedOk { get; private set; }
    public State CheckedNotOk { get; private set; }
}

/// <summary>
/// Consumes <see cref="IExternalCheckRequest"/>, runs the external check and publishes
/// either <see cref="IExternalCheckOk"/> or <see cref="IExternalCheckNotOk"/> with the
/// request's CorrelationId.
/// </summary>
public class ExternalCheckRequestConsumer : IConsumer<IExternalCheckRequest>
{
    private readonly IExternalChecker _externalChecker;

    public ExternalCheckRequestConsumer(IExternalChecker externalChecker) => _externalChecker = externalChecker;

    /// <summary>Performs the check for the incoming request and publishes the outcome.</summary>
    public async Task Consume(ConsumeContext<IExternalCheckRequest> context)
    {
        var request = context.Message;
        var token = context.CancellationToken;

        var passed = await _externalChecker.PerformCheck(request, token);
        var payload = new { request.CorrelationId };

        if (!passed)
        {
            await context.Publish<IExternalCheckNotOk>(payload, token);
            return;
        }

        await context.Publish<IExternalCheckOk>(payload, token);
    }
}

/// <summary>
/// Abstraction over the business check used by ExternalCheckRequestConsumer.
/// </summary>
public interface IExternalChecker
{
    /// <summary>Runs the check for <paramref name="request"/>.</summary>
    /// <returns>true publishes IExternalCheckOk in the consumer; false publishes IExternalCheckNotOk.</returns>
    Task<bool> PerformCheck(IExternalCheckRequest request, CancellationToken cancellationToken);
}

/// <summary>
/// Publishes <see cref="IInitialSagaEvent"/> messages through the injected publish endpoint.
/// </summary>
public class Publisher
{
    private readonly IPublishEndpoint _endpoint;

    public Publisher(IPublishEndpoint publishEndpoint) => _endpoint = publishEndpoint;

    /// <summary>Publishes one initial saga event carrying <paramref name="correlationId"/>.</summary>
    public async Task Publish(Guid correlationId, CancellationToken cancellationToken)
    {
        var message = new { CorrelationId = correlationId };
        await _endpoint.Publish<IInitialSagaEvent>(message, cancellationToken);
    }
}

Here is the configuration code:

/// <summary>
/// Saga DbContext that registers <see cref="MySagaClassMap"/> as its only saga mapping.
/// </summary>
public class MySagaDbContext : SagaDbContext
{
    public MySagaDbContext(DbContextOptions<MySagaDbContext> options) : base(options) { }

    /// <summary>Saga class maps exposed to the base SagaDbContext.</summary>
    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get
        {
            yield return new MySagaClassMap();
        }
    }
}

/// <summary>
/// Entity mapping for <see cref="MySaga"/>.
/// </summary>
public class MySagaClassMap : SagaClassMap<MySaga>
{
    protected override void Configure(EntityTypeBuilder<MySaga> entity, ModelBuilder model)
    {
        // State names are stored in a column limited to 128 characters.
        entity.Property(x => x.CurrentState).HasMaxLength(128);
        // RowVersion is the row-version property for this entity.
        entity.Property(x => x.RowVersion).IsRowVersion();
    }
}

/// <summary>
/// Consumer definition that adds a retry policy to the receive endpoint of
/// <see cref="ExternalCheckRequestConsumer"/>.
/// </summary>
/// <remarks>
/// NOTE(review): the answer on this page states that this consumer definition is not
/// used by the saga, and that UseMessageRetry should be used instead of UseRetry.
/// </remarks>
public class ExternalCheckRequestConsumerDefinition : ConsumerDefinition<ExternalCheckRequestConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<ExternalCheckRequestConsumer> consumerConfigurator) =>
        endpointConfigurator.UseRetry(r =>
        {
            // Only concurrency conflicts and duplicate-key failures are handled by the policy.
            r.Handle<DbUpdateConcurrencyException>();
            // This is the SQLServer error code for duplicate key
            r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
            // Interval policy configured with a count of 5 and a 100 ms interval.
            r.Interval(5, TimeSpan.FromMilliseconds(100));
        });
}

/// <summary>
/// POC entry point: registers the saga DbContext and MassTransit (ActiveMQ transport with
/// Artemis compatibility), starts the bus, then runs the POC.
/// </summary>
public class Program
{
    /// <summary>Builds the service provider, starts the bus and calls RunPoc.</summary>
    public static async Task Main(string[] args)
    {
        var services = new ServiceCollection();
        // SQL Server-backed saga DbContext; migrations are located in the DbContext's
        // assembly and tracked in a dedicated history table.
        services.AddDbContext<DbContext, MySagaDbContext>((provider, builder)
                => builder.UseSqlServer(connectionString, m =>
                {
                    m.MigrationsAssembly(typeof(MySagaDbContext).Assembly.GetName().Name);
                    m.MigrationsHistoryTable($"__EFMigrationsHistory_Sagas");
                }));
        services.AddMassTransit(configureMassTransit =>
        {
            configureMassTransit.AddConsumer<ExternalCheckRequestConsumer, ExternalCheckRequestConsumerDefinition>();
            // Saga state is persisted through MySagaDbContext in optimistic concurrency mode.
            configureMassTransit.AddSagaStateMachine<MyStateMachine, MySaga>()
                .EntityFrameworkRepository(r =>
                {
                    r.ConcurrencyMode = ConcurrencyMode.Optimistic;
                    r.ExistingDbContext<MySagaDbContext>();
                });
            configureMassTransit.SetEndpointNameFormatter(new DefaultEndpointNameFormatter(true));
            configureMassTransit.UsingActiveMq((context, config) =>
            {
                // NOTE(review): host and credentials are hard-coded; acceptable for a POC only.
                config.Host("artemis", 61616, configureHost =>
                {
                    configureHost.Username("admin");
                    configureHost.Password("admin");
                });

                config.UseInMemoryOutbox(); // ref https://masstransit-project.com/articles/outbox.html#the-in-memory-outbox
                config.EnableArtemisCompatibility();
                config.ConfigureEndpoints(context);
            });
        });
        var serviceProvider = services.BuildServiceProvider();
        var busControl = serviceProvider.GetRequiredService<IBusControl>();
        await busControl.StartAsync();
        await RunPoc(serviceProvider);
    }

    /// <summary>Placeholder for the POC driver; currently completes immediately.</summary>
    private static async Task RunPoc(IServiceProvider serviceProvider)
    {
        await Task.CompletedTask;
    }
    // Placeholder: the real connection string is not shown in the post.
    static string connectionString = string.Empty;
}

My guess is that I need to get in a UseRetry at the correct point, so I've tried to configure the AddSagaStateMachine with UseRetry like this:

// Attempted fix from the question: configures UseRetry on the saga while registering it.
// NOTE(review): the question reports that this variant faults with the ArgumentException
// shown in the log; the answers on this page use UseMessageRetry here instead.
configureMassTransit.AddSagaStateMachine<MyStateMachine, MySaga>(
    configure =>
    {
        configure.UseRetry(r =>
        {
            r.Handle<DbUpdateConcurrencyException>();
            // This is the SQLServer error code for duplicate key
            r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
            r.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    })
    .EntityFrameworkRepository(r =>
    {
        r.ConcurrencyMode = ConcurrencyMode.Optimistic;
        r.ExistingDbContext<MySagaDbContext>();
    });

But with this UseRetry in AddSagaStateMachine nothing works; I just get loads of exceptions like this:

fail: MassTransit.ReceiveTransport[0]
R - FAULT activemq://artemis:61616/XXXX
System.ArgumentException: THe message could not be retrieved: IInitialSagaEvent(Parameter 'context')
at MassTransit.Saga.Pipeline.Pipes.SagaMergePipe`2.Send(SagaConsumeContext`1 context)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.<> c__DisplayClass5_0`1.<< Send > b__1 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.<> c__DisplayClass8_0.<< WithinTransaction > g__Create | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory`1.<> c__DisplayClass6_0`1.<< Send > g__CreateScope | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory`1.<> c__DisplayClass6_0`1.<< Send > g__CreateScope | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)

I'm using .NET 6 and have tried MassTransit v7.3.1 and v8.0.0-develop.391, but both have the same behavior.

I've tried defining the messages as interfaces and publishing them both as anonymous classes and as actual implementations, and also tried to define the messages as classes, but with no luck.

My hope is that there is just some small configuration detail I'm missing, but I'm out of ideas, so any help is deeply appreciated.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2)

手长情犹 2025-01-18 14:55:18

SagaDefinition 中的正确配置如下所示。请注意使用 UseMessageRetry,而不是 UseRetry

/// <summary>
/// Consumer definition from the answer: adds a message retry policy to the receive
/// endpoint of <see cref="ExternalCheckRequestConsumer"/> via UseMessageRetry.
/// </summary>
/// <remarks>
/// The answer's update notes that this consumer definition is not used by the saga; a
/// saga definition (or inline saga configuration) is needed for the retry to apply there.
/// </remarks>
public class ExternalCheckRequestConsumerDefinition : 
    ConsumerDefinition<ExternalCheckRequestConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, 
        IConsumerConfigurator<ExternalCheckRequestConsumer> consumerConfigurator) =>
        endpointConfigurator.UseMessageRetry(r =>
        {
            // Only concurrency conflicts and duplicate-key failures are handled by the policy.
            r.Handle<DbUpdateConcurrencyException>();
            // This is the SQLServer error code for duplicate key
            r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
            // Interval policy configured with a count of 5 and a 100 ms interval.
            r.Interval(5, TimeSpan.FromMilliseconds(100));
        });
}

更新

saga 没有使用上述消费者定义。您需要创建一个 Saga 定义,并在添加 saga 时指定它,以便重试应用于该 saga。这与添加 saga 时内联配置它的作用相同:

.AddSagaStateMachine<MyStateMachine, MySaga, MySagaDefinition>(

此外,在状态机中,替换过于嘈杂的:

.ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))

与:

.PublishAsync(context => context.Init<IExternalCheckRequest>(new { context.Instance.CorrelationId }))

The proper configuration in your SagaDefinition is shown below. Note the use of UseMessageRetry, instead of UseRetry.

/// <summary>
/// Consumer definition from the answer: adds a message retry policy to the receive
/// endpoint of <see cref="ExternalCheckRequestConsumer"/> via UseMessageRetry.
/// </summary>
/// <remarks>
/// The answer's update notes that this consumer definition is not used by the saga; a
/// saga definition (or inline saga configuration) is needed for the retry to apply there.
/// </remarks>
public class ExternalCheckRequestConsumerDefinition : 
    ConsumerDefinition<ExternalCheckRequestConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, 
        IConsumerConfigurator<ExternalCheckRequestConsumer> consumerConfigurator) =>
        endpointConfigurator.UseMessageRetry(r =>
        {
            // Only concurrency conflicts and duplicate-key failures are handled by the policy.
            r.Handle<DbUpdateConcurrencyException>();
            // This is the SQLServer error code for duplicate key
            r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
            // Interval policy configured with a count of 5 and a 100 ms interval.
            r.Interval(5, TimeSpan.FromMilliseconds(100));
        });
}

UPDATE

The above Consumer definition isn't used by the saga. You'd need to create a Saga definition, and specify it when adding the saga, for the retry to apply to the saga. Which would do the same as configuring it inline when adding the saga:

.AddSagaStateMachine<MyStateMachine, MySaga, MySagaDefinition>(

Also, in your state machine, replace the overly noisy:

.ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))

With:

.PublishAsync(context => context.Init<IExternalCheckRequest>(new { context.Instance.CorrelationId }))

半窗疏影 2025-01-18 14:55:18

这是我使用的 .AddSagaStateMachine,参考其他答案中 Chris Patterson 的解决方案。

// Saga registration from the follow-up answer: the retry policy is configured inline on
// the saga with UseMessageRetry, and saga state is persisted through MySagaDbContext in
// optimistic concurrency mode.
configureMassTransit.AddSagaStateMachine<MyStateMachine, MySaga>(
    configure =>
    {
        configure.UseMessageRetry(r =>
        {
            r.Handle<DbUpdateConcurrencyException>();
            // This is the SQLServer error code for duplicate key
            r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
            r.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    })
    .EntityFrameworkRepository(r =>
    {
        r.ConcurrencyMode = ConcurrencyMode.Optimistic;
        r.ExistingDbContext<MySagaDbContext>();
    });

Here is the .AddSagaStateMachine I used, ref Chris Patterson's solution in the other answer.

// Saga registration from the follow-up answer: the retry policy is configured inline on
// the saga with UseMessageRetry, and saga state is persisted through MySagaDbContext in
// optimistic concurrency mode.
configureMassTransit.AddSagaStateMachine<MyStateMachine, MySaga>(
    configure =>
    {
        configure.UseMessageRetry(r =>
        {
            r.Handle<DbUpdateConcurrencyException>();
            // This is the SQLServer error code for duplicate key
            r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
            r.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    })
    .EntityFrameworkRepository(r =>
    {
        r.ConcurrencyMode = ConcurrencyMode.Optimistic;
        r.ExistingDbContext<MySagaDbContext>();
    });
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文