扔AmqprejectanddontrequeueException后,RabbitMQ消息仍会重试
我有一个简单的 Spring Boot应用程序在其中我有以下设置 RabbitMQ (Spring-Boot-Starter-Amqp版本为2.7.0):
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: my_host
username: admin
password: password
template:
retry:
enabled: true
initial-interval: 3s
max-interval: 10s
multiplier: 2
max-attempts: 3
listener:
simple:
retry:
enabled: true
initial-interval: 3s
max-interval: 10s
multiplier: 2
max-attempts: 3
a simpleererabbitlistenerconercontainerfactory < /code>使用
MessagePostProcessor
:
@Bean
public SimpleRabbitListenerContainerFactory myInterceptContainerFactory(final SimpleRabbitListenerContainerFactoryConfigurer configurer,
final ConnectionFactory connectionFactory,
final MyMessagePostProcessor myMessagePostProcessor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory() {
@Override
protected void initializeContainer(SimpleMessageListenerContainer instance, RabbitListenerEndpoint endpoint) {
instance.setAfterReceivePostProcessors(myMessagePostProcessor);
super.initializeContainer(instance, endpoint);
}
};
((CachingConnectionFactory) connectionFactory).setRequestedHeartBeat(60);
configurer.configure(factory, connectionFactory);
return factory;
}
MessagePostProcessor
非常简单,如果版本错误在标题中,它会抛出 AmqPrejectAndDontrequeueException
:
@Component
public class MyMessagePostProcessor implements MessagePostProcessor {
private static final Logger LOGGER = LogManager.getLogger(MyMessagePostProcessor.class);
@Override
public Message postProcessMessage(Message message) throws AmqpException {
if (!"1.0".equalsIgnoreCase(message.getMessageProperties().getHeader("version"))) {
LOGGER.error("wrong version");
throw new AmqpRejectAndDontRequeueException("wrong version");
}
return message;
}
}
当我发送带有错误版本的消息时,它基于日志正常工作:
MyApp: 2022-07-05 15:49:33,675 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.p.MyMessagePostProcessor:22 - wrong version
MyApp: 2022-07-05 15:49:33,676 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] o.s.a.r.l.ConditionalRejectingErrorHandler:169 - Execution of Rabbit message listener failed.
org.springframework.amqp.AmqpRejectAndDontRequeueException: wrong version
at com.my.app.queue.postprocess.MyMessagePostProcessor.postProcessMessage(MyMessagePostProcessor.java:23) ~[classes/:1.0.0]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1544) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
我的消费者仅执行一个错误log,并立即投掷 amqpreject和DontrequequeueException
@Component
public class MyMessageConsumer {
private static final Logger LOGGER = LogManager.getLogger(MyMessageConsumer.class);
@RabbitListener(queues = "my.queue", containerFactory = "myInterceptContainerFactory")
public void consumerMessage() {
LOGGER.error("error in consuming message");
throw new AmqpRejectAndDontRequeueException("over");
}
}
但基于日志,它清楚地看到了重试3次(因为我的错误日志被打印了3次),只有在此之后,该消息才传递到无聊的队列中:
MyApp: 2022-07-05 15:50:32,182 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.c.MyMessageConsumer:23 - error in consuming message
MyApp: 2022-07-05 15:50:46,603 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.c.MyMessageConsumer:23 - error in consuming message
MyApp: 2022-07-05 15:50:52,617 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.c.MyMessageConsumer:23 - error in consuming message
MyApp: 2022-07-05 15:50:52,618 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer:74 - Retries exhausted for message (Body:'[B@4060893b(byte[75])' MessageProperties [headers={version=1.0, requestor=MyApp}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=my.queue, deliveryTag=2, consumerTag=amq.ctag-5Xx6EwwjVokRV6wD9xL8Og, consumerQueue=my.queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid()' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:271) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122) ~[spring-retry-1.3.3.jar:?]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.amqp.rabbit.listener.$Proxy187.invokeListener(Unknown Source) ~[?:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1564) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: over
at com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid(MyMessageConsumer.java:24) ~[classes/:1.0.0]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:262) ~[spring-rabbit-2.4.5.jar:2.4.5]
... 27 more
MyApp: 2022-07-05 15:50:52,620 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] o.s.a.r.l.ConditionalRejectingErrorHandler:169 - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:77) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:157) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122) ~[spring-retry-1.3.3.jar:?]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.amqp.rabbit.listener.$Proxy187.invokeListener(Unknown Source) ~[?:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1564) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException
... 19 more
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid()' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:271) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:?]
... 14 more
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: over
at com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid(MyMessageConsumer.java:24) ~[classes/:1.0.0]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:262) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:?]
... 14 more
但是我想将消息传递到无需重试3次的情况下将消息传递到无用的队列中。
我猜 LinereXecutionFailedException:重试策略用尽
,但我不明白为什么我的 amqprejectAnddontrequeueexception
包裹在此例外,因为我的 message> MessagePostProcessor
不抛出 listererexecutionfailedexception
,只是我投入代码的那个。
我在这里想念什么?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
正如@gary Russel所建议的那样,我修改了以下重试策略:
仅配置此新豆,一切都按预期工作!
As @Gary Russel suggested, I modified the retry policy as below:
And only configuring this new bean, everything works as expected!
在重新恢复耗尽之前,容器对例外情况一无所知。
您需要自定义重试策略,以免为该例外重试。
目前使用应用程序属性是不可能的。您需要将重试拦截器配置为bean并将其注入容器工厂。
如果您需要帮助,我可以添加一个示例。
The container knows nothing about the exception until retries are exhausted.
You would need to customize the retry policy to not retry for that exception.
This is currently not possible using application properties; you would need to configure the retry interceptor as a bean and inject it into the container factory.
If you need help with that, I can add an example.