RERRYTEMPLATE与ErrorHandler的性能比较
我有两种方法可以通过
- 在
kafkalistener中使用retryTemplate从kafkalistener错误恢复,
方法:
@KafkaListener(topics: "topic1")
public void handle(command) {
retryTemplate.execute(ctx -> {
processCommand(command);
});
// Retries exhausted, execute the recoverer logic
recover(command);
}
- 将errorHandler设置为MessageListenerContainer通过contagererCustomizer:
@Component
public class ContainerCustomizer {
public ContainerCustomizer(CustomConcurrentContainerListenerFactory factory) {
factory.setContainerCustomizer(container -> {
container.setErrorHandler(new SeekToCurrentErrorHandler((ConsumerRecord<?, ?> record, Exception e) -> {
//logic for recoverer after retries exhausted
recover(convertRecord(record));
}, new ExponentialBackOffWithMaxRetries(2)));
});
}
}
在性能上:涉及性能和阻止消费者线程时,这两个选项如何比较这两个选项?确实说,使用retrytemplate.execute
,在单独的线程中处理重试时,带有containerListener.seterlistener.seterrorhandler
它会阻止主要消费者的线程?
I have two ways of recovering from KafkaListener errors with using
- RetryTemplate in the
KafkaListener
method:
@KafkaListener(topics: "topic1")
public void handle(command) {
retryTemplate.execute(ctx -> {
processCommand(command);
});
// Retries exhausted, execute the recoverer logic
recover(command);
}
- Set ErrorHandler to MessageListenerContainer via ContainerCustomizer:
@Component
public class ContainerCustomizer {
public ContainerCustomizer(CustomConcurrentContainerListenerFactory factory) {
factory.setContainerCustomizer(container -> {
container.setErrorHandler(new SeekToCurrentErrorHandler((ConsumerRecord<?, ?> record, Exception e) -> {
//logic for recoverer after retries exhausted
recover(convertRecord(record));
}, new ExponentialBackOffWithMaxRetries(2)));
});
}
}
When it comes to performance and blocking the consumer thread, how these two options compare? Is it true to say that with RetryTemplate.execute
, retries are handled in a separate thread while with containerListener.setErrorHandler
it blocks the main consumer's thread?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
两者都会阻塞消费者线程——否则你将继续处理记录,并且 Kafka 的排序保证将会丢失。
此外,这两种方法均已弃用,取而代之的是
DefaultErrorHandler
,它是SeekToCurrentErrorHandler
的演变。两者之间的区别在于,使用 Spring Retry 时,所有调用都将在内存中重试,因此您应该确保聚合退避不会超过 max.poll.interval.ms< /code> 否则代理会认为您的服务器已失效并将执行重新平衡。
SeekToCurrentErrorHandler
以及DefaultErrorHandler
都会在每次重试时向代理执行新的查找,因此您只需确保最大延迟加上执行时间不超过 <代码>max.poll.interval.ms。如果您需要
非阻塞重试
,请查看非阻塞重试功能。Both will block the consumer thread - otherwise you'd continue processing records and Kafka's ordering guarantees would be lost.
Also, both approaches are deprecated in favor of
DefaultErrorHandler
, which is an evolution ofSeekToCurrentErrorHandler
.The difference between the two is, with
Spring Retry
, all invocations will be retried in memory, so you should make sure the aggregate backoff won't exceed themax.poll.interval.ms
or the broker will think your server is dead and will perform a rebalance.SeekToCurrentErrorHandler
, as well asDefaultErrorHandler
, will perform a new seek to the broker for each retry, so you just need to make sure the biggest delay plus execution time does not exceedmax.poll.interval.ms
.If you need
non-blocking retries
, take a look at the non-blocking retries feature.