RabbitMQ听众没有被解雇

发布于 2025-02-02 17:01:04 字数 3068 浏览 4 评论 0原文

我正在使用ASP.NETCORE 6.0项目,

我正在使用RabbitMQ进行购物车实施。 IE:付款成功后,应进行预订。

首先,我正在创建队列:

var factory = new ConnectionFactory
                    {
                        Uri = new Uri(_config.GetValue<string>("AmpqUrl")),
                    };

                    try
                    {
                        using var connection = factory.CreateConnection();
                        using var channel = connection.CreateModel();

                        channel.QueueDeclare(queue: "confirmed_payments", durable: true, exclusive: false, autoDelete: false, arguments: null);

                        var data = new
                        {
                            transactionId,
                            paymentConfirmedAt = DateTime.UtcNow,
                        };

                        var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));

                        channel.BasicPublish(exchange: "", routingKey: "confirmed_payments", basicProperties: null, body: body);

                    }
                    catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException ex)
                    {
                               Console.WriteLine("ex.ToString()");

                    }

它的听众(另一个项目):

 public Task ListenPaymentConfimations(CancellationToken cancellationToken)
    {
        var factory = new ConnectionFactory
        {
            Uri = new Uri(_configuration.GetValue<string>("AmpqUrl")),
        };

        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        var confimedPaymentsConsumer = new EventingBasicConsumer(_channel);

        confimedPaymentsConsumer.Received += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var response = Encoding.UTF8.GetString(body);

            var data = JsonConvert.DeserializeAnonymousType(response,
                new { transactionId = "", paymentConfirmedAt = "" }
            );

            var date = DateTime.Parse(data.paymentConfirmedAt).ToUniversalTime();

            using var scope = _serviceProvider.CreateScope();

            var dbService =
                scope.ServiceProvider.GetRequiredService<ITechneDbService>();

            var isPaymentConfimed = await dbService.UpdateCartPaymentConfirmedAt(data.transactionId, date);

            _logger.LogInformation("Transaction - {0}", data.transactionId);
            _logger.LogInformation("Transaction - {0}", date);
            _logger.LogInformation("Payment Confirmed - {0}", isPaymentConfimed);

            if (isPaymentConfimed)
            {
               // handle booking
            }
        };

        _channel.BasicConsume(queue: "confirmed_payments",
                     autoAck: true,
                     consumer: confimedPaymentsConsumer);

        return Task.CompletedTask;
    }

当我尝试进行酒店预订时,有时会击中听众,我可以成功预订。 但是有时候听众没有被打击(创建队列时也没有例外)。

我不知道为什么会发生这种情况。

任何人都可以找到这个问题, 请帮我。

I am working a asp.netcore 6.0 project

I am using RabbitMQ to cart implementation. i.e: After payment is success, Booking should be placed.

first I'm creating queue:

var factory = new ConnectionFactory
                    {
                        Uri = new Uri(_config.GetValue<string>("AmpqUrl")),
                    };

                    try
                    {
                        using var connection = factory.CreateConnection();
                        using var channel = connection.CreateModel();

                        channel.QueueDeclare(queue: "confirmed_payments", durable: true, exclusive: false, autoDelete: false, arguments: null);

                        var data = new
                        {
                            transactionId,
                            paymentConfirmedAt = DateTime.UtcNow,
                        };

                        var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));

                        channel.BasicPublish(exchange: "", routingKey: "confirmed_payments", basicProperties: null, body: body);

                    }
                    catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException ex)
                    {
                               Console.WriteLine("ex.ToString()");

                    }

and listener of it (another project):

 public Task ListenPaymentConfimations(CancellationToken cancellationToken)
    {
        var factory = new ConnectionFactory
        {
            Uri = new Uri(_configuration.GetValue<string>("AmpqUrl")),
        };

        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        var confimedPaymentsConsumer = new EventingBasicConsumer(_channel);

        confimedPaymentsConsumer.Received += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var response = Encoding.UTF8.GetString(body);

            var data = JsonConvert.DeserializeAnonymousType(response,
                new { transactionId = "", paymentConfirmedAt = "" }
            );

            var date = DateTime.Parse(data.paymentConfirmedAt).ToUniversalTime();

            using var scope = _serviceProvider.CreateScope();

            var dbService =
                scope.ServiceProvider.GetRequiredService<ITechneDbService>();

            var isPaymentConfimed = await dbService.UpdateCartPaymentConfirmedAt(data.transactionId, date);

            _logger.LogInformation("Transaction - {0}", data.transactionId);
            _logger.LogInformation("Transaction - {0}", date);
            _logger.LogInformation("Payment Confirmed - {0}", isPaymentConfimed);

            if (isPaymentConfimed)
            {
               // handle booking
            }
        };

        _channel.BasicConsume(queue: "confirmed_payments",
                     autoAck: true,
                     consumer: confimedPaymentsConsumer);

        return Task.CompletedTask;
    }

When I try to make hotel booking, sometimes Listener is hitted and I can made booking successfully.
But Sometimes listener in not hitted (There is no exception thrown when creating queue).

I don't know why this happens.

Anyone can find the issue,
Please help me.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

禾厶谷欠 2025-02-09 17:01:04

更改连接的构造函数,以表明应异步派遣消费者。

var factory = new ConnectionFactory
{
    Uri = new Uri(_configuration.GetValue<string>("AmpqUrl")),
    DispatchConsumersAsync = true // <---- this
};

您可能还需要更改为使用asynceventingBasicConsumer而不是EventingBasicConsumer

Change the constructor for your connection to indicate that consumers should be dispatched asynchronously.

var factory = new ConnectionFactory
{
    Uri = new Uri(_configuration.GetValue<string>("AmpqUrl")),
    DispatchConsumersAsync = true // <---- this
};

You'll probably also want to change over to using AsyncEventingBasicConsumer instead of EventingBasicConsumer.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文