春季AMQP(RabbitMQ)并在异常时发送到DLQ
我正在使用org.springframework.boot:spring-boot-starter-amqp:2.6.6。 根据文档,我设置@rabbitlistener
- 我使用simpleerabbitlistlistenerconercontainerfactory
,该配置看起来像:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ObjectMapper om) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConcurrentConsumers(rabbitProperties.getUpdater().getConcurrentConsumers());
factory.setMaxConcurrentConsumers(rabbitProperties.getUpdater().getMaxConcurrentConsumers());
factory.setMessageConverter(new Jackson2JsonMessageConverter(om));
factory.setAutoStartup(rabbitProperties.getUpdater().getAutoStartup());
factory.setDefaultRequeueRejected(false);
return factory;
}
服务的逻辑是从兔子接收消息,请与外部联系,联系通过REST API(使用REST模板)进行服务,并根据响应的结果(使用Spring Data JPA)将一些信息放入数据库中。该服务成功实施了它,但是在测试过程中,该服务遇到了问题,即如果在投掷堆栈的人的工作期间发生任何例外,则该消息不会发送到配置的DLQ,而只是将其挂在经纪人中。您能告诉我如何告诉Spring AMQP,如果发生任何错误,您需要将消息重定向到DLQ?
听众本身看起来像这样:
@RabbitListener(
queues = {"${rabbit.updater.consuming.queue.name}"},
containerFactory = "rabbitListenerContainerFactory"
)
@Override
public void listen(
@Valid @Payload MessageDTO message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag
) {
log.debug(DebugMessagesConstants.RECEIVED_MESSAGE_FROM_QUEUE, message, deliveryTag);
messageUpdater.process(message);
channel.basicAck(deliveryTag, false);
log.debug(DebugMessagesConstants.PROCESSED_MESSAGE_FROM_QUEUE, message, deliveryTag);
}
在兔子管理中,它看起来像这样: 在此处输入图像描述 并没有被束缚,直到排队消费申请停止
I am using org.springframework.boot:spring-boot-starter-amqp:2.6.6 .
According to the documentation, I set up @RabbitListener
- I use SimpleRabbitListenerContainerFactory
and the configuration looks like this:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ObjectMapper om) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConcurrentConsumers(rabbitProperties.getUpdater().getConcurrentConsumers());
factory.setMaxConcurrentConsumers(rabbitProperties.getUpdater().getMaxConcurrentConsumers());
factory.setMessageConverter(new Jackson2JsonMessageConverter(om));
factory.setAutoStartup(rabbitProperties.getUpdater().getAutoStartup());
factory.setDefaultRequeueRejected(false);
return factory;
}
The logic of the service is to receive messages from rabbitmq, contact an external service via the rest API (using rest template) and put some information into the database based on the results of the response (using spring data jpa). The service implemented it successfully, but during testing it ran into problems that if any exceptions occur during the work of those thrown up the stack, the message is not sent to the configured dlq, but simply hangs in the broker as unacked. Can you please tell me how you can tell spring amqp that if any error occurs, you need to redirect the message to dlq?
The listener itself looks something like this:
@RabbitListener(
queues = {"${rabbit.updater.consuming.queue.name}"},
containerFactory = "rabbitListenerContainerFactory"
)
@Override
public void listen(
@Valid @Payload MessageDTO message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag
) {
log.debug(DebugMessagesConstants.RECEIVED_MESSAGE_FROM_QUEUE, message, deliveryTag);
messageUpdater.process(message);
channel.basicAck(deliveryTag, false);
log.debug(DebugMessagesConstants.PROCESSED_MESSAGE_FROM_QUEUE, message, deliveryTag);
}
In rabbit managment it look something like this:
enter image description here
and unacked will hang until the queue consuming application stops
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
请参阅错误处理文档: https> https:// docs.spring.io/spring-amqp/docs/current/referent/html/#annotation-error处理。
因此,您只是不做
grespentgemode.manual
,并且依靠那些在错误时被拒绝的消息的死信交换配置。或尝试使用
this.channel.basicnack(deliveryTag,false,false)
在messageupdater.process(message);
exception的情况下See error handling documentation: https://docs.spring.io/spring-amqp/docs/current/reference/html/#annotation-error-handling.
So, you just don't do an
AcknowledgeMode.MANUAL
and rely on the Dead Letter Exchange configuration for those messages which are rejected in case of error.Or try to use a
this.channel.basicNack(deliveryTag, false, false)
in case ofmessageUpdater.process(message);
exception...