Can I use a single S3 sink connector for topics with different Avro schemas, pointing at the same timestamp field name?

Posted 2025-02-12 17:16:19

Schema for topic t1

{
  "type": "record",
  "name": "Envelope",
  "namespace": "t1",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
            },
            {
              "name": "createdAt",
              "type": [
                "null",
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.name": "io.debezium.time.ZonedTimestamp"
                }
              ],
              "default": null
            }
          ],
          "connect.name": "t1.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    }
  ],
  "connect.name": "t1.Envelope"
}

Schema for topic t2

{
    "type": "record",
    "name": "Value",
    "namespace": "t2",
    "fields": [
          {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
          },
          {
            "name": "createdAt",
            "type": [
                "null",
                {
                    "type": "string",
                    "connect.version": 1,
                    "connect.name": "io.debezium.time.ZonedTimestamp"
                }
            ],
            "default": null
          }
    ],
    "connect.name": "t2.Value"
}

S3 sink connector configuration

connector.class=io.confluent.connect.s3.S3SinkConnector
behavior.on.null.values=ignore
s3.region=us-west-2
partition.duration.ms=1000
flush.size=1
tasks.max=3
timezone=UTC
topics.regex=t1|t2
aws.secret.access.key=******
locale=US
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=false
name=s3-sink-connector
aws.access.key.id=******
errors.tolerance=all
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=s3-sink-connector-bucket
path.format=YYYY/MM/dd
timestamp.extractor=RecordField
timestamp.field=after.createdAt

With this connector configuration, the sink fails for topic t2 with the error "createdAt field does not exist".
If I set timestamp.field=createdAt instead, the same error is thrown for topic t1.
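
The two schemas produce differently shaped record values, which is why no single timestamp.field path can match both topics. Hypothetical payloads for illustration (ids and timestamps invented):

t1 value (Debezium envelope, createdAt nested under after):
{"before": null, "after": {"id": 1, "createdAt": "2025-02-12T17:16:19Z"}}

t2 value (flat record, createdAt at the top level):
{"id": 1, "createdAt": "2025-02-12T17:16:19Z"}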

How can I point at the "createdAt" field in both schemas at the same time, using the same connector for both topics?

Is it possible to achieve this with a single S3 sink connector configuration?

If this is possible, how can I do it, and which properties do I have to use?

If anybody has an idea about this, please weigh in.
If there is another way to do this, please suggest that as well.

Comments (1)

嘿嘿嘿 2025-02-19 17:16:19

All topics consumed by a single connector will need the same timestamp field; there is no way to configure a per-topic field mapping.

Your t2 schema doesn't have an after field, so you need to run two separate connectors, as sketched below.
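
A minimal sketch of the split, assuming both connectors reuse everything not shown from the configuration above (the connector names here are illustrative):

# Connector 1: t1 carries the Debezium envelope, so the timestamp is nested
name=s3-sink-connector-t1
topics=t1
timestamp.extractor=RecordField
timestamp.field=after.createdAt

# Connector 2: t2 is a flat record, so the timestamp is at the top level
name=s3-sink-connector-t2
topics=t2
timestamp.extractor=RecordField
timestamp.field=createdAt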

The field is also required to be present in all records, otherwise the partitioner won't work.
