Kafka Connect S3 source connector ignores keys

Published on 2025-02-09 01:11:59

I have the following files in AWS S3, backed up by the Kafka Connect sink connector:

test_yordan_kafka_connect+0+0000023000.json
test_yordan_kafka_connect+0+0000023000.keys.json

When restoring the topic with the Kafka Connect S3 source connector, the key file is ignored; I see the following debug message in the log:

DEBUG [source-s3|task-0] Removing test-bucket/topics/test_yordan_kafka_connect/partition=1/test_yordan_kafka_connect+1+0000000000.keys.json from common prefixes. (io.confluent.connect.s3.source.S3Storage:333)

My source config looks like so:

"connector.class":"io.confluent.connect.s3.source.S3SourceConnector",
"tasks.max":"1",
"s3.region":"eu-central-1",
"s3.bucket.name":"test-bucket",
"topics.dir":"test-bucket/topics",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class":"io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility":"NONE",
"confluent.topic.bootstrap.servers": "blockchain-kafka-kafka-0.blockchain-kafka-kafka-headless.default.svc.cluster.local:9092",
"transforms":"AddPrefix",
"transforms.AddPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex":".*",
"transforms.AddPrefix.replacement":"$0_copy

What should I change so that the key is stored in Kafka along with the message?

Comments (1)

一场春暖 2025-02-16 01:11:59

I don't know how to make sure that .keys.json is taken into account to construct the Kafka keys when reading the data from S3.

One trick I used to achieve the same thing, though, is simply to rely on the ValueToKey SMT: if the key value is also part of the value payload, then injecting it into the key is as simple as adding an instance of the org.apache.kafka.connect.transforms.ValueToKey transformer to the source connector config.
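For illustration, a minimal sketch of what that could look like in the source connector config, chained after the existing AddPrefix transform. The alias "InsertKey" and the field name "id" are only placeholders for whatever field in your value payload holds the key:

"transforms":"AddPrefix,InsertKey",
"transforms.AddPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex":".*",
"transforms.AddPrefix.replacement":"$0_copy",
"transforms.InsertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.InsertKey.fields":"id"

ValueToKey produces a Struct/Map key containing the listed fields; if you want a plain primitive key instead, you could chain an org.apache.kafka.connect.transforms.ExtractField$Key transform (with "field":"id") after it.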

Here is the relevant doc:
https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html

And here is a great blog post about that transformer (part of a series of 12 posts that are equally great):
https://rmoff.net/2020/12/09/twelve-days-of-smt-day-2-valuetokey-and-extractfield/

In my specific case, the built-in behavior of ValueToKey was not doing exactly what I wanted, so I ended up writing my own CustomKeyToValue transformer used by my sink connector to make sure the key was stored in S3, and then a CustomValueToKey on the source connector to rebuild things into Kafka.

Writing such custom transformers is actually pretty easy: it's essentially just 20 lines of code or so inside a single Java file, which you can then package with the kafka-connect-maven-plugin. Here, for example, is the code of the built-in ValueToKey, which can serve as inspiration:

https://github.com/apache/kafka/blob/28f013708ffe8e48e46f408c7f570bf2cd5c54b2/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
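
To give an idea of the shape, here is a rough, untested sketch of what such a CustomKeyToValue could look like for schemaless (Map-based) JSON values; the class name and the "_key" field are purely illustrative, not the exact code I used:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Illustrative sketch only: copies the record key into a "_key" field of a
// schemaless (Map-based) value so that the S3 sink stores it with the payload.
public class CustomKeyToValue<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Assumes schemaless JSON values, i.e. the value is a Map.
        @SuppressWarnings("unchecked")
        Map<String, Object> value = (Map<String, Object>) record.value();
        Map<String, Object> updated = new HashMap<>(value);
        updated.put("_key", record.key());
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), updated,
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no configuration options in this sketch
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

The matching CustomValueToKey on the source side would do the reverse: read the "_key" field back out of the value and pass it as the key when calling newRecord.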

We can apply the same principle to save/retrieve the Kafka headers as well.
