Spring整合Mina和RabbitMQ的问题

发布于 2021-12-05 22:32:58 字数 8132 浏览 847 评论 0

各位好,想请教一个比较奇怪的问题。在项目中,通过mina接收远程telnet命令,然后将命令中的请求参数放到消息队列里面,由消费者端去异步处理。但是在测试过程中发现,第一次发送telnet命令时,mq消费者端日志显示确认完成,但是onMessage方法内的日志并未打印,业务代码也没有执行。而第二次发送同一个命令之后,一切就正常了。

1. spring-mq配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
   xmlns:rabbit="http://www.springframework.org/schema/rabbit"
   xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
    http://www.springframework.org/schema/context  
    http://www.springframework.org/schema/context/spring-context-3.0.xsd">


   <!-- RabbitMQ connection factory. Every setting (host, credentials, port,
        heartbeat, publisher confirms/returns) is resolved from ${mq.*}
        property placeholders. -->
   <bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
      <property name="host" value="${mq.host}" />
      <property name="username" value="${mq.username}" />
      <property name="password" value="${mq.password}" />
      <property name="port" value="${mq.port}" />
      <property name="requestedHeartBeat" value="${mq.requestedHeartbeat}"/>
      <property name="publisherConfirms" value="${mq.publisherConfirms}" />
      <property name="publisherReturns" value="${mq.publisherReturns}" />
   </bean>

   <!-- Listener container in manual acknowledge mode: the workerListener bean
        is responsible for acking, nacking or rejecting each delivery itself.
        It consumes from the queue named by ${mq.queue.lane.name}. -->
   <rabbit:listener-container id="listenerContainer"
      acknowledge="manual" connection-factory="connectionFactory">
      <rabbit:listener ref="workerListener" queues="${mq.queue.lane.name}" />
   </rabbit:listener-container>

   <!-- Admin bean on the shared connection factory, followed by the two queue
        declarations (lane and payment). Both queues are durable, non-exclusive
        and not auto-deleted; their names come from property placeholders. -->
   <rabbit:admin connection-factory="connectionFactory" />

   <rabbit:queue auto-delete="false" durable="true"
      exclusive="false" id="queue_lane" name="${mq.queue.lane.name}" />

   <rabbit:queue auto-delete="false" durable="true"
      exclusive="false" id="queue_payment" name="${mq.queue.payment.name}" />

   <!-- JSON message converter bean.
        NOTE(review): nothing else in this configuration references this bean;
        neither the listener container nor the RabbitTemplate bean sets a
        message converter. Confirm whether it is injected elsewhere or unused. -->
   <bean id="messageConverter"
        class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

   <!-- Durable direct exchange. Each queue is bound with a routing key equal
        to its own queue name (the same placeholder is used for both). -->
   <rabbit:direct-exchange name="directExchange"
      auto-delete="false" durable="true" id="directExchange">
      <rabbit:bindings>
         <rabbit:binding queue="queue_lane" key="${mq.queue.lane.name}" />
         <rabbit:binding queue="queue_payment" key="${mq.queue.payment.name}" />
      </rabbit:bindings>
   </rabbit:direct-exchange>

   <!-- RabbitTemplate wired to the shared connection factory, with the
        callbackListener and returnCallbackListener beans registered as
        confirm and return callbacks.
        NOTE(review): the bean id is spelled "rabbitTempate" (sic). It is left
        unchanged here because by-name references may depend on it.
        NOTE(review): no exchange, routing key or "mandatory" flag is set on
        this bean; presumably the producer supplies them per send. Verify. -->
   <bean id="rabbitTempate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
      <property name="connectionFactory" ref="connectionFactory" />
      <property name="confirmCallback" ref="callbackListener" />
      <property name="returnCallback" ref="returnCallbackListener" />
   </bean>

</beans>

2. consumer端代码

