Spring整合Mina和RabbitMQ的问题
各位好,想请教一个比较奇怪的问题。在项目中,通过mina接受远程telnet命令,然后将命令中的请求参数放到消息队列里面,由消费者端去异步处理。但是在测试过程中发现,第一次的telnet命令,mq消费者端日志显示确认完成,但是onMessage方法日志内并未打印,业务代码就没执行。但是第二次发送同一个命令之后,一切就正常了。
1. spring-mq配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"> <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <property name="host" value="${mq.host}" /> <property name="username" value="${mq.username}" /> <property name="password" value="${mq.password}" /> <property name="port" value="${mq.port}" /> <property name="requestedHeartBeat" value="${mq.requestedHeartbeat}"/> <property name="publisherConfirms" value="${mq.publisherConfirms}" /> <property name="publisherReturns" value="${mq.publisherReturns}" /> </bean> <rabbit:listener-container id="listenerContainer" acknowledge="manual" connection-factory="connectionFactory"> <rabbit:listener ref="workerListener" queues="${mq.queue.lane.name}" /> </rabbit:listener-container> <rabbit:admin connection-factory="connectionFactory" /> <rabbit:queue auto-delete="false" durable="true" exclusive="false" id="queue_lane" name="${mq.queue.lane.name}" /> <rabbit:queue auto-delete="false" durable="true" exclusive="false" id="queue_payment" name="${mq.queue.payment.name}" /> <bean id="messageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <rabbit:direct-exchange name="directExchange" auto-delete="false" durable="true" id="directExchange"> <rabbit:bindings> <rabbit:binding queue="queue_lane" key="${mq.queue.lane.name}" /> <rabbit:binding queue="queue_payment" key="${mq.queue.payment.name}" /> </rabbit:bindings> </rabbit:direct-exchange> <bean id="rabbitTempate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="confirmCallback" ref="callbackListener" /> <property name="returnCallback" ref="returnCallbackListener" /> </bean> </beans>
2. consumer端代码
@Component("workerListener") public class LaneWorkerListener implements ChannelAwareMessageListener { private static final Logger log = LoggerFactory.getLogger(LaneWorkerListener.class); @Autowired private LaneProcessor processor; public LaneWorkerListener() { } public void onMessage(Message message, Channel channel) throws Exception { try { Object receivedObject = SerializationUtils.deserialize(message.getBody()); log.info("消费者端接收到消息:{}, 消息properties:{} ", receivedObject, message.getMessageProperties()); // business operations. if (receivedObject instanceof ParamsWrapper) { ParamsWrapper wrapper = (ParamsWrapper) receivedObject; processor.process(SessionManager.getSession(wrapper.getSessionId()), wrapper.getParams()); } channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // false只确认当前一个消息收到,true确认所有consumer获得的消息 } catch (Exception e) { log.error(String.format("消息确认失败,详细信息:", e)); if (message.getMessageProperties().getRedelivered()) { log.error("消息MessageId:{}已重复处理失败,拒绝再次接收...", message.getMessageProperties().getMessageId()); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息 } else { log.error("消息MessageId:{}即将再次返回队列处理...", message.getMessageProperties().getMessageId()); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue为是否重新回到队列 } } } }
3.另外还有两个确认机制的监听器
@Component("callbackListener") public class LaneWorkerCallbackListener implements ConfirmCallback { private static final Logger log = Logger.getLogger(LaneWorkerCallbackListener.class); public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info(String.format("确认消息完成...,{%s},{%s},{%s}", correlationData, ack, cause)); } }
@Component("returnCallbackListener") public class LaneWorkerReturnedCallbackListener implements ReturnCallback { private static final Logger log = Logger.getLogger(LaneWorkerCallbackListener.class); public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info(String.format("消息返回处理中...,{%s},{%s},{%s},{%s},{%s}", message, replyCode, replyText,exchange,routingKey)); } }
在mina中的消息发送:
1. mina spring 配置
<bean id="ioAcceptor" class="org.apache.mina.transport.socket.nio.NioSocketAcceptor" init-method="bind" destroy-method="destroy"> <property name="defaultLocalAddress" ref="defaultLocalAddress"/> <property name="handler" ref="serverIoHandler" /> <property name="filterChainBuilder" ref="filterChainBuilder" /> <property name="reuseAddress" value="true" /> </bean>
2. 主要的消息接受处理handler ServerIOHandler
@Service public class ServerIoHandler extends IoHandlerAdapter { private static final Logger LOGGER = Logger.getLogger(ServerIoHandler.class); @Autowired private CacheManager cacheManager; @Autowired private ILaneMessageProducer producer; @Autowired private LaneQueueContainer queues; @Override public void sessionCreated(IoSession session) throws Exception { //为session设置唯一主键id,暂用毫秒时间戳策略。缓存session session.setAttributeIfAbsent(LaneConstants.MINA_SESSION_PRIMARY_KEY, System.currentTimeMillis()); Object sessionId = session.getAttribute(LaneConstants.MINA_SESSION_PRIMARY_KEY); SessionManager.addSession(Long.valueOf(sessionId.toString()), session); //并且存在缓存中 String address = ((InetSocketAddress)session.getRemoteAddress()).getAddress().getHostAddress(); cacheManager.setPerpetual(LaneConfig.getEnv().concat(LaneConstants.CACHE_LANE_SESSION_KEY), address); } @Override public void messageReceived(IoSession session, Object message) throws Exception { try { LOGGER.info(String.format("Start: [%s] -- address:[%s]",message.toString(),session.getRemoteAddress().toString())); // 解密消息 if (LaneConfig.isEncryptedMsg()) { message = AESUtils.decrypt(message.toString()); } JSONObject msg = parseParam(message.toString()); Long spid = Long.valueOf(session.getAttribute(LaneConstants.MINA_SESSION_PRIMARY_KEY).toString());//session id. //消息存入消息队列 producer.sendMessage(queues.getQueueLane(), new ParamsWrapper(spid, msg)); LOGGER.info(String.format(">>>>>>消息{%s}已被放入队列中.....时间:[%s]", message.toString(),DateUtils.getCurrentDateTimeMilliSecond(new Date()))); } catch (Exception e) { SocketResponse srs = new SocketResponse(); srs.setCode(SocketResponse.INTERNAL_SERVER_ERROR); srs.setMsg("内部错误"); session.write(LaneUtils.parseReturn(srs)); LOGGER.error("Exception:", e); } finally { message = null; } }
还请各位大侠多多指点。。。。谢谢!
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论