Kafka message recovery with AfterRollbackProcessor for non-retryable exceptions
This is a follow-up to the linked question, "Kafka transaction rollback not working with 3 topics for RecordTooLargeException".
Regarding EDIT3 of that question, I have the following questions.
How can I save the error to a DB and, at the same time, send the record to a DLQ while using AfterRollbackProcessor?
I added the non-retryable exceptions (RecordTooLargeException, IllegalArgumentException, CustomBusinessException) to the DefaultAfterRollbackProcessor with addNotRetryableExceptions.
After the recovery phase (save the error to the DB and send the record to the DLQ), if a rebalance or a restart happens, the failed record (RecordTooLargeException) is retried again. How can I make sure that records which failed with a non-retryable exception are skipped on further attempts?
@Bean
AfterRollbackProcessor<Object, Object> arp() {
    DefaultAfterRollbackProcessor<Object, Object> darp = new DefaultAfterRollbackProcessor<>((rec, ex) -> {
        log.error("#### Failed to process {} from topic, partition {}-{}, @{}",
                rec.value(), rec.topic(), rec.partition(), rec.offset(), ex);
        // Wanted: if the exception is one of the non-retryable ones,
        // Kafka should not redeliver this message after a restart
    }, new FixedBackOff(3000L, 2));
    // These exception types skip the retries and go straight to the recoverer
    darp.addNotRetryableExceptions(RecordTooLargeException.class, IllegalArgumentException.class);
    return darp;
}
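To make the intent of the non-retryable list concrete, here is a minimal stand-alone sketch of the decision "retry again or hand over to the recoverer". It is a simplified model in plain Java, not Spring Kafka's actual classifier; the class name `RetryDecisionSketch`, the method `shouldRecover` and its parameters are hypothetical, and it assumes that a backoff configured with N retries allows N redeliveries after the first failure.

```java
import java.util.Set;

// Simplified model of the retry decision; not Spring Kafka code.
class RetryDecisionSketch {

    // Returns true when the failed record should go to the recoverer
    // instead of being redelivered.
    static boolean shouldRecover(Exception ex, int failedDeliveries, int maxRetries,
                                 Set<Class<? extends Exception>> notRetryable) {
        // Walk the cause chain, because listener exceptions are usually wrapped.
        for (Throwable t = ex; t != null; t = t.getCause()) {
            for (Class<? extends Exception> type : notRetryable) {
                if (type.isInstance(t)) {
                    return true; // non-retryable: skip the remaining retries
                }
            }
        }
        // Retryable: recover only once the retries are used up.
        return failedDeliveries > maxRetries;
    }

    public static void main(String[] args) {
        Set<Class<? extends Exception>> notRetryable =
                Set.of(IllegalArgumentException.class, UnsupportedOperationException.class);
        // Non-retryable on the first failure
        System.out.println(shouldRecover(new IllegalArgumentException("bad"), 1, 2, notRetryable));
        // Retryable exception, retries still left
        System.out.println(shouldRecover(new IllegalStateException("busy"), 1, 2, notRetryable));
    }
}
```

Note that this decision only covers what happens while the application is running. Whether the record comes back after a restart depends on whether its offset was committed, which is a separate concern.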
Following the suggestion, I updated the code as shown below.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private DBHandler dbHandler;

@Bean
AfterRollbackProcessor<Object, Object> arp() {
    DefaultAfterRollbackProcessor<Object, Object> darp = new DefaultAfterRollbackProcessor<>((rec, ex) -> {
        log.error("#### Failed to process {} from topic, partition {}-{}, @{}",
                rec.value(), rec.topic(), rec.partition(), rec.offset(), ex);
        // Save the failed record to the DB
        dbHandler.handleFailure((String) rec.key(), (String) rec.value(), ex, rec.topic());
        // Open question: how do I also send the record to the DLQ here?
    }, new FixedBackOff(3000L, 3), kafkaTemplate, true); // transactional template, commitRecovered = true
    // These exception types skip the retries and go straight to the recoverer
    darp.addNotRetryableExceptions(RecordTooLargeException.class, IllegalArgumentException.class);
    return darp;
}
I eventually found a solution.
Solution
I created the class below to dump the failed record to the DB during the recovery phase.
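import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class DBPublishingRecordRecoverer implements ConsumerRecordRecoverer {

    @Override
    public void accept(ConsumerRecord<?, ?> rec, Exception ex) {
        // Note: the message has four placeholders, so the fifth argument (ex.getMessage()) is not printed
        log.error("@ DB Operation | process {} from topic, partition {}-{}, @{}",
                rec.value(), rec.topic(), rec.partition(), rec.offset(), ex.getMessage());
        // The actual DB write (for example dbHandler.handleFailure(...)) belongs here
    }
}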
Then I created a class that sends the same failed record to the DLT.
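import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class DLTRecordRecoverer {

    // Returns a DeadLetterPublishingRecoverer that logs the record before publishing it to the DLT
    public DeadLetterPublishingRecoverer dlr(@Nullable KafkaOperations<?, ?> kafkaOperations) {
        return new DeadLetterPublishingRecoverer(kafkaOperations) {

            @Override
            public void accept(ConsumerRecord<?, ?> record, Exception exception) {
                // Note: four placeholders, so the fifth argument (exception.getMessage()) is not printed
                log.info("DLQ to process {} from topic, partition {}-{}, @{}",
                        record.value(), record.topic(), record.partition(), record.offset(), exception.getMessage());
                super.accept(record, exception);
            }
        };
    }
}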
Finally, I added both recoverers to the AfterRollbackProcessor.
@Bean
AfterRollbackProcessor<Object, Object> xyz() {
    // testRecoverer is presumably the injected DBPublishingRecordRecoverer; it runs first, then the DLT recoverer
    DefaultAfterRollbackProcessor<Object, Object> darp = new DefaultAfterRollbackProcessor<>(
            testRecoverer.andThen(dltRecordRecoverer.dlr(kafkaTemplate)),
            new FixedBackOff(3000L, 3), kafkaTemplate, true);
    // These exception types skip the retries and go straight to the recoverers
    darp.addNotRetryableExceptions(RecordTooLargeException.class, IllegalArgumentException.class);
    return darp;
}
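The chaining above relies on the standard `BiConsumer.andThen` method, which `ConsumerRecordRecoverer` inherits from `java.util.function.BiConsumer`. A minimal stand-alone sketch of that ordering, in plain Java without Spring: `FailedRecord` is a hypothetical stand-in for `ConsumerRecord<?, ?>`, and the two lambdas only record that they were called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Stand-alone model of chaining two recoverers with BiConsumer.andThen.
class RecovererChainSketch {

    // Hypothetical stand-in for ConsumerRecord<?, ?>
    static final class FailedRecord {
        final String topic;
        final int partition;
        final long offset;
        final String value;

        FailedRecord(String topic, int partition, long offset, String value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    // Runs "DB first, then DLT" and returns the calls in the order they happened.
    static List<String> recover(FailedRecord rec, Exception ex) {
        List<String> calls = new ArrayList<>();
        BiConsumer<FailedRecord, Exception> dbRecoverer =
                (r, e) -> calls.add("db:" + r.topic + "-" + r.partition + "@" + r.offset);
        BiConsumer<FailedRecord, Exception> dltRecoverer =
                (r, e) -> calls.add("dlt:" + r.topic + ".DLT");
        dbRecoverer.andThen(dltRecoverer).accept(rec, ex);
        return calls;
    }

    public static void main(String[] args) {
        FailedRecord rec = new FailedRecord("TEST-TOPIC", 2, 20L, "Another example");
        // prints [db:TEST-TOPIC-2@20, dlt:TEST-TOPIC.DLT]
        System.out.println(recover(rec, new IllegalArgumentException("bad payload")));
    }
}
```

One consequence of `andThen` worth keeping in mind: if the first consumer throws, the second one is not invoked, so a failing DB write would also prevent the publish to the DLT.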
Log output:
c.t.t.demo.DBPublishingRecordRecoverer : @ DB Operation | process Another example from topic, partition TEST-TOPIC-2, @20
c.t.transaction.demo.DLTRecordRecoverer : DLQ to process Another example from topic, partition TEST-TOPIC-2, @20
o.a.k.c.p.internals.TransactionManager : [Producer clientId=raw-item-producer-client-1, transactionalId=tx-01d1a934-3c0e-45b4-ac1f-5b8fa
On the consumer side:
KafkaMessageListenerContainer : aem-dam-edm-group-id: partitions assigned: [PRICE-TOPIC-0, PRICE-TOPIC-1, PRICE-TOPIC-2]
KafkaMessageListenerContainer : aem-dam-edm-group-id: partitions assigned: [ITEM-TOPIC-1, ITEM-TOPIC-2, ITEM-TOPIC-0]
KafkaMessageListenerContainer : aem-dam-edm-group-id: partitions assigned: [INVENTORY-TOPIC-1, INVENTORY-TOPIC-0, INVENTORY-TOPIC-2]
KafkaMessageListenerContainer : aem-dam-edm-group-id: partitions assigned: [TEST-TOPIC.DLT-1, TEST-TOPIC.DLT-0, TEST-TOPIC.DLT-2]
ransaction.demo.ConsumerService : Received payload. Topic : TEST-TOPIC.DLT , key :TestKey-002 , value : Another example
Comments (1)
In order to commit the offset of the recovered transaction, you have to pass a transactional KafkaTemplate into the DefaultAfterRollbackProcessor and set commitRecovered to true. See the javadocs.
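To illustrate why the committed offset decides what is redelivered after a restart, here is a toy model of one partition and one consumer group. It is not Spring Kafka code: the class `CommitRecoveredSketch` and the method `deliveredAfterRestart` are hypothetical, and the model assumes the application stops right after the failed record has been recovered, before any later record is processed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition and one consumer group; not Spring Kafka code.
class CommitRecoveredSketch {

    // The record at failingOffset is recovered (DB + DLT) and the application
    // stops right afterwards. Returns the offsets delivered after the restart.
    static List<Long> deliveredAfterRestart(long logEndOffset, long failingOffset,
                                            boolean commitRecovered) {
        // Records before the failing one were processed and committed normally.
        long committed = failingOffset;
        if (commitRecovered) {
            // The recovered record's offset is committed as part of recovery.
            committed = failingOffset + 1;
        }
        // Restart: the consumer resumes from the committed position.
        List<Long> delivered = new ArrayList<>();
        for (long offset = committed; offset < logEndOffset; offset++) {
            delivered.add(offset);
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliveredAfterRestart(23L, 20L, false)); // prints [20, 21, 22]
        System.out.println(deliveredAfterRestart(23L, 20L, true));  // prints [21, 22]
    }
}
```

In the first call the failed record at offset 20 is delivered again after the restart; in the second call it is skipped because its offset was committed during recovery.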