如何让kafka消费者处于等待消费状态而不是直接退出?
使用spring-boot spirng-kafka, 建了2个项目, 一个producer,一个consumer,
测试的时候使用producer向kafka发送几条消息, 然后启动consumer能够成功消费到消息。
但我的问题是,这个consumer是消费完就关闭了(因为不是web程序),如何让consumer启动后就处于等待状态,然后一旦发现有新的消息进来就马上进行消费?我是在单元测试里面跑的,如果发布成jar,有如何部署运行使得消费者一直在等待消费状态呢?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(4)
引用来自“battyman”的评论
你用的是spring-kafka官方的例子吧,那个例子里面就设置了消费完就退出的逻辑。O(∩_∩)O
首先声明了
然后在spring-boot的启动函数中调用await
既然调用了await,就会阻塞当前run函数线程,也就是所谓的spring-boot主函数
消费线程调用countDown,将计数器减1,当计数器变为0的时候,就会取消之前await阻塞的线程让他继续运行,从而run那个函数执行完成,最终调用了close()
所以,修改的地方就很明显了:
不要加阻塞,只要让run线程不退出就可以,大不了你也可以不countDown()让它一直阻塞
你用的是spring-kafka官方的例子吧,那个例子里面就设置了消费完就退出的逻辑。O(∩_∩)O
首先声明了
然后在spring-boot的启动函数中调用await
既然调用了await,就会阻塞当前run函数线程,也就是所谓的spring-boot主函数
消费线程调用countDown,将计数器减1,当计数器变为0的时候,就会取消之前await阻塞的线程让他继续运行,从而run那个函数执行完成,最终调用了close()
所以,修改的地方就很明显了:
不要加阻塞,只要让run线程不退出就可以,大不了你也可以不countDown()让它一直阻塞
引用来自“battyman”的评论
用while(true),然后用consumer的poll方法获取records
用while(true),然后用consumer的poll方法获取records