How to extract a nested field from an envelope-type schema in an S3 sink
Avro schema:
{
  "type": "record",
  "name": "Envelope",
  "namespace": "test",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
            },
            {
              "name": "createdAt",
              "type": [
                "null",
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.name": "io.debezium.time.ZonedTimestamp"
                }
              ],
              "default": null
            }
          ],
          "connect.name": "test.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    }
  ],
  "connect.name": "test.Envelope"
}
S3 sink connector config:
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "behavior.on.null.values": "ignore",
  "s3.region": "us-west-2",
  "flush.size": "1",
  "tasks.max": "3",
  "timezone": "UTC",
  "locale": "US",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "aws.access.key.id": "---",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "s3.bucket.name": "test-s3-sink-created-at-partition",
  "partition.duration.ms": "1000",
  "topics.regex": "test_topic",
  "aws.secret.access.key": "---",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "value.converter.schemas.enable": "false",
  "name": "s3-sink-created-at-partition",
  "errors.tolerance": "all",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "path.format": "YYYY/MM/dd",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "createdAt"
}
Error:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: The field 'createdAt' does not exist in
Caused by: org.apache.kafka.connect.errors.DataException: Unable to find nested field 'createdAt'
Problem:
Currently, I am trying to use the sink connector above to pull data from the test topic into an S3 bucket, partitioned by the createdAt field, but it keeps throwing errors for the createdAt field, and no S3 objects are created with the above configuration. Please share your suggestions.
Answer:
You should be able to use

after.Value.createdAt

- see my PR. But the better option is to unwrap the envelope, like you're asking.

You also need to create the bucket ahead of time.
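
For reference, here is a minimal sketch of both options, written as fragments to merge into the sink connector config above. The transform alias "unwrap" is an arbitrary name chosen for illustration, and the second option assumes the Debezium transforms JAR is on the Connect worker's plugin path.

Option 1, point the timestamp extractor at the nested field:

  "timestamp.extractor": "RecordField",
  "timestamp.field": "after.Value.createdAt"

Option 2, unwrap the Debezium envelope with the ExtractNewRecordState SMT so createdAt becomes a top-level field:

  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "createdAt"

With the envelope unwrapped, the record value written to S3 is the row itself rather than the before/after wrapper, which also simplifies downstream consumers.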