Can I use a single S3 sink connector for different topics with differently structured Avro schemas, pointing at the same field name for the timestamp field?
schema for topic t1
{
"type": "record",
"name": "Envelope",
"namespace": "t1",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
{
"name": "id",
"type": {
"type": "long",
"connect.default": 0
},
"default": 0
},
{
"name": "createdAt",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
}
],
"connect.name": "t1.Value"
}
],
"default": null
},
{
"name": "after",
"type": [
"null",
"Value"
],
"default": null
}
],
"connect.name": "t1.Envelope"
}
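For illustration, a message value conforming to this envelope schema nests the timestamp under the "after" record (the values below are hypothetical):

{
  "before": null,
  "after": {
    "id": 1,
    "createdAt": "2023-01-15T10:30:00Z"
  }
}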
schema for topic t2
{
"type": "record",
"name": "Value",
"namespace": "t2",
"fields": [
{
"name": "id",
"type": {
"type": "long",
"connect.default": 0
},
"default": 0
},
{
"name": "createdAt",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
}
],
"connect.name": "t2.Value"
}
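By contrast, a value for the t2 schema carries "createdAt" at the top level (again with hypothetical values):

{
  "id": 1,
  "createdAt": "2023-01-15T10:30:00Z"
}

This difference in nesting is why a single timestamp.field path cannot describe both topics.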
s3-sink connector configuration
connector.class=io.confluent.connect.s3.S3SinkConnector
behavior.on.null.values=ignore
s3.region=us-west-2
partition.duration.ms=1000
flush.size=1
tasks.max=3
timezone=UTC
topics.regex=t1|t2
aws.secret.access.key=******
locale=US
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=false
name=s3-sink-connector
aws.access.key.id=******
errors.tolerance=all
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=s3-sink-connector-bucket
path.format=YYYY/MM/dd
timestamp.extractor=RecordField
timestamp.field=after.createdAt
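For reference, with timestamp.extractor=RecordField the TimeBasedPartitioner derives each object's S3 prefix from the extracted timestamp, so under the default topics.dir a t1 record stamped (hypothetically) 2023-01-15 would be written to a key such as:

s3-sink-connector-bucket/topics/t1/2023/01/15/t1+0+0000000000.json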
With this connector configuration I get the error "createdAt field does not exist" for the t2 topic.
If I set timestamp.field=createdAt instead, the same error is thrown for the t1 topic.
How can I point at the "createdAt" field in both schemas at the same time, using the same connector for both topics?
Is it possible to achieve this with a single S3 sink connector configuration?
If this scenario is possible, how can I do it, and which properties do I have to use to achieve it?
If anybody has an idea about this, please advise. If there is any other way to do this, please suggest that as well.
Answer:
All topics will need the same timestamp field; there is no way to configure per-topic field mappings.
Your t2 schema doesn't have an "after" field, so you need to run two separate connectors (a sketch follows below). The field is also required to be present in all records, otherwise the partitioner won't work.
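A minimal sketch of that two-connector split, assuming all other properties stay as in the single-connector configuration above (connector names are hypothetical; only the differing properties are shown):

# Connector 1: consumes t1, where createdAt is nested inside the Debezium envelope
name=s3-sink-connector-t1
topics=t1
timestamp.extractor=RecordField
timestamp.field=after.createdAt

# Connector 2: consumes t2, where createdAt is a top-level field
name=s3-sink-connector-t2
topics=t2
timestamp.extractor=RecordField
timestamp.field=createdAt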