RERRYTEMPLATE与ErrorHandler的性能比较

发布于 2025-01-18 11:57:44 字数 1276 浏览 4 评论 0原文

我有两种方法可以通过

  1. kafkalistener中使用retryTemplate从kafkalistener错误恢复,方法:
@KafkaListener(topics: "topic1")
public void handle(command) {
    retryTemplate.execute(ctx -> {
        processCommand(command);
    });
    // Retries exhausted, execute the recoverer logic
    recover(command);
}
  1. 将errorHandler设置为MessageListenerContainer通过contagererCustomizer:
@Component
    public class ContainerCustomizer {
        public ContainerCustomizer(CustomConcurrentContainerListenerFactory factory) {
            factory.setContainerCustomizer(container -> {
                    container.setErrorHandler(new SeekToCurrentErrorHandler((ConsumerRecord<?, ?> record, Exception e) -> {
                        //logic for recoverer after retries exhausted
                        recover(convertRecord(record));
                    }, new ExponentialBackOffWithMaxRetries(2)));
            });
        }
    }

在性能上:涉及性能和阻止消费者线程时,这两个选项如何比较这两个选项?确实说,使用retrytemplate.execute,在单独的线程中处理重试时,带有containerListener.seterlistener.seterrorhandler它会阻止主要消费者的线程?

I have two ways of recovering from KafkaListener errors with using

  1. RetryTemplate in the KafkaListener method:
@KafkaListener(topics: "topic1")
public void handle(command) {
    retryTemplate.execute(ctx -> {
        processCommand(command);
    });
    // Retries exhausted, execute the recoverer logic
    recover(command);
}
  1. Set ErrorHandler to MessageListenerContainer via ContainerCustomizer:
@Component
    public class ContainerCustomizer {
        public ContainerCustomizer(CustomConcurrentContainerListenerFactory factory) {
            factory.setContainerCustomizer(container -> {
                    container.setErrorHandler(new SeekToCurrentErrorHandler((ConsumerRecord<?, ?> record, Exception e) -> {
                        //logic for recoverer after retries exhausted
                        recover(convertRecord(record));
                    }, new ExponentialBackOffWithMaxRetries(2)));
            });
        }
    }

When it comes to performance and blocking the consumer thread, how these two options compare? Is it true to say that with RetryTemplate.execute, retries are handled in a separate thread while with containerListener.setErrorHandler it blocks the main consumer's thread?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

2025-01-25 11:57:44

两者都会阻塞消费者线程——否则你将继续处理记录,并且 Kafka 的排序保证将会丢失。

此外,这两种方法均已弃用,取而代之的是 DefaultErrorHandler,它是 SeekToCurrentErrorHandler 的演变。

两者之间的区别在于,使用 Spring Retry 时,所有调用都将在内存中重试,因此您应该确保聚合退避不会超过 max.poll.interval.ms< /code> 否则代理会认为您的服务器已失效并将执行重新平衡。

SeekToCurrentErrorHandler 以及 DefaultErrorHandler 都会在每次重试时向代理执行新的查找,因此您只需确保最大延迟加上执行时间不超过 <代码>max.poll.interval.ms。

如果您需要非阻塞重试,请查看非阻塞重试功能。

Both will block the consumer thread - otherwise you'd continue processing records and Kafka's ordering guarantees would be lost.

Also, both approaches are deprecated in favor of DefaultErrorHandler, which is an evolution of SeekToCurrentErrorHandler.

The difference between the two is, with Spring Retry, all invocations will be retried in memory, so you should make sure the aggregate backoff won't exceed the max.poll.interval.ms or the broker will think your server is dead and will perform a rebalance.

SeekToCurrentErrorHandler, as well as DefaultErrorHandler, will perform a new seek to the broker for each retry, so you just need to make sure the biggest delay plus execution time does not exceed max.poll.interval.ms.

If you need non-blocking retries, take a look at the non-blocking retries feature.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文