MirrorSourceConnector: overriding the consumer key.serializer property
I am trying to run MirrorSourceConnector to mirror a topic from cluster A to cluster B.
After creating the connector and consuming the first message, I noticed that the key and value in the mirrored topic are always serialized as a ByteArray. For the key this is a problem, because I apply transformations that use a custom class.
After checking the MirrorSourceConfig class on GitHub, I found that the source.admin. and target.admin. prefixes should let me add consumer and producer properties. But they do not seem to make any difference (in the logs I can still see that the ByteArray serializer is being used).
My connector config looks like this:
{
  "name": "test-mirror-connector",
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "topics": "test-topic",
  "auto.create.mirror.topics.enable": true,
  "replication.factor": "1",
  "offset-syncs.topic.replication.factor": "1",
  "sync.topic.acls.enabled": "false",
  "source.cluster.alias": "source-cluster-dev",
  "source.cluster.bootstrap.servers": "source-cluster-dev:9092",
  "target.cluster.alias": "target-cluster-dev",
  "target.cluster.bootstrap.servers": "target-cluster-dev:9092",
  "target.cluster.security.protocol": "PLAINTEXT",
  "target.cluster.status.storage.replication.factor": "-1",
  "target.cluster.config.storage.replication.factor": "-1",
  "target.cluster.offset.storage.replication.factor": "-1",
  "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "source.admin.key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
  "source.admin.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
  "target.admin.key.serializer": "org.apache.kafka.common.serialization.StringDeserializer",
  "target.admin.value.serializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}
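A side note on the config itself: the two target.admin.*.serializer entries are given Deserializer classes. Whether or not these overrides are honoured by the connector at all, the mismatch is easy to catch with a small check. The sketch below is only a naming-convention heuristic, and the function name mismatched_serde_properties is made up for illustration; it is not part of Kafka or Kafka Connect.

```python
def mismatched_serde_properties(config):
    """Return property names whose serializer/deserializer class looks swapped.

    Heuristic only: it relies on the usual Kafka naming convention that
    serializer classes end in 'Serializer' and deserializers in 'Deserializer'.
    """
    problems = []
    for prop, value in config.items():
        if not isinstance(value, str):
            continue  # e.g. boolean flags in the connector config
        simple_name = value.rsplit(".", 1)[-1]  # class name without package
        if prop.endswith(".deserializer"):
            if not simple_name.endswith("Deserializer"):
                problems.append(prop)
        elif prop.endswith(".serializer"):
            # 'StringDeserializer' does not end with capital-S 'Serializer'
            if not simple_name.endswith("Serializer"):
                problems.append(prop)
    return sorted(problems)


# The four serde-related entries from the connector config in the question.
config = {
    "source.admin.key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "source.admin.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "target.admin.key.serializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "target.admin.value.serializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
}

print(mismatched_serde_properties(config))
```

For the config above, this flags the two target.admin.*.serializer properties.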
Is there a way to override the consumer and producer serialization/deserialization properties, or any other way to make the mirrored topic exactly the same as the source topic in terms of serialization?
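To make "exactly the same in terms of serialization" concrete, here is a minimal sketch of what a byte-for-byte copy implies. It assumes (this is an assumption, not something confirmed in this post) that the ByteArray serializer seen in the logs means keys and values are passed through as raw bytes. The functions serialize_key, mirror and deserialize_key are hypothetical stand-ins, not Kafka APIs.

```python
def serialize_key(key: str) -> bytes:
    # Stand-in for a string serializer on the source producer (UTF-8 encoding).
    return key.encode("utf-8")


def mirror(record_key: bytes, record_value: bytes):
    # Hypothetical pass-through: bytes are copied, never decoded or re-encoded.
    return bytes(record_key), bytes(record_value)


def deserialize_key(raw: bytes) -> str:
    # Stand-in for the matching string deserializer on a downstream consumer.
    return raw.decode("utf-8")


source_key = serialize_key("order-42")
source_value = b"\x00\x01payload"

mirrored_key, mirrored_value = mirror(source_key, source_value)

# Under the pass-through assumption the mirrored bytes equal the source bytes,
# so the deserializer that works on the source topic also works on the mirror.
assert mirrored_key == source_key
assert mirrored_value == source_value
print(deserialize_key(mirrored_key))
```

Under that assumption, the on-the-wire format in the mirrored topic is already identical to the source, and the typed view of the key would come from the deserializer chosen by whoever reads the mirrored topic.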