tomcat启动,运行kafka-consumer端代码报错

发布于 2021-12-01 13:05:47 字数 3979 浏览 804 评论 1

启动tomcat,访问kafka-Consumer端代码(注意是从tomcat启动访问startJob()方法

public void startJob() throws Exception{
        Properties props1 = new Properties();
        props1.put("zookeeper.connect", "10.0.11.43:2181/kafka");
        props1.put("group.id", "solr-consumertest4");
        props1.put("rebalance.max.retries", "5");
        props1.put("rebalance.backoff.ms", "2000");
        props1.put("zookeeper.session.timeout.ms", "5000");
        props1.put("auto.offset.reset", "smallest");
        props1.put("zookeeper.connectiontimeout.ms", "100000");
        props1.put("zookeeper.session.timeout.ms", "40000");
        props1.put("zookeeper.sync.time.ms", "200");
        props1.put("auto.commit.interval.ms", "100");
        ConsumerConfig consumerConfig = new ConsumerConfig(props1);
        
        ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        
        Whitelist whitelist = new Whitelist(topic);
        
        List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);
        
        if (CollectionUtils.isEmpty(partitions)) {
            System.out.println("empty!");
            TimeUnit.SECONDS.sleep(1);
        }
        
        //消费消息
         for (KafkaStream<byte[], byte[]> partition : partitions) {
             ConsumerIterator<byte[], byte[]> iterator = partition.iterator();
             while (iterator.hasNext()) {
                 MessageAndMetadata<byte[], byte[]> next = iterator.next();
                 System.out.println("partiton:" + next.partition());
                 System.out.println("offset:" + next.offset());
                 System.out.println("message:" + new String(next.message(), "utf-8"));
             }
         }
    }

这行报错:List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);

Caused by: kafka.common.ConsumerRebalanceFailedException: solr-consumertest3_kongdeyu-1472724776895-ef4780df can't rebalance after 5 retries

网上查都是说有两种情况

1同一个消费者组(consumer group)有多个consumer先后启动,就是一个消费者组内有多个consumer同时负载消费多个partition数据.

2是将consumer端配置改为rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms

可我仔细检查代码,确认就只有启动了一个消费者来取数据,consumer端的配置也改成对应的关系了,可还是报错

死活就是报错

后来我改成从main函数进入startJob()方法

居然就可以取到kafka上的消息了,没有任何错误

真是奇怪不明白为什么,难道kafka不能再tomcat 中运行吗?

求大神解惑

小弟,不胜感激


如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

命硬 2021-12-01 19:57:58

最后找到问题了是jar包的原因

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文