kafka集成zookeeper,使用最新kafka-client,自动提交,测试异常情况,数据未丢失?
如题和下面consumer代码片段:
Properties props = new Properties(); props.put("bootstrap.servers", "192.168.83.20:9092"); props.put("metadata.broker.list", "192.168.83.20:2181"); props.put("group.id", "default1"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("top1")); final int minBatchSize = 1; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); System.out.println("进入了......"); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("进入了......records"); buffer.add(record); try { Thread.sleep(1000); //到这个地方的时候过大概4、5s,把consumer停掉 System.out.println("进入了......records--1000---"); Thread.sleep(20000); } catch (Exception e) { e.printStackTrace(); } System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value()); } // consumer.commitAsync(); }就是producer发送消息 。然后consumer端接受消息,consumer里让当前操作睡眠20s, 在consumer接收到消息5s左右,把consumer停掉。设置自动提交,而且时间是1s,经测试发现数据未丢失,什么情况 ?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
代码和你描述没看太懂,不过consumer停不停跟收到数据没有什么关系。kfk默认保存7天的数据,停这一会儿,在重启consumer后肯定会收到数据