Spring Cloud Kafka Stream: Publishing to DLQ fails with Avro

Posted on 2025-01-11 08:05:56

I'm unable to publish to the DLQ topic when using ErrorHandlingDeserializer in combination with Avro to handle errors. Below is the error I get while publishing.

Topic TOPIC_DLT not present in metadata after 60000 ms.
ERROR KafkaConsumerDestination{consumerDestinationName='TOPIC', partitions=6, dlqName='TOPIC_DLT'}.container-0-C-1 o.s.i.h.LoggingHandler:250 - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@49abe531]; nested exception is java.lang.RuntimeException: failed, failedMessage=GenericMessage

Here is the application.yml:

spring:
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: TOPIC
          group: groupID
      kafka:
        binder:
          brokers:
            - xxx:9092
          configuration:
            security.protocol: SASL_SSL
            sasl.mechanism: PLAIN
          jaas:
            loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
            options:
              username: username
              password: pwd
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        bindings:
          process-in-0:
            consumer:
              configuration:
                basic.auth.credentials.source: USER_INFO
                schema.registry.url: registryUrl
                schema.registry.basic.auth.user.info: user:pwd
                security.protocol: SASL_SSL
                sasl.mechanism: PLAIN
              max-attempts: 1
              dlqProducerProperties:
                configuration:
                  basic.auth.credentials.source: USER_INFO
                  schema.registry.url: registryUrl
                  schema.registry.basic.auth.user.info: user:pwd
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              deserializationExceptionHandler: sendToDlq
              ackEachRecord: true
              enableDlq: true
              dlqName: TOPIC_DLT
              autoCommitOnError: true
              autoCommitOffset: true

I'm using the following dependencies:

spring-cloud-dependencies - 2021.0.1
spring-boot-starter-parent - 2.6.3
spring-cloud-stream-binder-kafka
kafka-schema-registry-client - 5.3.0
kafka-avro-serializer - 5.3.0

I'm not sure what exactly I'm missing.

Comments (1)

征﹌骨岁月お 2025-01-18 08:05:56

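After going through a lot of documentation, I found that for Spring to publish to the DLQ, the original topic and the DLT topic need to have the same number of partitions. By default the binder sends each failed record to the same partition number in the dead-letter topic as the one it was read from, so a DLT with fewer partitions than the original topic cannot receive records from the higher-numbered partitions. If the partition counts can't be matched, set dlqPartitions to 1 or provide a DlqPartitionFunction bean manually. With dlqPartitions: 1, all messages go to partition 0.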
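
As a rough sketch of that fix, the fragment below adds dlqPartitions to the consumer binding from the question. It assumes that TOPIC_DLT already exists with fewer partitions than TOPIC (which has 6) and that no DlqPartitionFunction bean is defined. Only the properties relevant to the DLQ are shown, and the rest of the configuration is assumed to stay as posted.

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          process-in-0:
            consumer:
              enableDlq: true
              dlqName: TOPIC_DLT
              # Assumption: TOPIC_DLT has fewer partitions than TOPIC.
              # With the value 1 and no DlqPartitionFunction bean,
              # every dead-letter record is written to partition 0.
              dlqPartitions: 1
```

If the dead-letter records should instead be spread over several partitions, the alternative mentioned above is a DlqPartitionFunction bean that chooses the target partition for each failed record.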
