kafka的消费者stream iterator block,读不出消息?
rt,能够向kafka produce数据,kafka里也看得到,但是consumer却读不出数据,offset是0,程序在stream的iterator被阻塞了。demo代码如下:
import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; /** * Created by Administrator on 2015/10/10. */ public class KafkaConsumer { private final ConsumerConnector consumer; private KafkaConsumer() { Properties props = new Properties(); props.put("auto.offset.reset", "smallest"); //zookeeper 配置 props.put("zookeeper.connect", "Master:2181"); //group 代表一个消费组 props.put("group.id", "consumer3"); // 连接zk的session超时时间 props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最长时间 props.put("auto.commit.interval.ms", "1000");//往zookeeper上写offset的频率 props.put("auto.offset.reset", "smallest");//如果offset出了返回,则 smallest: 自动设置reset到最小的offset. largest : 自动设置offset到最大的offset. 其它值不允许,会抛出异常 //序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("auto.commit.interval.ms", "1000"); ConsumerConfig config = new ConsumerConfig(props); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); } void consume() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("consumertest", new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String, String> stream = consumerMap.get("consumertest").get(0); ConsumerIterator<String, String> it = stream.iterator(); System.out.println("consumer starting..."); while (it.hasNext()) System.out.println(it.next().message()); System.out.println("consumer over"); } public static void main(String[] args) { new KafkaConsumer().consume(); } }
谢谢~
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
现在记不清了,当时好像换了个低版本的kafka就可以了... 佛系解决...
回复
@什么都没想到 : 呃, 回头我也试试, 可能是版本不兼容吧. 我那个项目都一直烂尾状态....
help~~~