Kafka topic Avro key not populated by Cassandra source connector

Posted 2025-01-09 12:28:39

I have a Kafka Topic that has the following Avro Key Schema:

{
  "type": "record",
  "name": "onboardingStatusKey",
  "namespace": "onboarding",
  "fields": [
    {
      "name": "time_created",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "local_enterprise_id",
      "type": "string"
    },
    {
      "name": "source_request_reference",
      "type": "string"
    },
    {
      "name": "onboarding_request_id",
      "type": "string"
    },
    {
      "name": "system_code",
      "type": "string"
    }
  ],
  "version": "1"
}

I also have a table within Cassandra that has these fields (plus a few others). I'm looking for a Source Connector that will move the data from the Cassandra table to my Kafka topic.

My Source Connector config is:

{
    "name": "cassandra-source-{{TableName}}-3",
    "config": {
        "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
        "tasks.max": "2",
        "connect.cassandra.contact.points": "{{CassandraContactPoints}}",
        "connect.cassandra.port": "{{CassandraContactPort}}",
        "connect.cassandra.ssl.enabled": "true",
        "connect.cassandra.trust.store.path": "/trusted-clusters.jks",
        "connect.cassandra.trust.store.password": "{{TruststorePWD}}",
        "connect.cassandra.trust.store.type": "JKS",
        "connect.cassandra.username": "{{CassandraUsername}}",
        "connect.cassandra.password": "{{CassandraPassword}}",
        "connect.cassandra.key.space": "{{Keyspace}}",

        "connect.cassandra.import.mode": "incremental",
        "connect.cassandra.kcql": "INSERT INTO {{TopicName}} SELECT * FROM {{TableName}} PK time_created INCREMENTALMODE=TIMESTAMP WITHKEY(time_created, local_enterprise_id, source_request_reference, onboarding_request_id, system_code)",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "https://{{SchemaUsername}}:{{SchemaPassword}}@kafka-schema.{{SchemaURL}}.cnodes.io:8085",
        "key.converter.basic.auth.credentials.source": "URL",

        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "https://{{SchemaUsername}}:{{SchemaPassword}}@kafka-schema.{{SchemaURL}}.cnodes.io:8085",
        "value.converter.basic.auth.credentials.source": "URL",

        "errors.tolerance": "all",
        "errors.log.include.messages": true,
        "errors.log.enable": true
    }
}

The rows from the table are being sent to the topic, but the key is missing from each record (only the value section appears).
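For reference, the null keys can be observed directly with the Avro console consumer; the broker address, topic name, and schema-registry URL below are placeholders:

# Print keys alongside values; a record with no key shows up as "null".
kafka-avro-console-consumer \
  --bootstrap-server broker:9092 \
  --topic my-topic \
  --from-beginning \
  --property print.key=true \
  --property key.separator=' | ' \
  --property schema.registry.url=https://schema-registry:8085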

What am I missing?

1 Answer

Answered by 聊慰, 2025-01-16 12:28:39

Disclaimer: I have no experience with this connector

You seem to be missing a WITHKEY clause in your query, which selects one or more fields to write as the record key.

Keep in mind that source connectors create their own Avro schemas, and will not use the one you've pre-defined, outside of compatibility checks (if enabled).
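If the connector itself won't emit a key, one commonly used alternative is Kafka Connect's built-in ValueToKey single message transform, which copies named fields from the record value into a Struct key. A sketch of the extra entries for the connector config, using the field names from your schema (this assumes those fields are present in the value, which they should be since the KCQL selects *):

"transforms": "createKey",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "time_created,local_enterprise_id,source_request_reference,onboarding_request_id,system_code"

Note the resulting key is a Struct built by the transform, so its registered Avro schema may differ in name and namespace from the one you pre-registered.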
