春季kafka |如何使避风式可重击?

发布于 2025-01-23 19:24:51 字数 864 浏览 4 评论 0原文

我有一个生产商,该制片人以 avro 格式发送消息,而消费者则倾听这些消息。

我还通过在我的消费者中使用@retryabletopic来处理错误,从而实现了非阻滞试验。

当消费者无法对消息进行估算(由于模式更改或任何原因)时,它不会将该消息放在-Retry-Retry主题中。它将其直接发送到-DLT主题。

我希望deSerializationException也需要重述。原因是,当这些错误被重述时,我可以在消费者中部署修复程序,以最终可以成功进行。

我在@retryabletopic中尝试了includ选项,但似乎对deserializationException不起作用。

  @RetryableTopic(
    attempts = "${app.consumer.retry.topic.count:5}",
    backoff = @Backoff(delayExpression = "${app.consumer.retry.topic.back-off:2000}"),
    fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
    include = {DeserializationException.class}  // does not work
  )

@retryabletopic中的错误吗?还是有其他方法可以实现这一目标?

I have a Producer that sends messages in Avro format and a Consumer which listens to those messages.

I have also implemented non-blocking retries by using @RetryableTopic in my Consumer for handling errors.

When the Consumer is unable to deserialize a message (due to schema change or whatever reasons), it does not put that message in the -retry topic. It directly sends it to the -dlt topic instead.

I want DeserializationExceptions to be retried as well. Reason is that by the time these errors are retried, I can deploy a fix in my Consumer so that the retries could eventually succeed.

I tried the include option in @RetryableTopic but it doesn't seem to work for DeserializationException.

  @RetryableTopic(
    attempts = "${app.consumer.retry.topic.count:5}",
    backoff = @Backoff(delayExpression = "${app.consumer.retry.topic.back-off:2000}"),
    fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
    include = {DeserializationException.class}  // does not work
  )

Is it a bug in @RetryableTopic or is there another way to achieve this?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

雨的味道风的声音 2025-01-30 19:24:51

由于Spring Kafka 2.8.3有一组全局致命异常,如您所述,将直接将记录转发到dlt

处理此类例外的通常模式是,在修复了修复程序之后,具有某种控制台应用程序,可以从dlt中检索失败的记录并重新处理,也许是通过将记录发送回记录到第一个重试的主题,因此主题中没有重复。

对于您描述的模式,您可以通过提供destinationTopicResolver bean,例如:

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(Clock.systemUTC(), applicationContext);
    ddtr.removeClassification(DeserializationException.class);
    return ddtr;
}

请让我知道是否对您有用,可以管理致命异常。谢谢。

Since Spring Kafka 2.8.3 there's a set of global fatal exceptions that, as you described, will cause the record to be forwarded to the DLT directly.

The usual pattern to handle this kind of exception is, after the fix has been deployed, have some kind of console application to retrieve the failed record from the DLT and reprocess it, perhaps by sending the record back to the first retry topic so that there's no duplicate in the main topic.

For the pattern you described, you can manage this global set of FATAL exceptions by providing a DestinationTopicResolver bean, such as:

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(Clock.systemUTC(), applicationContext);
    ddtr.removeClassification(DeserializationException.class);
    return ddtr;
}

Please let me know if that works for you. Thanks.

空名 2025-01-30 19:24:51

这是我们实现的方式:

  @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
  public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
    DefaultDestinationTopicResolver resolver = new DefaultDestinationTopicResolver(systemUTC(), context);
    resolver.setClassifications(emptyMap(), true);
    return resolver;
  }

这样,我们不必指定每个例外就可以一个一个。另一个解决方案是Tomaz的建议:

  @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
  public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(systemUTC(), applicationContext);
    ddtr.removeClassification(DeserializationException.class);
    ddtr.removeClassification(ClassCastException.class);
    return ddtr;
  }

Here is how we have achieved it:

  @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
  public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
    DefaultDestinationTopicResolver resolver = new DefaultDestinationTopicResolver(systemUTC(), context);
    resolver.setClassifications(emptyMap(), true);
    return resolver;
  }

This way we don't have to specify every exception to be included one by one. Another solution is as suggested by Tomaz:

  @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
  public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(systemUTC(), applicationContext);
    ddtr.removeClassification(DeserializationException.class);
    ddtr.removeClassification(ClassCastException.class);
    return ddtr;
  }
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文