/**
 * Queue consumer registered as the {@code workerListener} bean.
 *
 * <p>Each delivery is deserialized, handed to the {@link LaneProcessor} when it is a
 * {@code ParamsWrapper}, and then explicitly acknowledged on the channel. When anything
 * in that sequence throws, the delivery is requeued once and rejected on redelivery.
 */
@Component("workerListener")
public class LaneWorkerListener implements ChannelAwareMessageListener {
    private static final Logger log = LoggerFactory.getLogger(LaneWorkerListener.class);

    @Autowired
    private LaneProcessor processor;

    public LaneWorkerListener() {
    }

    /**
     * Handles one delivery from the queue.
     *
     * @param message the delivered message; its body is deserialized with
     *                {@code SerializationUtils.deserialize}
     * @param channel the channel the message arrived on, used for ack / nack / reject
     * @throws Exception if the nack or reject issued from the failure path itself fails
     */
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            // NOTE(review): this deserializes the raw body; make sure only trusted
            // producers can publish to this queue.
            Object receivedObject = SerializationUtils.deserialize(message.getBody());
            log.info("消费者端接收到消息:{}, 消息properties:{} ", receivedObject, message.getMessageProperties());
            // Business processing: only ParamsWrapper payloads are dispatched.
            if (receivedObject instanceof ParamsWrapper) {
                ParamsWrapper wrapper = (ParamsWrapper) receivedObject;
                processor.process(SessionManager.getSession(wrapper.getSessionId()), wrapper.getParams());
            }
            // multiple=false: acknowledge only this delivery, not every outstanding one.
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // Pass the exception as the throwable argument so the stack trace is logged.
            // The previous String.format call had no placeholder and silently dropped it.
            log.error("消息确认失败,详细信息:", e);
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息MessageId:{}已重复处理失败,拒绝再次接收...", message.getMessageProperties().getMessageId());
                // Already redelivered once: reject without requeue.
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                log.error("消息MessageId:{}即将再次返回队列处理...", message.getMessageProperties().getMessageId());
                // First failure: nack this single delivery and put it back on the queue.
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

3.另外还有两个确认机制的监听器

/**
 * Confirm callback registered as the {@code callbackListener} bean; it only logs
 * what it is told.
 */
@Component("callbackListener")
public class LaneWorkerCallbackListener implements ConfirmCallback {

   private static final Logger log = Logger.getLogger(LaneWorkerCallbackListener.class);

   /**
    * Writes the correlation data, the ack flag and the cause to the log at INFO level.
    *
    * @param correlationData correlation data passed in by the caller
    * @param ack             the ack flag passed in by the caller
    * @param cause           the cause passed in by the caller
    */
   public void confirm(CorrelationData correlationData, boolean ack, String cause) {
      final String summary = String.format("确认消息完成...,{%s},{%s},{%s}", correlationData, ack, cause);
      log.info(summary);
   }
}
/**
 * Return callback registered as the {@code returnCallbackListener} bean; it only logs
 * the returned message and the accompanying reply details.
 */
@Component("returnCallbackListener")
public class LaneWorkerReturnedCallbackListener implements ReturnCallback {
   // Logger is keyed on this class. It previously used LaneWorkerCallbackListener.class,
   // so return events were logged under the confirm listener's category.
   private static final Logger log = Logger.getLogger(LaneWorkerReturnedCallbackListener.class);

   /**
    * Writes the returned message and its reply details to the log at INFO level.
    *
    * @param message    the returned message
    * @param replyCode  reply code passed in by the caller
    * @param replyText  reply text passed in by the caller
    * @param exchange   exchange name passed in by the caller
    * @param routingKey routing key passed in by the caller
    */
   public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
      log.info(String.format("消息返回处理中...,{%s},{%s},{%s},{%s},{%s}", message, replyCode, replyText,exchange,routingKey));
   }

}

 

在mina中的消息发送:

1. mina spring 配置

<!-- MINA NIO socket acceptor. Spring calls bind as the init-method and destroy
     as the destroy-method. Incoming events are handled by serverIoHandler, and
     the filter chain and local address come from the referenced beans. -->
