春季AMQP(RabbitMQ)并在异常时发送到DLQ

发布于 2025-02-06 02:52:06 字数 2078 浏览 2 评论 0原文

我正在使用org.springframework.boot:spring-boot-starter-amqp:2.6.6。 根据文档,我设置@rabbitlistener - 我使用simpleerabbitlistlistenerconercontainerfactory,该配置看起来像:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ObjectMapper om) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(rabbitProperties.getUpdater().getConcurrentConsumers());
        factory.setMaxConcurrentConsumers(rabbitProperties.getUpdater().getMaxConcurrentConsumers());
        factory.setMessageConverter(new Jackson2JsonMessageConverter(om));
        factory.setAutoStartup(rabbitProperties.getUpdater().getAutoStartup());
        factory.setDefaultRequeueRejected(false);
        return factory;
    }

服务的逻辑是从兔子接收消息,请与外部联系,联系通过REST API(使用REST模板)进行服务,并根据响应的结果(使用Spring Data JPA)将一些信息放入数据库中。该服务成功实施了它,但是在测试过程中,该服务遇到了问题,即如果在投掷堆栈的人的工作期间发生任何例外,则该消息不会发送到配置的DLQ,而只是将其挂在经纪人中。您能告诉我如何告诉Spring AMQP,如果发生任何错误,您需要将消息重定向到DLQ?

听众本身看起来像这样:


    @RabbitListener(
            queues = {"${rabbit.updater.consuming.queue.name}"},
            containerFactory = "rabbitListenerContainerFactory"
    )
    @Override
    public void listen(
            @Valid @Payload MessageDTO message,
            Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag
    ) {

        log.debug(DebugMessagesConstants.RECEIVED_MESSAGE_FROM_QUEUE, message, deliveryTag);

        messageUpdater.process(message);
        channel.basicAck(deliveryTag, false);

        log.debug(DebugMessagesConstants.PROCESSED_MESSAGE_FROM_QUEUE, message, deliveryTag);

    }

在兔子管理中,它看起来像这样: 在此处输入图像描述 并没有被束缚,直到排队消费申请停止

I am using org.springframework.boot:spring-boot-starter-amqp:2.6.6 .
According to the documentation, I set up @RabbitListener - I use SimpleRabbitListenerContainerFactory and the configuration looks like this:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ObjectMapper om) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(rabbitProperties.getUpdater().getConcurrentConsumers());
        factory.setMaxConcurrentConsumers(rabbitProperties.getUpdater().getMaxConcurrentConsumers());
        factory.setMessageConverter(new Jackson2JsonMessageConverter(om));
        factory.setAutoStartup(rabbitProperties.getUpdater().getAutoStartup());
        factory.setDefaultRequeueRejected(false);
        return factory;
    }

The logic of the service is to receive messages from rabbitmq, contact an external service via the rest API (using rest template) and put some information into the database based on the results of the response (using spring data jpa). The service implemented it successfully, but during testing it ran into problems that if any exceptions occur during the work of those thrown up the stack, the message is not sent to the configured dlq, but simply hangs in the broker as unacked. Can you please tell me how you can tell spring amqp that if any error occurs, you need to redirect the message to dlq?

The listener itself looks something like this:


    @RabbitListener(
            queues = {"${rabbit.updater.consuming.queue.name}"},
            containerFactory = "rabbitListenerContainerFactory"
    )
    @Override
    public void listen(
            @Valid @Payload MessageDTO message,
            Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag
    ) {

        log.debug(DebugMessagesConstants.RECEIVED_MESSAGE_FROM_QUEUE, message, deliveryTag);

        messageUpdater.process(message);
        channel.basicAck(deliveryTag, false);

        log.debug(DebugMessagesConstants.PROCESSED_MESSAGE_FROM_QUEUE, message, deliveryTag);

    }

In rabbit managment it look something like this:
enter image description here
and unacked will hang until the queue consuming application stops

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

玩套路吗 2025-02-13 02:52:06

请参阅错误处理文档: https> https:// docs.spring.io/spring-amqp/docs/current/referent/html/#annotation-error处理

因此,您只是不做grespentgemode.manual,并且依靠那些在错误时被拒绝的消息的死信交换配置。

或尝试使用this.channel.basicnack(deliveryTag,false,false)messageupdater.process(message); exception的情况下

See error handling documentation: https://docs.spring.io/spring-amqp/docs/current/reference/html/#annotation-error-handling.

So, you just don't do an AcknowledgeMode.MANUAL and rely on the Dead Letter Exchange configuration for those messages which are rejected in case of error.

Or try to use a this.channel.basicNack(deliveryTag, false, false) in case of messageUpdater.process(message); exception...

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文