AWS MSK Kafka Connect S3 sink connector



We are using the S3 Sink connector to sink data from our AWS MSK cluster into an S3 bucket.

We have deployed the Kafka S3 Sink connector on AWS EKS (Kubernetes).

When we start the connector, we get the errors below while the multipart upload to the S3 bucket is happening.

We have a policy restriction on the S3 bucket because server-side encryption (AWS KMS) is enabled, i.e. we cannot upload without a KMS key.
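
For context, the restriction is enforced by a bucket policy along the lines of the sketch below, which denies any PutObject request (including the initiation of a multipart upload) that does not specify SSE-KMS. This is an illustrative example only; the bucket name and exact conditions in our actual policy may differ.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DenyUnencryptedObjectUploads",
            "Effect": "Deny",
            "Principal": "*",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::s3-bucket-name/*",
            "Condition": {
                "StringNotEquals": {
                    "s3:x-amz-server-side-encryption": "aws:kms"
                }
            }
        }
    ]
}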

Below is the configuration we are using for the connector, along with the error details, for your reference.

Kindly help.

{
    "name": "kc-s3-nuoAccountIDs-sink",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "test-topic",
    "tasks.max": "1",
    "s3.bucket.name": "s3-bucket-name",
    "value.converter.schemas.enable": "false",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "behavior.on.null.values": "ignore",
    "schema.compatibility": "NONE",
    "partition.duration.ms": 3600000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en-US",
    "timezone": "UTC",
    "partition.field.name": "accountplatform",
    "s3.region": "eu-west-2",
    "flush.size": 100000
}


kTask.execute(WorkerSinkTask.java:201)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Unable to initiate MultipartUpload
  at io.confluent.connect.s3.storage.S3OutputStream.newMultipartUpload(S3OutputStream.java:230)
  at io.confluent.connect.s3.storage.S3OutputStream.uploadPart(S3OutputStream.java:139)
  at io.confluent.connect.s3.storage.S3OutputStream.commit(S3OutputStream.java:165)
  at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.commit(AvroRecordWriterProvider.java:102)
  at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.commit(KeyValueHeaderRecordWriterProvider.java:126)
at 


不疑不惑不回忆 2025-02-19 01:30:06


In our case, we needed to pass the KMS key settings to the S3 connector.

Based on the official documentation, along with the settings mentioned above, we used the following two additional settings in the S3 connector configuration:

"s3.sse.kms.key.id": "<kms-key-id-here>",
"s3.ssea.name": "aws:kms"

We are now able to get data in our S3 bucket.
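
For reference, here is roughly how the resulting connector configuration looks with those two properties added. Only the SSE-related subset is shown; the remaining properties stay as in the configuration above, and the KMS key ID is a placeholder.

{
    "name": "kc-s3-nuoAccountIDs-sink",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.bucket.name": "s3-bucket-name",
    "s3.region": "eu-west-2",
    "s3.ssea.name": "aws:kms",
    "s3.sse.kms.key.id": "<kms-key-id-here>"
}

Note that the IAM role the connector runs under must also be allowed to use that KMS key (for example kms:GenerateDataKey and kms:Decrypt); otherwise the multipart upload is still rejected.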
