Kafka topic Avro key not populated from Cassandra source connector
I have a Kafka topic with the following Avro key schema:
{
  "type": "record",
  "name": "onboardingStatusKey",
  "namespace": "onboarding",
  "fields": [
    {
      "name": "time_created",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "local_enterprise_id",
      "type": "string"
    },
    {
      "name": "source_request_reference",
      "type": "string"
    },
    {
      "name": "onboarding_request_id",
      "type": "string"
    },
    {
      "name": "system_code",
      "type": "string"
    }
  ],
  "version": "1"
}
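(As an aside: per the Avro specification, a logicalType annotation belongs inside the type definition, not at the field level, where parsers silently ignore it. A correctly annotated time_created field would look like this:)

{
  "name": "time_created",
  "type": {
    "type": "long",
    "logicalType": "timestamp-millis"
  }
}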
I also have a table within Cassandra that has these fields (plus a few others). I'm looking for a Source Connector that will move the data from the Cassandra table to my Kafka topic.
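For context, the table is assumed to look roughly like the sketch below; the actual DDL was not shown, so the table name, the extra status column, and the primary key layout are illustrative only.

CREATE TABLE {{Keyspace}}.onboarding_status (
    onboarding_request_id text,
    time_created timestamp,
    local_enterprise_id text,
    source_request_reference text,
    system_code text,
    status text,                -- stands in for the "few other" columns
    PRIMARY KEY (onboarding_request_id, time_created)
);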
My Source Connector config is:
{
  "name": "cassandra-source-{{TableName}}-3",
  "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
    "tasks.max": "2",
    "connect.cassandra.contact.points": "{{CassandraContactPoints}}",
    "connect.cassandra.port": "{{CassandraContactPort}}",
    "connect.cassandra.ssl.enabled": "true",
    "connect.cassandra.trust.store.path": "/trusted-clusters.jks",
    "connect.cassandra.trust.store.password": "{{TruststorePWD}}",
    "connect.cassandra.trust.store.type": "JKS",
    "connect.cassandra.username": "{{CassandraUsername}}",
    "connect.cassandra.password": "{{CassandraPassword}}",
    "connect.cassandra.key.space": "{{Keyspace}}",
    "connect.cassandra.import.mode": "incremental",
    "connect.cassandra.kcql": "INSERT INTO {{TopicName}} SELECT * FROM {{TableName}} PK time_created INCREMENTALMODE=TIMESTAMP WITHKEY(time_created, local_enterprise_id, source_request_reference, onboarding_request_id, system_code)",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://{{SchemaUsername}}:{{SchemaPassword}}@kafka-schema.{{SchemaURL}}.cnodes.io:8085",
    "key.converter.basic.auth.credentials.source": "URL",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://{{SchemaUsername}}:{{SchemaPassword}}@kafka-schema.{{SchemaURL}}.cnodes.io:8085",
    "value.converter.basic.auth.credentials.source": "URL",
    "errors.tolerance": "all",
    "errors.log.include.messages": true,
    "errors.log.enable": true
  }
}
The rows from the table are being sent to the topic, but the key is missing from each record (only the value section appears).
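(Observed, for example, by consuming with key printing enabled; a sketch, where {{BrokerList}} is a placeholder and the registry URL mirrors the converter settings above:)

kafka-avro-console-consumer \
  --bootstrap-server {{BrokerList}} \
  --topic {{TopicName}} \
  --from-beginning \
  --property schema.registry.url=https://{{SchemaUsername}}:{{SchemaPassword}}@kafka-schema.{{SchemaURL}}.cnodes.io:8085 \
  --property basic.auth.credentials.source=URL \
  --property print.key=true

Each message prints as key, a tab, then value, so a null key shows up immediately.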
What am I missing?
1 Answer
You seem to be missing the WITHKEY part of your query, which selects one or more fields to write as the record key. Keep in mind that source connectors create their own Avro schemas, and will not use the one you've pre-defined, outside of compatibility checks (if enabled).
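If the KCQL keying still doesn't produce a key, a generic Connect-level fallback is the ValueToKey single message transform, which copies named fields from the value into the record key. A sketch of the lines to add to the connector's "config" block, with the field list taken from the key schema above:

"transforms": "createKey",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "time_created,local_enterprise_id,source_request_reference,onboarding_request_id,system_code"

Note that the key this produces is a Struct assembled by Connect, so the schema the AvroConverter registers for it is still connector-generated rather than your pre-defined onboardingStatusKey, in line with the compatibility caveat above.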