No MassTransit response received after a successful request

Posted on 2025-01-11 14:18:12

I am implementing request/response with MassTransit and Amazon MQ. The host (a .NET 6 Web API) is configured as follows:

services
    .AddMassTransit(x =>
    {
        x.AddConsumer<ConfigurationConsumer>();
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host(new Uri($"amqps://{rabbitMqSettings.Host}:{rabbitMqSettings.Port}/{Uri.EscapeDataString("/vhostname")}"), h =>
            {
                h.Username(rabbitMqSettings.Username);
                h.Password(rabbitMqSettings.Password);
            });
            cfg.ConfigureEndpoints(context);
        });
        x.AddRequestClient<IConfigurationCommand>();
    })
    .AddMassTransitHostedService();

The client uses the same configuration, but without x.AddConsumer() and cfg.ConfigureEndpoints(context).

The problem: calling GetResponse on the client successfully executes the consumer on the host, which appears to respond as intended, but the client never receives the result and times out.

On the other hand, when I use a Web API controller on the same host to initiate GetResponse, everything works as expected.

Comments (2)

悍妇囚夫 2025-01-18 14:18:12

Ensure that the message types between the host and the client applications are the same.
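
To illustrate why "the same" matters here: by default MassTransit identifies a message by its full type name, namespace included, so two interfaces that merely share a short name are different contracts. The sketch below uses only plain .NET reflection; the namespaces `HostApp.Contracts` and `ClientApp.Contracts` and the `ContractCheck` helper are hypothetical and not taken from the question.

```csharp
using System;

namespace HostApp.Contracts
{
    // Hypothetical copy of the contract declared in the host project.
    public interface IConfigurationCommand { }
}

namespace ClientApp.Contracts
{
    // Hypothetical copy declared again in the client project.
    public interface IConfigurationCommand { }
}

public static class ContractCheck
{
    // Two types describe the same contract only if their full names match.
    public static bool SameContract(Type a, Type b) => a.FullName == b.FullName;

    public static void Main()
    {
        var host = typeof(HostApp.Contracts.IConfigurationCommand);
        var client = typeof(ClientApp.Contracts.IConfigurationCommand);

        Console.WriteLine(host.FullName);               // HostApp.Contracts.IConfigurationCommand
        Console.WriteLine(client.FullName);             // ClientApp.Contracts.IConfigurationCommand
        Console.WriteLine(SameContract(host, client));  // False
    }
}
```

One common way to avoid the mismatch is to put the request and response types in a single shared assembly that both applications reference.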

Also, make sure the bus is started on the client, using the hosted service or otherwise.
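
A minimal sketch of what a client-side registration along these lines could look like, assuming the MassTransit v7 packages used in the question (where `AddMassTransitHostedService()` starts the bus). The `ClientSetup` class, the `AddConfigurationClient` method, its parameters, and the `Shared.Contracts` namespace are hypothetical names introduced only for illustration.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

namespace Shared.Contracts
{
    // Assumption: the request contract lives in an assembly shared by host and client.
    public interface IConfigurationCommand { }
}

public static class ClientSetup
{
    // Hypothetical helper: registers a bus with no consumers plus a request client.
    public static IServiceCollection AddConfigurationClient(
        this IServiceCollection services, Uri brokerUri, string username, string password)
    {
        services
            .AddMassTransit(x =>
            {
                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.Host(brokerUri, h =>
                    {
                        h.Username(username);
                        h.Password(password);
                    });
                });
                x.AddRequestClient<Shared.Contracts.IConfigurationCommand>();
            })
            // Starts the bus with the application; if the bus is never started,
            // the client cannot receive the response and the request times out.
            .AddMassTransitHostedService();

        return services;
    }
}
```

If the client is not hosted in a generic host, the bus would need to be started by other means, which is what "or otherwise" in the answer refers to.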

小…红帽 2025-01-18 14:18:12

I had the same timeout issue when using inheritance in my response types, where the parent class has a method that builds a failed response.

The problem was finally solved by casting the response to the exact type of the child class.

MyCommon-lib:

public partial interface IEvent
{
    bool Succeeded { get; }
    string Message { get; }
}
public abstract record Event<TModel> : IEvent
    where TModel : IEvent
{
    public bool Succeeded { get; set; } = true;
    public string Message { get; set; } = string.Empty;

    public TModel Failed(Exception exception)
    {
        Succeeded = false;
        Message = exception.InnerException?.Message ?? exception.Message;
        return (TModel)(object)this; //! the problem was here: a plain 'return this;' is not possible, so cast to the concrete type
    }
    public TModel Failed(string message)
    {
        Succeeded = false;
        Message = message;
        return (TModel)(object)this;
    }
}

public partial interface IEvent
{
    public record PointHistory(IList<PointHistoryDto>? Group, IEnumerable<PointHistoryDto>? Filtered) : Event<PointHistory>, IEvent;
    public record PointApplied() : Event<PointApplied>, IEvent;
}

Microservice1::MyMediator:

public async Task DoSomething()
{
    ...
    var pointHistory = await bus.Request<ICommand.PointHistory, IEvent.PointHistory>( ... );
    if (!pointHistory.Message.Succeeded) throw new UserException(pointHistory.Message.Message);
    ...
    var pointApplied = await bus.Request<ICommand.PointApply, IEvent.PointApplied>( ... );
    if (!pointApplied.Message.Succeeded) throw new UserException(pointApplied.Message.Message);
    ...
}

Microservice2::MyConsumer1:

internal class MyConsumer1(
    IPointService service
) : IConsumer<ICommand.PointApply>
{
    public async Task Consume(ConsumeContext<ICommand.PointApply> context)
    {
        try
        {
            ICommand.PointApply request = context.Message;
            
            ...
            
            //! response
            await context.RespondAsync(new IEvent.PointApplied());
        }
        catch (UserException err) { await context.RespondAsync(new IEvent.PointApplied().Failed(err)); }
    }
}
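
One possible reading of why the cast matters, shown with plain C# and no MassTransit dependency: a generic method sees the compile-time type argument, not the runtime type of the value it is given. In the sketch below, `Wire.StaticTypeOf` is a hypothetical stand-in for a generic respond call, and `FailedAsBase` is a hypothetical variant that returns the base type; neither is part of the answer's code or of MassTransit.

```csharp
using System;

public interface IEvent
{
    bool Succeeded { get; }
    string Message { get; }
}

public abstract record Event<TModel> : IEvent where TModel : IEvent
{
    public bool Succeeded { get; set; } = true;
    public string Message { get; set; } = string.Empty;

    // Hypothetical variant: returns the base type, so callers lose the concrete type.
    public Event<TModel> FailedAsBase(string message)
    {
        Succeeded = false;
        Message = message;
        return this;
    }

    // As in the answer: the cast hands the caller the concrete child type.
    public TModel Failed(string message)
    {
        Succeeded = false;
        Message = message;
        return (TModel)(object)this;
    }
}

public record PointApplied : Event<PointApplied>;

public static class Wire
{
    // Stand-in for a generic send: reports the inferred type argument T.
    public static string StaticTypeOf<T>(T message) => typeof(T).Name;
}

public static class Demo
{
    public static void Main()
    {
        Console.WriteLine(Wire.StaticTypeOf(new PointApplied().FailedAsBase("x"))); // Event`1
        Console.WriteLine(Wire.StaticTypeOf(new PointApplied().Failed("x")));       // PointApplied
    }
}
```

If the response were sent under the base type, a requester waiting for IEvent.PointApplied would presumably not recognize it, which would match the timeout described above.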