使用spring-integration-kafka集成spring和kafka,遇到问题

发布于 2021-12-02 02:07:12 字数 3111 浏览 817 评论 12

最近在做springboot与kafka的集成:

环境:kafka_2.11-0.8.2.2,spring-integration-kafka:1.3.1,zk:zookeeper-3.4.6

最近一周都在研究kafka,实在抓狂,在团队里获取不到帮助,因此向大家求助。

问题:使用自己写的程序消费不到消息,使用bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicTest1 却可以消费到。

以下是我的程序:

@Bean
    public ConnectionFactory kafkaBrokerConnectionFactory() throws Exception {
        return new DefaultConnectionFactory(zkConfiguration());
    }

    @Bean
    public Configuration zkConfiguration() {
        return new ZookeeperConfiguration(new ZookeeperConnect(this.zookeeperAddress));
    }

    @Bean
    public Configuration kafkaConfiguration() {
        //ip由逗号分隔
        String[] split = this.brokerAddress.split(",");

        int i = 0;
        BrokerAddress[] brokerAddressList = new BrokerAddress[split.length];
        while(i < split.length){
            brokerAddressList[i] = BrokerAddress.fromAddress(split[i]);
            i++;
        }
        BrokerAddressListConfiguration configuration = new BrokerAddressListConfiguration(brokerAddressList);
        configuration.setSocketTimeout(this.socketTimeout);
        return configuration;
    }


    @Bean
    public OffsetManager offsetManager() {
        return new KafkaTopicOffsetManager(new ZookeeperConnect(this.zookeeperAddress), this.topic);
    }

    @Bean
    public KafkaMessageListenerContainer container(OffsetManager offsetManager) throws Exception {
        //String[] split = this.topics.split(",");

        final KafkaMessageListenerContainer kafkaMessageListenerContainer = new KafkaMessageListenerContainer(
                kafkaBrokerConnectionFactory(), new Partition(this.topic, 0));
        kafkaMessageListenerContainer.setOffsetManager(offsetManager);
        kafkaMessageListenerContainer.setMaxFetch(100);
        kafkaMessageListenerContainer.setConcurrency(1);
        return kafkaMessageListenerContainer;
    }

    @Bean
    public KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter(KafkaMessageListenerContainer container) {
        KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter =
                new KafkaMessageDrivenChannelAdapter(container);
        StringDecoder decoder = new StringDecoder();
        kafkaMessageDrivenChannelAdapter.setKeyDecoder(decoder);
        kafkaMessageDrivenChannelAdapter.setPayloadDecoder(decoder);
        kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
        return kafkaMessageDrivenChannelAdapter;
    }

    @Bean
    public PollableChannel received() {
       return new QueueChannel();
    }



PollableChannel channel = context.getBean("received", PollableChannel.class);
        Message<?> received = channel.receive();
        while(received != null){
            System.out.println(received);
            received = channel.receive(2000);
        }



希望各位了解的可以帮助我,谢谢。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(12

沙与沫 2021-12-05 09:17:26

在kafka的配置文件server.properties 中增加以下定义:

listeners=PLAINTEXT://hostip:9092
advertised.listeners=PLAINTEXT://hostip:9092

夜无邪 2021-12-05 09:16:14

大概是在今年4月份做的,时间过很久了,现在也不再继续做这些,sorry.

嘦怹 2021-12-05 09:15:01

回复
没去总结呀!

醉生梦死 2021-12-05 09:10:08

请问解决了吗?我跟你的问题一样。。

左岸枫 2021-12-05 09:08:25

我最后在消费端采用了kafka自己的客户端工具,与spring-boot的集成没搞定啊

把回忆走一遍 2021-12-05 09:04:41

回复
怎么搞得,求指导?

终陌 2021-12-05 08:57:06

请问你的问题解决了吗,现在也要集成spring 和kafka,网上说的很多都不使用,经常是消费者程序消费不到

梅窗月明清似水 2021-12-05 08:27:07

引用来自“小石子-_-”的评论

若是用MessageHandler是不是就不用写
KafkaMessageListenerContainer

不再见 2021-12-05 06:41:38

若是用MessageHandler是不是就不用写
KafkaMessageListenerContainer

坏尐絯 2021-12-04 05:17:38

KafkaMessageListenerContainer 配置中是不是缺少了MessageListener

成熟稳重的好男人 2021-12-04 01:58:36

你是用的KafkaMessageDrivenChannelAdapter这个吗?在github上有这样的例子,但是做出来收不到消息。后来我就换成了kafka的java api... 没有用由spring封装的这个。你有什么思路,我们可以探讨下

命硬 2021-12-02 02:53:49

你解决了么,我收遇到这个问题。
kafka-console-consumer.sh能收到,但程序收不到!什么原因呢

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文