kafka的消费者stream iterator block,读不出消息?

发布于 2021-12-03 16:13:11 字数 2638 浏览 696 评论 3

rt,能够向kafka produce数据,kafka里也看得到,但是consumer却读不出数据,offset是0,程序在stream的iterator被阻塞了。demo代码如下:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
 
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
 
/**
 * Created by Administrator on 2015/10/10.
 */
public class KafkaConsumer {
 
    private final ConsumerConnector consumer;
 
    private KafkaConsumer() {
        Properties props = new Properties();
        props.put("auto.offset.reset", "smallest");
        //zookeeper 配置
        props.put("zookeeper.connect", "Master:2181");
 
        //group 代表一个消费组
        props.put("group.id", "consumer3");
 
        // 连接zk的session超时时间
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最长时间        props.put("auto.commit.interval.ms", "1000");//往zookeeper上写offset的频率        props.put("auto.offset.reset", "smallest");//如果offset出了返回,则 smallest: 自动设置reset到最小的offset. largest : 自动设置offset到最大的offset. 其它值不允许,会抛出异常        
        //序列化类 
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig config = new ConsumerConfig(props);
 
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }
 
    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("consumertest", new Integer(1));
 
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
 
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("consumertest").get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        System.out.println("consumer starting...");
        while (it.hasNext())
            System.out.println(it.next().message());
        System.out.println("consumer over");
    }
 
    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}


谢谢~



如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

爱你是孤单的心事 2021-12-07 11:53:40

现在记不清了,当时好像换了个低版本的kafka就可以了... 佛系解决...

等风来 2021-12-06 12:00:13

回复
@什么都没想到 : 呃, 回头我也试试, 可能是版本不兼容吧. 我那个项目都一直烂尾状态....

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文