Kafka 的 Confluence S3 Sink 连接器无法使用 MSK / MSK Connect 连接到 S3

发布于 2025-01-20 19:37:59 字数 8874 浏览 4 评论 0原文

我正在尝试使用Apache MSK / MSK连接和Confluent S3接收器连接器将KAFKA消息存储到S3。

当连接器试图达到S3时,我会遇到超时错误。

为了排除权限问题,我赋予了连接器角色对S3的完全访问权限,但这还没有解决该问题。

The configuration I'm using is as follows:

connector.class=io.confluent.connect.s3.S3SinkConnector
format.class=io.confluent.connect.s3.format.json.JsonFormat
flush.size=3
tasks.max=1
name=kafka-connect-s3
storage.class=io.confluent.connect.s3.storage.S3Storage
topics.regex=.*
s3.bucket.name=######-kafka-messages
[Worker-0bfbcc480ad565df0]  (io.confluent.connect.storage.partitioner.PartitionerConfig:361)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:52:24,646] INFO [kafka-connect-s3|task-0] Returning new credentials provider based on the configured credentials provider class (io.confluent.connect.s3.storage.S3Storage:186)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,352] ERROR [kafka-connect-s3|task-0] WorkerSinkTask{id=kafka-connect-s3-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-0bfbcc480ad565df0] org.apache.kafka.connect.errors.ConnectException: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to s3.ap-southeast-2.amazonaws.com:443 [s3.ap-southeast-2.amazonaws.com/52.95.131.12] failed: connect timed out
[Worker-0bfbcc480ad565df0]  at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:138)
[Worker-0bfbcc480ad565df0]  at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:308)
[Worker-0bfbcc480ad565df0]  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
[Worker-0bfbcc480ad565df0]  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-0bfbcc480ad565df0]  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-0bfbcc480ad565df0]  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-0bfbcc480ad565df0]  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-0bfbcc480ad565df0]  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-0bfbcc480ad565df0]  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-0bfbcc480ad565df0]  at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-0bfbcc480ad565df0] Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to s3.ap-southeast-2.amazonaws.com:443 [s3.ap-southeast-2.amazonaws.com/52.95.131.12] failed: connect timed out
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5445)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5392)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.getAcl(AmazonS3Client.java:4050)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1273)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1263)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.doesBucketExistV2(AmazonS3Client.java:1401)
[Worker-0bfbcc480ad565df0]  at io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:197)
[Worker-0bfbcc480ad565df0]  at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:112)
[Worker-0bfbcc480ad565df0]  ... 9 more
[Worker-0bfbcc480ad565df0] Caused by: org.apache.http.conn.ConnectTimeoutException: Connect to s3.ap-southeast-2.amazonaws.com:443 [s3.ap-southeast-2.amazonaws.com/52.95.131.12] failed: connect timed out
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:151)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
[Worker-0bfbcc480ad565df0]  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[Worker-0bfbcc480ad565df0]  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[Worker-0bfbcc480ad565df0]  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[Worker-0bfbcc480ad565df0]  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.conn.$Proxy47.connect(Unknown Source)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1331)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[Worker-0bfbcc480ad565df0]  ... 24 more
[Worker-0bfbcc480ad565df0] Caused by: java.net.SocketTimeoutException: connect timed out
[Worker-0bfbcc480ad565df0]  at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.Socket.connect(Socket.java:609)
[Worker-0bfbcc480ad565df0]  at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
[Worker-0bfbcc480ad565df0]  ... 40 more
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,354] INFO [kafka-connect-s3|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,354] INFO [kafka-connect-s3|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,354] INFO [kafka-connect-s3|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,355] INFO [kafka-connect-s3|task-0] App info kafka.consumer for connector-consumer-kafka-connect-s3-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)

What could be going wrong?子网可以访问Internet并附上Internet网关。网络ACL是默认设置。

I'm attempting to store my Kafka messages to S3 using Apache MSK / MSK Connect and the Confluent S3 Sink Connector.

