如何让kafka消费者处于等待消费状态而不是直接退出?

发布于 2021-11-29 16:37:45 字数 258 浏览 912 评论 4

使用spring-boot spirng-kafka, 建了2个项目, 一个producer,一个consumer,

测试的时候使用producer向kafka发送几条消息, 然后启动consumer能够成功消费到消息。

但我的问题是,这个consumer是消费完就关闭了(因为不是web程序),如何让consumer启动后就处于等待状态,然后一旦发现有新的消息进来就马上进行消费?我是在单元测试里面跑的,如果发布成jar,有如何部署运行使得消费者一直在等待消费状态呢?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

本宫微胖 2021-12-05 13:04:18

引用来自“battyman”的评论

你用的是spring-kafka官方的例子吧,那个例子里面就设置了消费完就退出的逻辑。O(∩_∩)O

首先声明了

private final CountDownLatch latch = new CountDownLatch(3);

然后在spring-boot的启动函数中调用await

latch.await(60, TimeUnit.SECONDS);

既然调用了await,就会阻塞当前run函数线程,也就是所谓的spring-boot主函数

消费线程调用countDown,将计数器减1,当计数器变为0的时候,就会取消之前await阻塞的线程让他继续运行,从而run那个函数执行完成,最终调用了close()

 

所以,修改的地方就很明显了:

不要加阻塞,只要让run线程不退出就可以,大不了你也可以不countDown()让它一直阻塞

复古式 2021-12-05 10:25:39

你用的是spring-kafka官方的例子吧,那个例子里面就设置了消费完就退出的逻辑。O(∩_∩)O

首先声明了

private final CountDownLatch latch = new CountDownLatch(3);

然后在spring-boot的启动函数中调用await

latch.await(60, TimeUnit.SECONDS);

既然调用了await,就会阻塞当前run函数线程,也就是所谓的spring-boot主函数

消费线程调用countDown,将计数器减1,当计数器变为0的时候,就会取消之前await阻塞的线程让他继续运行,从而run那个函数执行完成,最终调用了close()

 

所以,修改的地方就很明显了:

不要加阻塞,只要让run线程不退出就可以,大不了你也可以不countDown()让它一直阻塞

奢望 2021-12-03 16:47:19

引用来自“battyman”的评论

用while(true),然后用consumer的poll方法获取records

彩扇题诗 2021-11-30 22:56:22

用while(true),然后用consumer的poll方法获取records

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文