Handling partially failed batches with Spring Cloud Stream
When using spring-cloud-stream for a streaming application (functional style) with batch consumption, is there a way to retry/DLQ a failed message while still processing (streaming) the non-failing records?
For example: the function receives a batch of 10 records and attempts to convert them to another type, returning the new records for producing. Say record 8 fails during the mapping; is it possible to finish producing records 0-7 and then retry/DLQ record 8?
Throwing a BatchListenerFailedException with the failing index does not cause the prior messages to be sent.
Spring Kafka version: 2.8.0
Code:
@Override
public List<Message<Context>> apply(Message<List<Context>> listMessage) {
    List<Message<Context>> output = new ArrayList<>();
    IntStream.range(0, listMessage.getPayload().size()).forEach(index -> {
        try {
            Record<Context> record = Record.fromBatch(listMessage, index);
            output.add(MessageBuilder.withPayload(record.getValue()).build());
            // simulate a failure on the last record of the batch
            if (index == listMessage.getPayload().size() - 1) {
                throw new TransientError("offset " + record.getOffset() + " failed", new RuntimeException());
            }
        } catch (Exception e) {
            // report the failing index so the error handler knows where the batch broke
            throw new BatchListenerFailedException("Trigger retry", e, index);
        }
    });
    return output;
}
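For completeness, a minimal sketch of how this apply method might be exposed as the batchFunc bean referenced by spring.cloud.function.definition in the YAML below. The configuration class and the BatchMapper name are assumptions; only the apply method above is from the original code.

import java.util.List;
import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

// Hypothetical wiring: BatchMapper is an assumed name for the class that
// implements the apply() method shown above; Context is the question's own type.
@Configuration
public class BatchFuncConfiguration {

    // The bean name "batchFunc" matches spring.cloud.function.definition, and
    // batchMode: true on batchFunc-in-0 delivers the whole batch as one Message<List<Context>>.
    @Bean
    public Function<Message<List<Context>>, List<Message<Context>>> batchFunc() {
        return new BatchMapper();
    }
}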
Customizer:
private CommonErrorHandler getCommonErrorHandler(String group) {
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(getRecoverer(group), getBackOff());
    errorHandler.setLogLevel(KafkaException.Level.DEBUG);
    errorHandler.setAckAfterHandle(true);
    // classify which exceptions are retryable; anything not listed defaults to false (not retryable)
    errorHandler.setClassifications(Map.of(
            PermanentError.class, false,
            TransientError.class, true,
            SerializationException.class, properties.isRetryDesErrors()),
            false);
    errorHandler.setRetryListeners(getRetryListener());
    return errorHandler;
}
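The question does not show how this handler is attached to the binder-created listener container; with Spring Cloud Stream's Kafka binder that is typically done through a ListenerContainerCustomizer bean. A hedged sketch of that wiring (the bean itself is an assumption, not part of the original code):

import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

// Assumed wiring: hand the error handler to every container the Kafka binder creates,
// using the binding's consumer group to build the per-group recoverer.
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> containerCustomizer() {
    return (container, destinationName, group) ->
            container.setCommonErrorHandler(getCommonErrorHandler(group));
}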
private ConsumerRecordRecoverer getRecoverer(String group) {
    KafkaOperations<?, ?> operations =
            new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProperties()));
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
            operations, getDestinationResolver(group));
    recoverer.setHeadersFunction(this::buildAdditionalHeaders);
    return recoverer;
}
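getDestinationResolver(group) is not shown; the resolver DeadLetterPublishingRecoverer expects here is a BiFunction from the failed ConsumerRecord and the exception to a target TopicPartition. An assumed implementation that routes to a per-group dead-letter topic (the ".DLT" naming is an assumption) could look like:

import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Assumed resolver: publish every failed record to "<group>.DLT", keeping the original partition.
private BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> getDestinationResolver(String group) {
    return (record, exception) -> new TopicPartition(group + ".DLT", record.partition());
}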
YAML:
spring:
  cloud:
    function:
      definition: batchFunc
    stream:
      default-binder: kafka-string-avro
      binders:
        kafka-string-avro:
          type: kafka
          environment.spring.cloud.stream.kafka.binder.consumerProperties:
            key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicNameStrategy
            specific.avro.reader: true
          environment.spring.cloud.stream.kafka.binder.producerProperties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicNameStrategy
      bindings:
        batchFunc-in-0:
          binder: kafka-string-avro
          destination: records.publisher.output
          group: function2-in-group
          contentType: application/*+avro
          consumer:
            batchMode: true
        function2-out-0:
          binder: kafka-string-avro
          destination: reporter.output
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: ${KAFKA_BROKER_ADDRESS:localhost:9092}
          autoCreateTopics: ${KAFKA_AUTOCREATE_TOPICS:false}
        default:
          consumer:
            startOffset: ${START_OFFSET:latest}
            enableDlq: false
      default:
        consumer:
          maxAttempts: 1
          defaultRetryable: false