春季kafka |如何使避风式可重击?
我有一个生产商,该制片人以 avro 格式发送消息,而消费者则倾听这些消息。
我还通过在我的消费者中使用@retryabletopic
来处理错误,从而实现了非阻滞试验。
当消费者无法对消息进行估算(由于模式更改或任何原因)时,它不会将该消息放在-Retry-Retry
主题中。它将其直接发送到-DLT
主题。
我希望deSerializationException
也需要重述。原因是,当这些错误被重述时,我可以在消费者中部署修复程序,以最终可以成功进行。
我在@retryabletopic
中尝试了includ
选项,但似乎对deserializationException
不起作用。
@RetryableTopic(
attempts = "${app.consumer.retry.topic.count:5}",
backoff = @Backoff(delayExpression = "${app.consumer.retry.topic.back-off:2000}"),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
include = {DeserializationException.class} // does not work
)
是@retryabletopic
中的错误吗?还是有其他方法可以实现这一目标?
I have a Producer that sends messages in Avro format and a Consumer which listens to those messages.
I have also implemented non-blocking retries by using @RetryableTopic
in my Consumer for handling errors.
When the Consumer is unable to deserialize a message (due to schema change or whatever reasons), it does not put that message in the -retry
topic. It directly sends it to the -dlt
topic instead.
I want DeserializationException
s to be retried as well. Reason is that by the time these errors are retried, I can deploy a fix in my Consumer so that the retries could eventually succeed.
I tried the include
option in @RetryableTopic
but it doesn't seem to work for DeserializationException
.
@RetryableTopic(
attempts = "${app.consumer.retry.topic.count:5}",
backoff = @Backoff(delayExpression = "${app.consumer.retry.topic.back-off:2000}"),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
include = {DeserializationException.class} // does not work
)
Is it a bug in @RetryableTopic
or is there another way to achieve this?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
由于
Spring Kafka 2.8.3
有一组全局致命异常,如您所述,将直接将记录转发到dlt
。处理此类例外的通常模式是,在修复了修复程序之后,具有某种控制台应用程序,可以从
dlt
中检索失败的记录并重新处理,也许是通过将记录发送回记录到第一个重试的主题,因此主题中没有重复。对于您描述的模式,您可以通过提供
destinationTopicResolver
bean,例如:请让我知道是否对您有用,可以管理
致命
异常。谢谢。Since
Spring Kafka 2.8.3
there's a set of global fatal exceptions that, as you described, will cause the record to be forwarded to theDLT
directly.The usual pattern to handle this kind of exception is, after the fix has been deployed, have some kind of console application to retrieve the failed record from the
DLT
and reprocess it, perhaps by sending the record back to the first retry topic so that there's no duplicate in the main topic.For the pattern you described, you can manage this global set of
FATAL
exceptions by providing aDestinationTopicResolver
bean, such as:Please let me know if that works for you. Thanks.
这是我们实现的方式:
这样,我们不必指定每个例外就可以一个一个。另一个解决方案是Tomaz的建议:
Here is how we have achieved it:
This way we don't have to specify every exception to be included one by one. Another solution is as suggested by Tomaz: