How can I intercept incoming messages before they reach methods annotated with @RabbitListener?
I started by setting up an interceptor for outgoing messages, which works smoothly. But when I try to intercept incoming messages in the consumer, the postProcessMessage method is skipped and the message goes straight to the method annotated with @RabbitListener. Below is my code for the whole process; I omitted unimportant code.
Producer
RabbitMQProducerInterceptor
@Component
@Slf4j
public class RabbitMQProducerInterceptor implements MessagePostProcessor {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        log.info("Getting the current HttpServletRequest");
        HttpServletRequest req = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes())
                .getRequest();
        log.info("Extracting the X-REQUEST-ID from the header of the HttpServletRequest");
        String XRequestId = req.getHeader(ShareableConstants.X_REQUEST_ID_HEADER);
        log.info("Adding X-REQUEST-ID {} to the RabbitMQ Producer Header", XRequestId);
        message.getMessageProperties().getHeaders().put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);
        return message;
    }
}
RabbitMQProducerConfig
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setReplyTimeout(60000);
    MessagePostProcessor[] processors = {new RabbitMQProducerInterceptor()};
    rabbitTemplate.addBeforePublishPostProcessors(processors);
    return rabbitTemplate;
}
Sending a message to the consumer
User Producer
public UserRegistrationResponseDTO register(UserRegistrationDTO userRegistrationDTO) {
    log.info("Sending user registration request {}", userRegistrationDTO);
    UserRegistrationDTO response = (UserRegistrationDTO) rabbitTemplate
            .convertSendAndReceive(ShareableConstants.EXCHANGE_NAME,
                    ShareableConstants.CREATE_USER_ROUTING_KEY,
                    userRegistrationDTO);
    return UserRegistrationResponseDTO.builder()
            .username(response.getUsername())
            .id(response.getId())
            .createdAt(response.getCreatedAt()).build();
}
Consumer
RabbitMQConsumerConfig
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    MessagePostProcessor[] processors = {new RabbitMQConsumerInterceptor()};
    rabbitTemplate.setAfterReceivePostProcessors(processors);
    return rabbitTemplate;
}
RabbitMQConsumerInterceptor
@Component
public class RabbitMQConsumerInterceptor implements MessagePostProcessor {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        String XRequestId = message.getMessageProperties().getHeader(ShareableConstants.X_REQUEST_ID_HEADER);
        MDC.put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);
        return message;
    }
}
User Consumer
@RabbitListener(bindings =
    @QueueBinding(exchange = @Exchange(ShareableConstants.EXCHANGE_NAME),
        key = ShareableConstants.CREATE_USER_ROUTING_KEY,
        value = @Queue(ShareableConstants.USER_REGISTRATION_QUEUE_NAME)))
public UserRegistrationDTO receiveUser(UserRegistrationDTO userRegistrationDTO) {
    log.info("receiving user {} to register ", userRegistrationDTO);
    User user = Optional.of(userRegistrationDTO).map(User::new).get();
    User createdUser = userService.register(user);
    UserRegistrationDTO registrationDTO = UserRegistrationDTO.builder()
            .id(createdUser.getId())
            .username(createdUser.getUsername())
            .createdAt(createdUser.getCreationDate())
            .build();
    return registrationDTO;
}
That's the code. No exception is thrown; the only problem is that the interceptor is skipped.
Comments (1)
The RabbitTemplate is not used to receive messages for a @RabbitListener; messages are received by a listener container. You have to set the afterReceivePostProcessors on the listener container factory. If you are using Spring Boot, just add the auto-configured SimpleRabbitListenerContainerFactory as a parameter to one of your other @Beans and set the MPP (MessagePostProcessor) on it.
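As a rough illustration of that suggestion, the consumer configuration might look something like the sketch below. It assumes a Spring Boot application using the default (simple) listener container type, so that a SimpleRabbitListenerContainerFactory bean is auto-configured, and it assumes the @Component annotation is removed from RabbitMQConsumerInterceptor so the interceptor is only registered once. The bean method name is made up for the example; this is not necessarily the exact code the answer had in mind.

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConsumerConfig {

    // Assumption: Spring Boot auto-configures the SimpleRabbitListenerContainerFactory
    // and injects it here as a method parameter.
    // Assumption: RabbitMQConsumerInterceptor (from the question, same package) no longer
    // carries @Component, so this method is the only place it is created.
    @Bean
    public RabbitMQConsumerInterceptor consumerInterceptor(
            SimpleRabbitListenerContainerFactory factory) {
        RabbitMQConsumerInterceptor interceptor = new RabbitMQConsumerInterceptor();
        // Register the post processor on the listener container factory, not on a
        // RabbitTemplate, so it is applied to messages delivered to @RabbitListener methods.
        factory.setAfterReceivePostProcessors(interceptor);
        return interceptor;
    }
}
```

With something like this in place, the RabbitTemplate bean in the consumer configuration is no longer needed just for the interceptor; it would only be required if the consumer also sends messages itself.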