Embedded Kafka server with the Kafka DSL

Posted on 2025-01-30 04:58:24

I am using a Kafka message-driven channel adapter to read messages from a Kafka topic. It works fine, but when I use an embedded Kafka broker for an integration test, it does not work: the adapter does not listen to the topic.
Here is the code for the listener:

    public IntegrationFlow topic1ListenerKafkaFlow() {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(kafkaMessageListenerContainer(), KafkaMessageDrivenChannelAdapter.ListenerMode.batch)
                        .batchMessageConverter(batchMessageConverter)
                        .errorChannel(errorChannel)
                        .outputChannel(inputQueueChannel)).get();
    }



    public AbstractMessageListenerContainer kafkaMessageListenerContainer(){
        AbstractMessageListenerContainer kafkaMessageListenerContainer = new KafkaMessageListenerContainer(consumerFactory(), containerProperties());
        kafkaMessageListenerContainer.setAutoStartup(true);
        return kafkaMessageListenerContainer;
    }


    private ContainerProperties containerProperties() {
        ContainerProperties properties = new ContainerProperties("TEST_TOPIC");

        properties.setAckMode(ContainerProperties.AckMode.BATCH);
        properties.setConsumerTaskExecutor(createDefaultTaskExecutor());
        return properties;
    }


    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumerGroup-1");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        return new DefaultKafkaConsumerFactory<>(properties);
    }

The embedded broker is configured as follows:

    public EmbeddedKafkaBroker embeddedKafkaBroker() {
        return new EmbeddedKafkaBroker(1, false, 1, "TEST_TOPIC")
                .kafkaPorts(9092).brokerProperty("listeners", "PLAINTEXT://localhost:9092");
    }

Here is the producer configuration:

    public Properties getKafkaProperties() {
        final Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public Future<RecordMetadata> sendMessage(String TEST_TOPIC, String message){
        KafkaProducer<String, String> producer = new KafkaProducer<>(getKafkaProperties());
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TEST_TOPIC,"Key", message);
        return  producer.send(producerRecord);
    }

When I send a message, it is sent to the topic successfully, but Kafka.messageDrivenChannelAdapter does not pick it up.

I have tried many things but could not find the root cause.
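
A note on "sent successfully": sendMessage returns a Future, and KafkaProducer.send is asynchronous, so the send is only confirmed once that Future completes. Below is a minimal sketch of a helper a test could use to block on the result before checking the consumer side. The class name SendCheck, the method awaitResult, and the timeout value are hypothetical and not part of the code above; this only verifies the producer side and is not presented as the fix for the listener.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical test helper (an assumption for illustration, not from the original post).
final class SendCheck {

    private SendCheck() {
    }

    // Waits for the future to complete and returns its value.
    // Throws IllegalStateException if the operation failed, timed out,
    // or the waiting thread was interrupted.
    static <T> T awaitResult(Future<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the send", e);
        } catch (ExecutionException e) {
            // The asynchronous operation itself failed; surface its cause.
            throw new IllegalStateException("Send failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException(
                    "Send did not complete within " + timeoutMillis + " ms", e);
        }
    }
}
```

In the test it could be called as SendCheck.awaitResult(sendMessage("TEST_TOPIC", "hello"), 10_000), which returns the RecordMetadata (topic, partition, offset) once the broker has acknowledged the record.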
