What kind of error do I need to raise so that my message ends up in the DLQ?
I have what is likely an embarrassing question, but I'm a bit lost.
I've read this article, which I think is quite comprehensive about how Kafka Connect can handle DLQs:
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
I set it up on my connector:
errors.tolerance: all
errors.log.enable: true
errors.log.include.messages: false
errors.deadletterqueue.topic.name: com.kafkaconnect.jdbc.dlq
errors.deadletterqueue.topic.replication.factor: 1
errors.deadletterqueue.context.headers.enable: true
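For completeness, those error settings sit alongside the rest of the connector config, which looks roughly like this (the connector class, topic, and converter class below are placeholders, not my real values):
connector.class: io.confluent.connect.jdbc.JdbcSinkConnector
topics: my.input.topic
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.example.MyProtobufConverter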
However, I'm experiencing two things:
First, my DLQ is always empty.
Second, if there is an error in one of the topics I am processing, everything simply stops and the connector just goes into a failed state.
I first want to ask about my assumption:
My assumption is that the messages that can be sent to the DLQ are those that fail either ValueConverter or KeyConverter processing. That would be my expectation.
Second, what kind of error can be thrown during ValueConverter processing so that the task/connector keeps processing the other messages, while the failing message goes into the DLQ?
At the moment, my code for failures in the ValueConverter looks like this, and it basically makes the connector blow up instead of sending the message to the DLQ:
public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
    try {
        Object deserialized = this.deserializer.deserialize(topic, value);
        if (deserialized == null) {
            // Null payload (e.g. a tombstone): pass it through as a null value.
            return SchemaAndValue.NULL;
        } else if (deserialized instanceof Message) {
            // Happy path: convert the Protobuf message into Connect's data model.
            Message message = (Message) deserialized;
            return this.protobufData.toConnectData(message.getDescriptorForType(), message, topic);
        }
        throw new DataException(String.format(
                "Unsupported type returned during deserialization of topic %s", topic));
    } catch (SerializationException e) {
        // Wrap the deserialization failure; this is what takes the connector down.
        throw new DataException(String.format(
                "Failed to deserialize data for topic %s to Protobuf", topic), e);
    }
}
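In case it clarifies what the converter does with a bad payload, here is a rough sketch of how that DataException can be triggered outside of Connect (MyProtobufConverter, the topic name, and the payload bytes are placeholders, and the configure() call is omitted):

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.connect.errors.DataException;

public class ConverterFailureSketch {
    public static void main(String[] args) {
        // Placeholder for the converter class shown above; configure(...) omitted for brevity.
        MyProtobufConverter converter = new MyProtobufConverter();

        // Bytes that are deliberately not a valid Protobuf payload.
        byte[] garbage = new byte[] {0x00, 0x01, 0x02};

        try {
            converter.toConnectData("my.input.topic", new RecordHeaders(), garbage);
        } catch (DataException e) {
            // This is the exception that currently takes the whole connector down
            // instead of the record ending up in the DLQ.
            System.out.println("Converter threw: " + e.getMessage());
        }
    }
}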
I know this is not a small, reproducible scenario of the problem, but honestly I don't know how to create one easily. I'm more wondering, for people with experience using Kafka Connect connectors: how do you normally implement this so that messages that fail deserialization end up in the DLQ?
Thank you for any feedback.