Handling partially failed batches with Spring Cloud Stream

Posted 2025-02-07 02:54:47

When using Spring Cloud Stream for a streaming application (functional style) with batches, is there a way to retry or dead-letter (DLQ) a failed message while still processing (streaming) the non-failing records?

For example:
A function receives a batch of 10 records, attempts to convert them to another type, and returns the new records for producing. Say record 8 fails during mapping: is it possible to finish producing records 0-7 and then retry/DLQ record 8?

Throwing BatchListenerFailedException with the index does not cause the prior messages to be sent.
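
One plausible reason, sketched here in plain Java with no Spring dependencies: a function's output exists only as its return value, so an exception thrown partway through discards whatever was accumulated locally. `ReturnValueSketch`, `convertBatch`, `callAndPublish`, and the `"bad"` marker are hypothetical names for illustration, and the sketch assumes the framework publishes output only after the function returns normally.

```java
import java.util.ArrayList;
import java.util.List;

class ReturnValueSketch {

    // Hypothetical stand-in for the mapping step: upper-cases each record
    // and fails on the marker value "bad".
    static List<String> convertBatch(List<String> batch) {
        List<String> output = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            if ("bad".equals(batch.get(i))) {
                // The partially filled list is a local variable; once this
                // exception propagates, the caller has no way to read it.
                throw new IllegalStateException("record " + i + " failed");
            }
            output.add(batch.get(i).toUpperCase());
        }
        return output;
    }

    // Plays the role of the caller: it can only publish what was returned.
    static List<String> callAndPublish(List<String> batch) {
        List<String> published = new ArrayList<>();
        try {
            published.addAll(convertBatch(batch));
        } catch (IllegalStateException e) {
            // Nothing to publish: the converted records were never handed over.
        }
        return published;
    }
}
```

With a batch of `"a", "b", "bad", "c"`, `callAndPublish` returns an empty list even though two records were converted before the failure.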


Spring Kafka version: 2.8.0

Code:

@Override
public List<Message<Context>> apply(Message<List<Context>> listMessage) {

    List<Message<Context>> output = new ArrayList<>();

    IntStream.range(0, listMessage.getPayload().size()).forEach(index -> {
        try {
            // Map the record at this index and queue it for output.
            Record<Context> record = Record.fromBatch(listMessage, index);
            output.add(MessageBuilder.withPayload(record.getValue()).build());
            // Simulate a failure on the last record of the batch.
            if (index == listMessage.getPayload().size() - 1) {
                throw new TransientError("offset " + record.getOffset() + " failed", new RuntimeException());
            }
        } catch (Exception e) {
            // Report the failing index to the container's error handler.
            throw new BatchListenerFailedException("Trigger retry", e, index);
        }
    });

    return output;
}
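
A possible workaround is to hand each converted record to a sink as soon as it is ready and only then throw with the failing index, so the earlier records are already out by the time the error handler runs. The sketch below models that in plain Java. `PartialBatchSketch`, `FailedAtIndex`, and the `sink` parameter are hypothetical names, with the sink standing in for whatever actually publishes a record (in Spring Cloud Stream, `StreamBridge.send(bindingName, payload)` is one candidate, called from a `Consumer` rather than a `Function`). Whether this combines cleanly with the binder's batch mode and offset commits is an assumption that needs testing.

```java
import java.util.List;
import java.util.function.Consumer;

class PartialBatchSketch {

    // Hypothetical exception carrying the index of the failed record,
    // mirroring the role of BatchListenerFailedException in the question.
    static class FailedAtIndex extends RuntimeException {
        final int index;

        FailedAtIndex(int index, Throwable cause) {
            super("record " + index + " failed", cause);
            this.index = index;
        }
    }

    // Converts records one at a time and emits each result immediately.
    static void process(List<String> batch, Consumer<String> sink) {
        for (int i = 0; i < batch.size(); i++) {
            String converted;
            try {
                // Illustrative mapping: parse the record and double it.
                converted = String.valueOf(Integer.parseInt(batch.get(i)) * 2);
            } catch (NumberFormatException e) {
                // Records 0..i-1 have already reached the sink.
                throw new FailedAtIndex(i, e);
            }
            sink.accept(converted);
        }
    }
}
```

Calling `process(List.of("1", "2", "x", "4"), sent::add)` leaves `2` and `4` in `sent` and throws `FailedAtIndex` with index 2. The record after the failure is not processed in that pass and would have to be redelivered.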

Customizer:

private CommonErrorHandler getCommonErrorHandler(String group) {
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(getRecoverer(group), getBackOff());
    errorHandler.setLogLevel(KafkaException.Level.DEBUG);
    errorHandler.setAckAfterHandle(true);
    // Exceptions not listed here fall back to the default (not retryable).
    errorHandler.setClassifications(Map.of(
                    PermanentError.class, false,
                    TransientError.class, true,
                    SerializationException.class, properties.isRetryDesErrors()),
            false);
    errorHandler.setRetryListeners(getRetryListener());
    return errorHandler;
}

private ConsumerRecordRecoverer getRecoverer(String group) {
    KafkaOperations<?, ?> operations = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProperties()));
    // Publishes records that exhaust their retries to the resolved dead-letter destination.
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
            operations, getDestinationResolver(group));
    recoverer.setHeadersFunction(this::buildAdditionalHeaders);
    return recoverer;
}

YAML:

spring:
  cloud:
    function:
      definition: batchFunc
    stream:
      default-binder: kafka-string-avro
      binders:
        kafka-string-avro:
          type: kafka
          environment.spring.cloud.stream.kafka.binder.consumerProperties:
            key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicNameStrategy
            specific.avro.reader: true
          environment.spring.cloud.stream.kafka.binder.producerProperties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicNameStrategy
      bindings:
        batchFunc-in-0:
          binder: kafka-string-avro
          destination: records.publisher.output
          group: function2-in-group
          contentType: application/*+avro
          consumer:
            batchMode: true
        function2-out-0:
          binder: kafka-string-avro
          destination: reporter.output
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: ${KAFKA_BROKER_ADDRESS:localhost:9092}
          autoCreateTopics: ${KAFKA_AUTOCREATE_TOPICS:false}
        default:
          consumer:
            startOffset: ${START_OFFSET:latest}
            enableDlq: false
      default:
        consumer:
          maxAttempts: 1
          defaultRetryable: false
