rocketmq 集群,多分区 多消费者

发布于 2021-11-27 17:06:32 字数 1184 浏览 938 评论 5

@linkgo 你好,想跟你请教个问题:

            MQ 我们用16个队列,用了pull 方式消费,数据量很大,单个consumer 消费太慢,因此我们想用多个consumer,但是多个consumer 由于offset 是记录在consumer本地,而且consumer 可能是一个机器上的不同JVM,每个客户端拿到的offset  重复了,会导致重复消费。

            kafka 有个分区消费,多个分区 多个消费者,这样就不会重复了,RocketMQ 也有,但是没找到demo,给点例子吗?

            下面是代码

MQPullConsumerScheduleService scheduleService.registerPullTaskCallback(mqConfig.getTopic(), new PullTaskCallback() { public void doPullTask(MessageQueue mq, PullTaskContext context) {

        DefaultMQPullConsumer consumer = (DefaultMQPullConsumer)context.getPullConsumer(); try { long offset = OffsetManager.getOffset(mq,consumer);
            String nameAddress = consumer.getNamesrvAddr();
            String offsetAddress = OffsetManager.getMqPath(mq,nameAddress);
PullResult pullResult = consumer.pull(mq, mqConfig.getSubExpression(), offset, 32);


而且还有个问题,每次pull 32 是服务端有限制吗?我改数字 1-32 都可以,超过32了,那么还是只能一次32条,不能更多le

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

猫性小仙女 2021-12-03 14:42:44

消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载: 遍历Consumer下的所有topic,然后根据topic订阅所有的消息获取同一topic和Consumer Group下的所有Consumer然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等

奢望 2021-12-03 06:36:34

这个在push 模式的时候很方便,但是pull 的情况,比如Q1.Q2两个队列。 2个 consumer, C1,C2. 如果C1 从Q1,Q2 拉取了一批数据,都从1开始拉取,记录位置,Q1-100,Q2-100,但是位置还没更新到ZK 。这时候C2 也去拉取消息就会拉取拉取刚才的消息。虽然是业务是 幂等的,但是无形的就增加了 服务器压力,以及HBASE 压力

空城仅有旧梦在 2021-12-03 02:41:16

哥们,这个MQ 不能像kafak 从ZK 里面获取brokerName 和msgQueue 信息。 只能启动一个consumer 才能获取。为了获取信息,先启动一个consumer,然后 我需要另外的consumer 进行消费。。就不行了。。。!因为 要做到 一个consumer 消费一个 MsgQueue

心舞飞扬 2021-12-02 20:21:07

消费者是幂等函数就好

檐上三寸雪 2021-11-30 00:23:25

offset 是写在zk 上的。但是只能启动一个consumer ,如果多个consumer 会读取相同的offset 地址,重复消费。 因此必须 N个consumer 消费N个queue ,才能不重复。这种方式没找到。 还有pull 模式,客户端设置了 大于32个,但是无法超过32,是不是得服务端设置最大值。

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文