使用spring-integration-kafka集成spring和kafka,遇到问题
最近在做springboot与kafka的集成:
环境:kafka_2.11-0.8.2.2,spring-integration-kafka:1.3.1,zk:zookeeper-3.4.6
最近一周都在研究kafka,实在抓狂,在团队里获取不到帮助,因此向大家求助。
问题:使用自己写的程序消费不到消息,使用bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicTest1 却可以消费到。
以下是我的程序:
@Bean public ConnectionFactory kafkaBrokerConnectionFactory() throws Exception { return new DefaultConnectionFactory(zkConfiguration()); } @Bean public Configuration zkConfiguration() { return new ZookeeperConfiguration(new ZookeeperConnect(this.zookeeperAddress)); } @Bean public Configuration kafkaConfiguration() { //ip由逗号分隔 String[] split = this.brokerAddress.split(","); int i = 0; BrokerAddress[] brokerAddressList = new BrokerAddress[split.length]; while(i < split.length){ brokerAddressList[i] = BrokerAddress.fromAddress(split[i]); i++; } BrokerAddressListConfiguration configuration = new BrokerAddressListConfiguration(brokerAddressList); configuration.setSocketTimeout(this.socketTimeout); return configuration; } @Bean public OffsetManager offsetManager() { return new KafkaTopicOffsetManager(new ZookeeperConnect(this.zookeeperAddress), this.topic); } @Bean public KafkaMessageListenerContainer container(OffsetManager offsetManager) throws Exception { //String[] split = this.topics.split(","); final KafkaMessageListenerContainer kafkaMessageListenerContainer = new KafkaMessageListenerContainer( kafkaBrokerConnectionFactory(), new Partition(this.topic, 0)); kafkaMessageListenerContainer.setOffsetManager(offsetManager); kafkaMessageListenerContainer.setMaxFetch(100); kafkaMessageListenerContainer.setConcurrency(1); return kafkaMessageListenerContainer; } @Bean public KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter(KafkaMessageListenerContainer container) { KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(container); StringDecoder decoder = new StringDecoder(); kafkaMessageDrivenChannelAdapter.setKeyDecoder(decoder); kafkaMessageDrivenChannelAdapter.setPayloadDecoder(decoder); kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); return kafkaMessageDrivenChannelAdapter; } @Bean public PollableChannel received() { return new QueueChannel(); }
PollableChannel channel = context.getBean("received", PollableChannel.class); Message<?> received = channel.receive(); while(received != null){ System.out.println(received); received = channel.receive(2000); }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(12)
在kafka的配置文件server.properties 中增加以下定义:
listeners=PLAINTEXT://hostip:9092
advertised.listeners=PLAINTEXT://hostip:9092
大概是在今年4月份做的,时间过很久了,现在也不再继续做这些,sorry.
回复
没去总结呀!
请问解决了吗?我跟你的问题一样。。
我最后在消费端采用了kafka自己的客户端工具,与spring-boot的集成没搞定啊
回复
怎么搞得,求指导?
请问你的问题解决了吗,现在也要集成spring 和kafka,网上说的很多都不使用,经常是消费者程序消费不到
引用来自“小石子-_-”的评论
若是用MessageHandler是不是就不用写
KafkaMessageListenerContainer
若是用MessageHandler是不是就不用写
KafkaMessageListenerContainer
KafkaMessageListenerContainer 配置中是不是缺少了MessageListener
你是用的KafkaMessageDrivenChannelAdapter这个吗?在github上有这样的例子,但是做出来收不到消息。后来我就换成了kafka的java api... 没有用由spring封装的这个。你有什么思路,我们可以探讨下
你解决了么,我收遇到这个问题。
kafka-console-consumer.sh能收到,但程序收不到!什么原因呢