rocketMq 消费者疑问
@Override
public void receiveResult(String id, long timeout) {
//计数器
LOGGER.info(">>>开始接收订单结果:{}", id);
CountDownLatch countDownLatch = new CountDownLatch(1);
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);
//从消息队列头开始消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
String result =new String(msgs.get(0).getBody(), "UTF-8");
LOGGER.info(">>>接收到结果:{}",result);
countDownLatch.countDown();
} catch (Exception e) {
LOGGER.error(">>>消息处理异常", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
try {
//设置过滤的top 和tag
defaultMQPushConsumer.subscribe(topic, id);
// 启动消费者 等待结果
defaultMQPushConsumer.start();
if (countDownLatch.await(timeout, TimeUnit.MILLISECONDS)) {
LOGGER.info(">>>接收成功");
return;
}
LOGGER.info(">>>接收超时");
} catch (Exception e) {
LOGGER.error(">>>等待消息异常", e);
} finally {
shutdown(defaultMQPushConsumer);
}
}
private void shutdown(DefaultMQPushConsumer defaultMQPushConsumer) {
try {
defaultMQPushConsumer.shutdown();
} catch (Exception e) {
LOGGER.info(">>>资源释放异常", e);
}
}
有个业务场景: 在一个同步线程A过程中 执行到某个点的时候暂停 需要等待另外一个线程B通知 才继续往下面执行。 单机的话 可以直接使用countDownLatch 实现。但是在分布式环境中 B线程可能在另外一台机器上。所以我引入了rocketMq. 每次根据订单号传给tag 进行过滤。但是这样的话 就需要每次创建和销毁consumer。总是感觉这样做 不好。但是又没有其他方式实现。请rocketMq大神帮忙看看。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
这个跟顺序消息 没什么关系吧
rocketMQ不是好像有个顺序消息么?