rocketMq 消费者疑问

发布于 2022-01-04 06:11:25 字数 4636 浏览 707 评论 2

@Override
    public void receiveResult(String id, long timeout) {
        //计数器
        LOGGER.info(">>>开始接收订单结果:{}", id);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);
        //从消息队列头开始消费
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    String result =new String(msgs.get(0).getBody(), "UTF-8");
                    LOGGER.info(">>>接收到结果:{}",result);
                    countDownLatch.countDown();
                } catch (Exception e) {
                    LOGGER.error(">>>消息处理异常", e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        try {
            //设置过滤的top 和tag
            defaultMQPushConsumer.subscribe(topic, id);
            // 启动消费者  等待结果
            defaultMQPushConsumer.start();
            if (countDownLatch.await(timeout, TimeUnit.MILLISECONDS)) {
                LOGGER.info(">>>接收成功");
                return;
            }
            LOGGER.info(">>>接收超时");
        } catch (Exception e) {
            LOGGER.error(">>>等待消息异常", e);
        } finally {
            shutdown(defaultMQPushConsumer);
        }
        
    }

    private void shutdown(DefaultMQPushConsumer defaultMQPushConsumer) {
        try {
            defaultMQPushConsumer.shutdown();
        } catch (Exception e) {
            LOGGER.info(">>>资源释放异常", e);
        }
    }

 

有个业务场景: 在一个同步线程A过程中 执行到某个点的时候暂停 需要等待另外一个线程B通知 才继续往下面执行。 单机的话 可以直接使用countDownLatch 实现。但是在分布式环境中 B线程可能在另外一台机器上。所以我引入了rocketMq. 每次根据订单号传给tag 进行过滤。但是这样的话 就需要每次创建和销毁consumer。总是感觉这样做 不好。但是又没有其他方式实现。请rocketMq大神帮忙看看。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

各自安好 2022-01-05 11:28:24

这个跟顺序消息 没什么关系吧

高跟鞋的旋律 2022-01-04 17:08:41

rocketMQ不是好像有个顺序消息么?

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文