Kafka Connect S3 Source Connector ignores keys
I have the following files in AWS S3, backed up by a Kafka Connect S3 sink connector:
test_yordan_kafka_connect+0+0000023000.json
test_yordan_kafka_connect+0+0000023000.keys.json
When restoring the topic with the Kafka Connect S3 Source, the key files are ignored, and I see the following debug message in the logs:
DEBUG [source-s3|task-0] Removing test-bucket/topics/test_yordan_kafka_connect/partition=1/test_yordan_kafka_connect+1+0000000000.keys.json from common prefixes. (io.confluent.connect.s3.source.S3Storage:333)
My source configuration looks like this:
"connector.class":"io.confluent.connect.s3.source.S3SourceConnector",
"tasks.max":"1",
"s3.region":"eu-central-1",
"s3.bucket.name":"test-bucket",
"topics.dir":"test-bucket/topics",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class":"io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility":"NONE",
"confluent.topic.bootstrap.servers": "blockchain-kafka-kafka-0.blockchain-kafka-kafka-headless.default.svc.cluster.local:9092",
"transforms":"AddPrefix",
"transforms.AddPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex":".*",
"transforms.AddPrefix.replacement":"$0_copy
What should I change so that the keys are stored in Kafka together with the messages?
1 Answer
I don't know how to make sure that the .keys.json files are taken into account to construct the Kafka keys when reading the data from S3. One trick I used to achieve the same thing, though, is simply to rely on the ValueToKey SMT: if the key value is also part of the value payload, then injecting it into the key is as simple as adding an instance of the org.apache.kafka.connect.transforms.ValueToKey transformer in the source connector config. Here is the relevant doc:
https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html
And here is a great blog post about that transformer (part of a series of 12 posts that are equally great):
https://rmoff.net/2020/12/09/twelve-days-of-smt-day-2-valuetokey-and-extractfield/
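
For example, here is a minimal sketch of the relevant lines in the source connector config from the question. The field name id is a hypothetical placeholder for whichever field of the value payload holds the key:

"transforms":"AddPrefix,CreateKey",
"transforms.AddPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex":".*",
"transforms.AddPrefix.replacement":"$0_copy",
"transforms.CreateKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.CreateKey.fields":"id"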
In my specific case, the built-in behavior of ValueToKey was not doing exactly what I wanted, so I ended up writing my own CustomKeyToValue transformer, used by my sink connector to make sure the key was stored in S3, and then a CustomValueToKey on the source connector to rebuild things into Kafka. Writing such custom transformers is actually pretty easy: it's essentially just 20 lines of code or so inside a single Java file, which you can then package with the kafka-connect-maven-plugin. Here is, for example, the code of the built-in ValueToKey that can serve as inspiration:
https://github.com/apache/kafka/blob/28f013708ffe8e48e46f408c7f570bf2cd5c54b2/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
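
For illustration, here is a minimal sketch of what such a custom transformer could look like. This is a hypothetical reconstruction, not the author's actual CustomValueToKey: the package name, the single "field" config, and the Struct-only handling are all assumptions.

// Hypothetical sketch: promotes one field of the value into the record key.
package com.example.transforms; // assumed package name

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class CustomValueToKey<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String FIELD_CONFIG = "field";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Name of the value field to copy into the key");

    private String fieldName;

    @Override
    public void configure(Map<String, ?> configs) {
        fieldName = (String) configs.get(FIELD_CONFIG);
    }

    @Override
    public R apply(R record) {
        // Assumes the value is a Struct: pull out the configured field and
        // promote it to the record key, leaving the value unchanged.
        Struct value = (Struct) record.value();
        Schema keySchema = value.schema().field(fieldName).schema();
        Object key = value.get(fieldName);
        return record.newRecord(record.topic(), record.kafkaPartition(),
                keySchema, key,
                record.valueSchema(), record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }
}

Once packaged and placed on the worker's plugin path, the class is referenced from the connector config via transforms.<name>.type, just like the built-in SMTs.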
We can apply the same principle to save/retrieve the Kafka headers as well.