kafka集成zookeeper,使用最新kafka-client,自动提交,测试异常情况,数据未丢失?

发布于 2021-12-02 11:58:12 字数 1616 浏览 738 评论 1

如题和下面consumer代码片段:


Properties props = new Properties();
props.put("bootstrap.servers", "192.168.83.20:9092");
props.put("metadata.broker.list", "192.168.83.20:2181");
props.put("group.id", "default1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("top1"));
final int minBatchSize = 1;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
System.out.println("进入了......");
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
   System.out.println("进入了......records");
	  buffer.add(record);
   try {
		Thread.sleep(1000); //到这个地方的时候过大概4、5s,把consumer停掉
		System.out.println("进入了......records--1000---");
		Thread.sleep(20000);
	} catch (Exception e) {
		e.printStackTrace();
	}
	  System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());  
  }
//          consumer.commitAsync();
}
 就是producer发送消息 。然后consumer端接受消息,consumer里让当前操作睡眠20s, 在consumer接收到消息5s左右,把consumer停掉。设置自动提交,而且时间是1s,经测试发现数据未丢失,什么情况 ? 

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

柒夜笙歌凉 2021-12-03 23:28:21

代码和你描述没看太懂,不过consumer停不停跟收到数据没有什么关系。kfk默认保存7天的数据,停这一会儿,在重启consumer后肯定会收到数据

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文