Kafka message recovery with AfterRollbackProcessor for not-retryable exceptions

Published 2025-01-17 18:47:17

With respect to the link - Kafka transaction rollback not working with 3 topics for RecordTooLargeException

Regarding EDIT3, I have the question below:

How can I send the error to a database and, at the same time, to a DLQ while using an AfterRollbackProcessor?

I added addNotRetryableExceptions(RecordTooLargeException, IllegalArgumentException, CustomBusinessException) to the DefaultAfterRollbackProcessor.

After the recovery phase (save the error to the DB and send it to the DLQ), if a rebalance or a restart happens, the code retries the failed record (RecordTooLargeException) again - how do I skip the not-retryable errors on further attempts?

@Bean
AfterRollbackProcessor<Object, Object> arp() {
    DefaultAfterRollbackProcessor<Object, Object> darp = new DefaultAfterRollbackProcessor<>((rec, ex) -> {
        log.error("#### Failed to process {} from topic, partition {}-{}, @{}",
                rec.value(), rec.topic(), rec.partition(), rec.offset(), ex);
        // If the exception is not retryable, tell Kafka not to redeliver this record after a restart
    }, new FixedBackOff(3000L, 2));

    darp.addNotRetryableExceptions(RecordTooLargeException.class, IllegalArgumentException.class);

    return darp;
}
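For context, "not retryable" here means the processor classifies the thrown exception (walking its cause chain) against the registered classes and, on a match, goes straight to recovery instead of backing off and retrying. The following is a stdlib-only sketch of that classification idea - not spring-kafka's actual implementation, and with stdlib exceptions standing in for the Kafka ones so it compiles without Kafka on the classpath:

```java
import java.util.Set;

public class SkipCheck {
    // Stand-in for the set built by addNotRetryableExceptions(); stdlib
    // exceptions are used here instead of RecordTooLargeException etc.
    static final Set<Class<? extends Throwable>> NOT_RETRYABLE =
            Set.of(IllegalArgumentException.class, IllegalStateException.class);

    // Walk the cause chain, since listener exceptions usually wrap the real failure.
    static boolean shouldSkip(Throwable ex) {
        for (Throwable t = ex; t != null; t = t.getCause()) {
            if (NOT_RETRYABLE.contains(t.getClass())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(shouldSkip(new RuntimeException(new IllegalArgumentException("bad"))));  // true
        System.out.println(shouldSkip(new RuntimeException("transient")));                          // false
    }
}
```

Note that classification alone does not stop redelivery after a restart; the offset of the recovered record must also be committed, which is what the accepted answer below addresses.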

As per the suggestion, I updated the code as below:


@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private DBHandler dbHandler;

@Bean
AfterRollbackProcessor<Object, Object> arp() {
    DefaultAfterRollbackProcessor<Object, Object> darp = new DefaultAfterRollbackProcessor<>((rec, ex) -> {
        log.error("#### Failed to process {} from topic, partition {}-{}, @{}",
                rec.value(), rec.topic(), rec.partition(), rec.offset(), ex);

        // Save the failed record to the DB
        dbHandler.handleFailure((String) rec.key(), (String) rec.value(), ex, rec.topic());

        // want to send the data to the DLQ as well - how?
    }, new FixedBackOff(3000L, 3), kafkaTemplate, true);

    darp.addNotRetryableExceptions(RecordTooLargeException.class, IllegalArgumentException.class);
    return darp;
}

Somehow I was able to find a solution.

Solution

Created the class below to dump the failed record to the DB during the recovery phase:

@Slf4j
@Service
public class DBPublishingRecordRecoverer implements ConsumerRecordRecoverer {
    @Override
    public void accept(ConsumerRecord<?, ?> rec, Exception ex) {
        log.error("@ DB Operation |  process {} from topic, partition {}-{}, @{}",
                rec.value(), rec.topic(), rec.partition(), rec.offset(), ex.getMessage());
    }
}

Created a class that sends the same failed record to the DLT:


@Slf4j
@Service
public class DLTRecordRecoverer  {

    public DeadLetterPublishingRecoverer dlr(@Nullable KafkaOperations<?, ?> kafkaOperations) {
        return new DeadLetterPublishingRecoverer(kafkaOperations) {

            @Override
            public void accept(ConsumerRecord<?, ?> record, Exception exception) {
                log.info("DLQ to process {} from topic, partition {}-{}, @{}",
                        record.value(), record.topic(), record.partition(), record.offset(), exception.getMessage());
                super.accept(record, exception);
            }

        };
    }
}

Now add these two recoverers to the AfterRollbackProcessor:


@Bean
AfterRollbackProcessor<Object, Object> xyz() {
    DefaultAfterRollbackProcessor<Object, Object> darp = new DefaultAfterRollbackProcessor<>(
            testRecoverer.andThen(dltRecordRecoverer.dlr(kafkaTemplate)),
            new FixedBackOff(3000L, 3), kafkaTemplate, true);

    darp.addNotRetryableExceptions(RecordTooLargeException.class, IllegalArgumentException.class);
    return darp;
}
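This chaining works because ConsumerRecordRecoverer extends java.util.function.BiConsumer, so andThen runs the DB recoverer first and the DLT recoverer second on the same (record, exception) pair. A stdlib-only sketch of the composition, with plain strings standing in for ConsumerRecord:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class RecovererChain {
    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();

        // Stand-ins for the DB recoverer and the DLT recoverer
        BiConsumer<String, Exception> dbRecoverer =
                (rec, ex) -> calls.add("DB: " + rec + " (" + ex.getMessage() + ")");
        BiConsumer<String, Exception> dltRecoverer =
                (rec, ex) -> calls.add("DLT: " + rec);

        // Same composition the bean above uses: DB first, then DLT
        BiConsumer<String, Exception> chain = dbRecoverer.andThen(dltRecoverer);
        chain.accept("Another example", new Exception("too large"));

        System.out.println(calls);
        // [DB: Another example (too large), DLT: Another example]
    }
}
```

One caveat of andThen: if the first recoverer throws, the second never runs, so a failing DB write would also prevent the DLT publish.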

The log output:

c.t.t.demo.DBPublishingRecordRecoverer   : @ DB Operation |  process Another example from topic, partition TEST-TOPIC-2, @20
c.t.transaction.demo.DLTRecordRecoverer  : DLQ to process Another example from topic, partition TEST-TOPIC-2, @20
o.a.k.c.p.internals.TransactionManager   : [Producer clientId=raw-item-producer-client-1, transactionalId=tx-01d1a934-3c0e-45b4-ac1f-5b8fa

In the consumer code:

KafkaMessageListenerContainer    : aem-dam-edm-group-id: partitions assigned: [PRICE-TOPIC-0, PRICE-TOPIC-1, PRICE-TOPIC-2]
KafkaMessageListenerContainer    : aem-dam-edm-group-id: partitions assigned: [ITEM-TOPIC-1, ITEM-TOPIC-2, ITEM-TOPIC-0]
KafkaMessageListenerContainer    : aem-dam-edm-group-id: partitions assigned: [INVENTORY-TOPIC-1, INVENTORY-TOPIC-0, INVENTORY-TOPIC-2]
KafkaMessageListenerContainer    : aem-dam-edm-group-id: partitions assigned: [TEST-TOPIC.DLT-1, TEST-TOPIC.DLT-0, TEST-TOPIC.DLT-2]
ransaction.demo.ConsumerService  : Received payload. Topic : TEST-TOPIC.DLT , key :TestKey-002 , value : Another example
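The last line shows the record arriving on TEST-TOPIC.DLT: by default, DeadLetterPublishingRecoverer routes a failed record to a topic named after the original with a .DLT suffix, on the same partition (provided the DLT topic has at least as many partitions). A minimal sketch of just that naming rule (the real resolver returns a TopicPartition):

```java
public class DltNaming {
    // Mirrors DeadLetterPublishingRecoverer's default destination name: "<topic>.DLT"
    static String dltTopic(String sourceTopic) {
        return sourceTopic + ".DLT";
    }

    public static void main(String[] args) {
        System.out.println(dltTopic("TEST-TOPIC"));  // TEST-TOPIC.DLT
    }
}
```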


Comments (1)

尴尬癌患者 2025-01-24 18:47:17

In order to commit the offset of the recovered transaction, you have to pass a transactional KafkaTemplate into the DefaultAfterRollbackProcessor and set commitRecovered to true. See the javadocs:

/**
 * Construct an instance with the provided recoverer which will be called after the
 * backOff returns STOP for a topic/partition/offset.
 * @param recoverer the recoverer; if null, the default (logging) recoverer is used.
 * @param backOff the {@link BackOff}.
 * @param kafkaOperations for sending the recovered offset to the transaction.
 * @param commitRecovered true to commit the recovered record's offset; requires a
 * {@link KafkaOperations}.
 * @since 2.5.3
 */
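For completeness, one way to obtain the transactional KafkaTemplate this constructor requires is Spring Boot's producer transaction-id-prefix property; a prefix like the tx- below would match the transactionalId=tx-... visible in the question's log output, but the exact value is an assumption:

```yaml
spring:
  kafka:
    producer:
      # Setting a transaction-id-prefix makes the auto-configured
      # KafkaTemplate transactional.
      transaction-id-prefix: tx-
```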