I'm experiencing a timeout error when the connector tries to reach S3.

To rule out a permissions issue, I've given the connector role full access to S3, but that hasn't resolved the issue.

The configuration I'm using is as follows:

connector.class=io.confluent.connect.s3.S3SinkConnector
format.class=io.confluent.connect.s3.format.json.JsonFormat
flush.size=3
tasks.max=1
name=kafka-connect-s3
storage.class=io.confluent.connect.s3.storage.S3Storage
topics.regex=.*
s3.bucket.name=######-kafka-messages
[Worker-0bfbcc480ad565df0]  (io.confluent.connect.storage.partitioner.PartitionerConfig:361)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:52:24,646] INFO [kafka-connect-s3|task-0] Returning new credentials provider based on the configured credentials provider class (io.confluent.connect.s3.storage.S3Storage:186)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,352] ERROR [kafka-connect-s3|task-0] WorkerSinkTask{id=kafka-connect-s3-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-0bfbcc480ad565df0] org.apache.kafka.connect.errors.ConnectException: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to s3.ap-southeast-2.amazonaws.com:443 [s3.ap-southeast-2.amazonaws.com/52.95.131.12] failed: connect timed out
[Worker-0bfbcc480ad565df0]  at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:138)
[Worker-0bfbcc480ad565df0]  at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:308)
[Worker-0bfbcc480ad565df0]  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
[Worker-0bfbcc480ad565df0]  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-0bfbcc480ad565df0]  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-0bfbcc480ad565df0]  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-0bfbcc480ad565df0]  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-0bfbcc480ad565df0]  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-0bfbcc480ad565df0]  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-0bfbcc480ad565df0]  at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-0bfbcc480ad565df0] Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to s3.ap-southeast-2.amazonaws.com:443 [s3.ap-southeast-2.amazonaws.com/52.95.131.12] failed: connect timed out
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5445)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5392)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.getAcl(AmazonS3Client.java:4050)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1273)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1263)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.doesBucketExistV2(AmazonS3Client.java:1401)
[Worker-0bfbcc480ad565df0]  at io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:197)
[Worker-0bfbcc480ad565df0]  at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:112)
[Worker-0bfbcc480ad565df0]  ... 9 more
[Worker-0bfbcc480ad565df0] Caused by: org.apache.http.conn.ConnectTimeoutException: Connect to s3.ap-southeast-2.amazonaws.com:443 [s3.ap-southeast-2.amazonaws.com/52.95.131.12] failed: connect timed out
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:151)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
[Worker-0bfbcc480ad565df0]  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[Worker-0bfbcc480ad565df0]  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[Worker-0bfbcc480ad565df0]  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[Worker-0bfbcc480ad565df0]  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.conn.$Proxy47.connect(Unknown Source)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1331)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[Worker-0bfbcc480ad565df0]  ... 24 more
[Worker-0bfbcc480ad565df0] Caused by: java.net.SocketTimeoutException: connect timed out
[Worker-0bfbcc480ad565df0]  at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.Socket.connect(Socket.java:609)
[Worker-0bfbcc480ad565df0]  at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
[Worker-0bfbcc480ad565df0]  ... 40 more
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,354] INFO [kafka-connect-s3|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,354] INFO [kafka-connect-s3|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,354] INFO [kafka-connect-s3|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,355] INFO [kafka-connect-s3|task-0] App info kafka.consumer for connector-consumer-kafka-connect-s3-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)

What could be going wrong? The subnets can access the Internet and have an Internet Gateway attached. The Network ACLs are the default settings.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

无尽的现实 2025-01-27 19:37:59

您需要按照 中所述为 S3 创建终端节点文档。然后检查您分配给连接器的安全组是否有权外出。例如,您可以设置出站规则:“所有流量/所有协议/所有端口”> 0.0.0.0/0

You need to create an endpoint for S3 as described in the documentation. Then check a secutiry group you assigned to the connector has permission to go outside. For example, you can set an outbound rule: "All trafic/All protocols/All ports" > 0.0.0.0/0

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文