rocketmq 集群,多分区 多消费者
@linkgo 你好,想跟你请教个问题:
MQ 我们用16个队列,用了pull 方式消费,数据量很大,单个consumer 消费太慢,因此我们想用多个consumer,但是多个consumer 由于offset 是记录在consumer本地,而且consumer 可能是一个机器上的不同JVM,每个客户端拿到的offset 重复了,会导致重复消费。
kafka 有个分区消费,多个分区 多个消费者,这样就不会重复了,RocketMQ 也有,但是没找到demo,给点例子吗?
下面是代码
MQPullConsumerScheduleService scheduleService.registerPullTaskCallback(mqConfig.getTopic(), new PullTaskCallback() { public void doPullTask(MessageQueue mq, PullTaskContext context) { DefaultMQPullConsumer consumer = (DefaultMQPullConsumer)context.getPullConsumer(); try { long offset = OffsetManager.getOffset(mq,consumer); String nameAddress = consumer.getNamesrvAddr(); String offsetAddress = OffsetManager.getMqPath(mq,nameAddress);
PullResult pullResult = consumer.pull(mq, mqConfig.getSubExpression(), offset, 32);
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(5)
消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载: 遍历Consumer下的所有topic,然后根据topic订阅所有的消息获取同一topic和Consumer Group下的所有Consumer然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等
这个在push 模式的时候很方便,但是pull 的情况,比如Q1.Q2两个队列。 2个 consumer, C1,C2. 如果C1 从Q1,Q2 拉取了一批数据,都从1开始拉取,记录位置,Q1-100,Q2-100,但是位置还没更新到ZK 。这时候C2 也去拉取消息就会拉取拉取刚才的消息。虽然是业务是 幂等的,但是无形的就增加了 服务器压力,以及HBASE 压力
哥们,这个MQ 不能像kafak 从ZK 里面获取brokerName 和msgQueue 信息。 只能启动一个consumer 才能获取。为了获取信息,先启动一个consumer,然后 我需要另外的consumer 进行消费。。就不行了。。。!因为 要做到 一个consumer 消费一个 MsgQueue
消费者是幂等函数就好
offset 是写在zk 上的。但是只能启动一个consumer ,如果多个consumer 会读取相同的offset 地址,重复消费。 因此必须 N个consumer 消费N个queue ,才能不重复。这种方式没找到。 还有pull 模式,客户端设置了 大于32个,但是无法超过32,是不是得服务端设置最大值。