configurelistenercontainer时KAFKA消息错误的集成流

发布于 2025-01-21 07:08:44 字数 2589 浏览 4 评论 0 原文

我正在尝试使用IntegrationFlow for Kafka将从Kafka接收到的消息传递到频道。

以下是我的工作代码: -


    @Bean
    public MessageChannel fromKafka() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow topic1ListenerFromKafkaFlow1() throws Exception {
/*        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                                KafkaMessageDrivenChannelAdapter.ListenerMode.record, kafkaTopic)
                        .configureListenerContainer( c ->  c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
                                        .id("topic1ListenerContainer"))
                        .recoveryCallback(new ErrorMessageSendingRecoverer(messageFromKafka(),
                                new RawRecordHeaderErrorMessageStrategy()))
                        .retryTemplate(new RetryTemplate())
                        .filterInRetry(true))
                .filter(Message.class, m ->
                                m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
                        f -> f.throwExceptionOnRejection(true))
                .<String, String>transform(String::toUpperCase)
                .channel(c -> c.queue("listeningFromKafkaResults1"))
                .get();*/



        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(listener(), KafkaMessageDrivenChannelAdapter.ListenerMode.record))
                .channel("fromKafka")
                .get();
    }





    @Bean("listenerkafka")
    public KafkaMessageListenerContainer<String, String> listener() throws Exception {
        ContainerProperties properties = new ContainerProperties(kafkaTopic1);
        properties.setGroupId("kafka-test");
        return new KafkaMessageListenerContainer<>(consumerFactory, properties);
    }

    @ServiceActivator(inputChannel="fromKafka", outputChannel = "somechannel")
    public Message<CreatRequest> fromKafka(Message<?> msg) throws JsonProcessingException {
        CreatRequest  creatRequest =  objectMapper.readValue(msg.getPayload().toString(), CreatRequest.class);
        Message<CreatRequest> message= MessageBuilder.withPayload(creatRequest).build();
        logger.info("Inside fromKafka " + message);
        return  message;
    }

我面临的问题是评论代码在主题1listenerfromkafkaflow中不起作用。 在这里,我找不到C.Ackmode(AbstractMessageListenerContainer.ackmode.manual),

因为它显示了未识别的编译时间错误。 你能在我出错的地方纠正我吗?

另外,我需要将此流传递到另一个线程中,而不是在主线程中。

I am trying to use IntegrationFlow for kafka to pass message received from Kafka to channel.

Below is my working code:-


    @Bean
    public MessageChannel fromKafka() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow topic1ListenerFromKafkaFlow1() throws Exception {
/*        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                                KafkaMessageDrivenChannelAdapter.ListenerMode.record, kafkaTopic)
                        .configureListenerContainer( c ->  c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
                                        .id("topic1ListenerContainer"))
                        .recoveryCallback(new ErrorMessageSendingRecoverer(messageFromKafka(),
                                new RawRecordHeaderErrorMessageStrategy()))
                        .retryTemplate(new RetryTemplate())
                        .filterInRetry(true))
                .filter(Message.class, m ->
                                m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
                        f -> f.throwExceptionOnRejection(true))
                .<String, String>transform(String::toUpperCase)
                .channel(c -> c.queue("listeningFromKafkaResults1"))
                .get();*/



        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(listener(), KafkaMessageDrivenChannelAdapter.ListenerMode.record))
                .channel("fromKafka")
                .get();
    }





    @Bean("listenerkafka")
    public KafkaMessageListenerContainer<String, String> listener() throws Exception {
        ContainerProperties properties = new ContainerProperties(kafkaTopic1);
        properties.setGroupId("kafka-test");
        return new KafkaMessageListenerContainer<>(consumerFactory, properties);
    }

    @ServiceActivator(inputChannel="fromKafka", outputChannel = "somechannel")
    public Message<CreatRequest> fromKafka(Message<?> msg) throws JsonProcessingException {
        CreatRequest  creatRequest =  objectMapper.readValue(msg.getPayload().toString(), CreatRequest.class);
        Message<CreatRequest> message= MessageBuilder.withPayload(creatRequest).build();
        logger.info("Inside fromKafka " + message);
        return  message;
    }

Issue which I am facing is commented code doesn't work inside topic1ListenerFromKafkaFlow1.
Here I am not able to find c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)

As it is showing compile time error ackmode not recognised.
Can you please correct me where i am going wrong.

Also I need to pass this flow in another thread and not in main thread.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

南薇 2025-01-28 07:08:44

代替使用KAFKA消息驱动的通道适配器:

https://docs.spring.io/spring-integration/docs/current/referent/referent/html/kafka.html#kafka-inbound

但是,在同一频道上有两个适配器,请求将是圆形的 -罗宾分布在他们之间。如果您希望两者都接收消息,则需要 PublishSubscribechannel

Use the Kafka message-driven channel adapter instead:

https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka-inbound

However, with two adapters on the same channel the requests will be round-robin distributed between them. If you want both to receive the message, you need a PublishSubscribeChannel.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文