<bean id="ioAcceptor" class="org.apache.mina.transport.socket.nio.NioSocketAcceptor" init-method="bind" destroy-method="destroy">
   <property name="defaultLocalAddress" ref="defaultLocalAddress"/>
   <property name="handler" ref="serverIoHandler" />
   <property name="filterChainBuilder" ref="filterChainBuilder" />
   <property name="reuseAddress" value="true" />
</bean>

2. 主要的消息接收处理handler:ServerIoHandler

@Service
public class ServerIoHandler extends IoHandlerAdapter {

   private static final Logger LOGGER = Logger.getLogger(ServerIoHandler.class);

   @Autowired
   private CacheManager cacheManager;
   @Autowired
   private ILaneMessageProducer producer;
   @Autowired
   private LaneQueueContainer queues;
   
   /**
    * Tags a newly created session with a primary key, registers it in
    * {@code SessionManager} under that key, and stores the peer's host address in the cache.
    *
    * @param session the session that was just created
    * @throws Exception propagated from the session, session manager or cache calls
    */
   @Override
   public void sessionCreated(IoSession session) throws Exception {
      // Use the id MINA assigns to the session as the primary key. The previous key,
      // System.currentTimeMillis(), is identical for two sessions created in the same
      // millisecond, so both would be registered under one key.
      session.setAttributeIfAbsent(LaneConstants.MINA_SESSION_PRIMARY_KEY, session.getId());
      // Read the attribute back so an already-present value wins over the one offered above.
      Object sessionId = session.getAttribute(LaneConstants.MINA_SESSION_PRIMARY_KEY);
      SessionManager.addSession(Long.valueOf(sessionId.toString()), session);
      // Also record the remote host address in the cache.
      String address = ((InetSocketAddress)session.getRemoteAddress()).getAddress().getHostAddress();
      cacheManager.setPerpetual(LaneConfig.getEnv().concat(LaneConstants.CACHE_LANE_SESSION_KEY), address);
   }

   /**
    * Handles one inbound message: logs it, decrypts it when encryption is enabled,
    * parses it, and publishes the parsed parameters together with the session's
    * primary key to the lane queue. Any exception is answered with an
    * internal-server-error response written back to the session, then logged.
    *
    * @param session the session the message arrived on
    * @param message the received message; only its {@code toString()} form is used
    * @throws Exception declared by the overridden handler method
    */
   @Override
   public void messageReceived(IoSession session, Object message) throws Exception {
      try {
         LOGGER.info(String.format("Start: [%s] -- address:[%s]",message.toString(),session.getRemoteAddress().toString()));
         // Decrypt first when the configuration says messages are encrypted.
         Object payload = message;
         if (LaneConfig.isEncryptedMsg()) {
            payload = AESUtils.decrypt(message.toString());
         }
         JSONObject parsed = parseParam(payload.toString());
         // Primary key stored on the session as an attribute.
         Object keyAttribute = session.getAttribute(LaneConstants.MINA_SESSION_PRIMARY_KEY);
         Long sessionKey = Long.valueOf(keyAttribute.toString());
         // Hand the request over to the message queue.
         ParamsWrapper wrapper = new ParamsWrapper(sessionKey, parsed);
         producer.sendMessage(queues.getQueueLane(), wrapper);
         LOGGER.info(String.format(">>>>>>消息{%s}已被放入队列中.....时间:[%s]", payload.toString(),DateUtils.getCurrentDateTimeMilliSecond(new Date())));
      } catch (Exception failure) {
         SocketResponse errorResponse = new SocketResponse();
         errorResponse.setCode(SocketResponse.INTERNAL_SERVER_ERROR);
         errorResponse.setMsg("内部错误");
         session.write(LaneUtils.parseReturn(errorResponse));
         LOGGER.error("Exception:", failure);
      }
   }

还请各位大侠多多指点。。。。谢谢!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文