MirrorSourCeconnector:覆盖消费者钥匙。Serializer属性

发布于 2025-02-03 09:46:13 字数 1702 浏览 3 评论 0原文

我正在尝试从集群A到集群B中的主题运行MirrorSourCeconnector。 创建连接器并消耗第一条消息后,我注意到镜像主题密钥和值总是被序列化为ytearray。在键的情况下,使用自定义类进行转换时,这是一个问题。

在Github中检查了MirrorSourCeConfig课程后,我发现了使用source.admin。和target。但是似乎没有任何不同(在日志中,我仍然可以看到正在使用Bytearray序列化器)。

我的连接器配置看起来像是这样:

{"target.cluster.status.storage.replication.factor": "-1",
 "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
 "auto.create.mirror.topics.enable": true,
 "offset-syncs.topic.replication.factor": "1",
 "replication.factor": "1",
 "sync.topic.acls.enabled": "false",
 "topics": "test-topic",
 "target.cluster.config.storage.replication.factor": "-1",
 "source.cluster.alias": "source-cluster-dev",
 "source.cluster.bootstrap.servers": "source-cluster-dev:9092",
 "target.cluster.offset.storage.replication.factor": "-1",
 "target.cluster.alias": "target-cluster-dev",
 "target.cluster.security.protocol": "PLAINTEXT",
 "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "name": "test-mirror-connector",
 "source.admin.key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
 "source.admin.value.deserializer":"org.apache.kafka.common.serialization.ByteArrayDeserializer",
 "target.admin.key.serializer": "org.apache.kafka.common.serialization.StringDeserializer",
 "target.admin.value.serializer":"org.apache.kafka.common.serialization.ByteArrayDeserializer",
 "target.cluster.bootstrap.servers": "target-cluster-dev:9092"}

是否有一种方法可以覆盖消费者和生产者Ser/De-Serialization属性或其他任何使镜像主题与源主题完全相同的方法?在静脉化的意义上。

I am trying to run MirrorSourceConnector from a Topic in cluster A to cluster B.
After creating the connector and consuming first message I noticed that mirrored topic key and value is always serialized as a ByteArray. Which in case of a key is a bit of a problem when doing the transformations with a custom class.

After checking MirrorSourceConfig class in github I found out that with source.admin. and target.admin I could basically add consumer and producer properties. But seems it does not make any different (in logs I could still see that ByteArray serializer is being used).

My connector config looks like that:

{"target.cluster.status.storage.replication.factor": "-1",
 "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
 "auto.create.mirror.topics.enable": true,
 "offset-syncs.topic.replication.factor": "1",
 "replication.factor": "1",
 "sync.topic.acls.enabled": "false",
 "topics": "test-topic",
 "target.cluster.config.storage.replication.factor": "-1",
 "source.cluster.alias": "source-cluster-dev",
 "source.cluster.bootstrap.servers": "source-cluster-dev:9092",
 "target.cluster.offset.storage.replication.factor": "-1",
 "target.cluster.alias": "target-cluster-dev",
 "target.cluster.security.protocol": "PLAINTEXT",
 "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "name": "test-mirror-connector",
 "source.admin.key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
 "source.admin.value.deserializer":"org.apache.kafka.common.serialization.ByteArrayDeserializer",
 "target.admin.key.serializer": "org.apache.kafka.common.serialization.StringDeserializer",
 "target.admin.value.serializer":"org.apache.kafka.common.serialization.ByteArrayDeserializer",
 "target.cluster.bootstrap.servers": "target-cluster-dev:9092"}

Is there a way to override Consumer and Producer Ser/De-serialization properties or any other way to make mirror topic to be exactly the same as a source topic? In the meaning of seralization.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文