Confluent S3 Sink connector for Kafka cannot connect to S3 when using MSK / MSK Connect
I'm attempting to store my Kafka messages in S3 using Amazon MSK / MSK Connect and the Confluent S3 Sink Connector.
I'm experiencing a timeout error when the connector tries to reach S3.
To rule out a permissions issue, I've given the connector role full access to S3, but that hasn't resolved the issue.
The configuration I'm using is as follows:
connector.class=io.confluent.connect.s3.S3SinkConnector
format.class=io.confluent.connect.s3.format.json.JsonFormat
flush.size=3
tasks.max=1
name=kafka-connect-s3
storage.class=io.confluent.connect.s3.storage.S3Storage
topics.regex=.*
s3.bucket.name=######-kafka-messages
[Worker-0bfbcc480ad565df0] (io.confluent.connect.storage.partitioner.PartitionerConfig:361)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:52:24,646] INFO [kafka-connect-s3|task-0] Returning new credentials provider based on the configured credentials provider class (io.confluent.connect.s3.storage.S3Storage:186)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,352] ERROR [kafka-connect-s3|task-0] WorkerSinkTask{id=kafka-connect-s3-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-0bfbcc480ad565df0] org.apache.kafka.connect.errors.ConnectException: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to s3.ap-southeast-2.amazonaws.com:443 [s3.ap-southeast-2.amazonaws.com/52.95.131.12] failed: connect timed out
[Worker-0bfbcc480ad565df0] at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:138)
[Worker-0bfbcc480ad565df0] at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:308)
[Worker-0bfbcc480ad565df0] at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
[Worker-0bfbcc480ad565df0] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-0bfbcc480ad565df0] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-0bfbcc480ad565df0] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-0bfbcc480ad565df0] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-0bfbcc480ad565df0] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-0bfbcc480ad565df0] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-0bfbcc480ad565df0] at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-0bfbcc480ad565df0] Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to s3.ap-southeast-2.amazonaws.com:443 [s3.ap-southeast-2.amazonaws.com/52.95.131.12] failed: connect timed out
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[Worker-0bfbcc480ad565df0] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5445)
[Worker-0bfbcc480ad565df0] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5392)
[Worker-0bfbcc480ad565df0] at com.amazonaws.services.s3.AmazonS3Client.getAcl(AmazonS3Client.java:4050)
[Worker-0bfbcc480ad565df0] at com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1273)
[Worker-0bfbcc480ad565df0] at com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1263)
[Worker-0bfbcc480ad565df0] at com.amazonaws.services.s3.AmazonS3Client.doesBucketExistV2(AmazonS3Client.java:1401)
[Worker-0bfbcc480ad565df0] at io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:197)
[Worker-0bfbcc480ad565df0] at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:112)
[Worker-0bfbcc480ad565df0] ... 9 more
[Worker-0bfbcc480ad565df0] Caused by: org.apache.http.conn.ConnectTimeoutException: Connect to s3.ap-southeast-2.amazonaws.com:443 [s3.ap-southeast-2.amazonaws.com/52.95.131.12] failed: connect timed out
[Worker-0bfbcc480ad565df0] at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:151)
[Worker-0bfbcc480ad565df0] at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
[Worker-0bfbcc480ad565df0] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[Worker-0bfbcc480ad565df0] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[Worker-0bfbcc480ad565df0] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[Worker-0bfbcc480ad565df0] at java.base/java.lang.reflect.Method.invoke(Method.java:566)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.conn.$Proxy47.connect(Unknown Source)
[Worker-0bfbcc480ad565df0] at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
[Worker-0bfbcc480ad565df0] at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
[Worker-0bfbcc480ad565df0] at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
[Worker-0bfbcc480ad565df0] at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
[Worker-0bfbcc480ad565df0] at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
[Worker-0bfbcc480ad565df0] at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1331)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[Worker-0bfbcc480ad565df0] ... 24 more
[Worker-0bfbcc480ad565df0] Caused by: java.net.SocketTimeoutException: connect timed out
[Worker-0bfbcc480ad565df0] at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
[Worker-0bfbcc480ad565df0] at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
[Worker-0bfbcc480ad565df0] at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
[Worker-0bfbcc480ad565df0] at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
[Worker-0bfbcc480ad565df0] at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
[Worker-0bfbcc480ad565df0] at java.base/java.net.Socket.connect(Socket.java:609)
[Worker-0bfbcc480ad565df0] at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368)
[Worker-0bfbcc480ad565df0] at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
[Worker-0bfbcc480ad565df0] at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
[Worker-0bfbcc480ad565df0] ... 40 more
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,354] INFO [kafka-connect-s3|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,354] INFO [kafka-connect-s3|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,354] INFO [kafka-connect-s3|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,355] INFO [kafka-connect-s3|task-0] App info kafka.consumer for connector-consumer-kafka-connect-s3-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
What could be going wrong? The subnets can access the Internet and have an Internet Gateway attached. The Network ACLs are the default settings.
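The innermost cause in the trace above is a `java.net.SocketTimeoutException` raised while opening the socket, which points to a network-level problem rather than an IAM one. A minimal sketch of a probe that attempts the same TCP connection is shown below. The function name `can_connect` and the 5-second timeout are illustrative assumptions, and the result is only meaningful when run from a host in the same subnets and security group as the connector.

```python
import socket


def can_connect(host: str, port: int = 443, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        # create_connection resolves the host name and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connect timeouts, refused connections, and DNS failures alike.
        return False
```

Calling `can_connect("s3.ap-southeast-2.amazonaws.com")` from such a host and getting `False` would be consistent with the connect timeout in the log.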
Comments (1)
You need to create a VPC endpoint for S3 as described in the documentation. MSK Connect workers are not assigned public IP addresses, so an Internet Gateway on the subnet does not by itself give them a route to S3. Then check that the security group you assigned to the connector allows outbound traffic. For example, you can set an outbound rule: "All traffic / All protocols / All ports" > 0.0.0.0/0
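The two steps in this answer could be scripted roughly as follows. This is a sketch under stated assumptions, not the answerer's own procedure: it assumes a gateway-type endpoint, the region `ap-southeast-2` taken from the log, and an EC2 client object such as `boto3.client("ec2")` passed in by the caller. The helper names and the resource IDs in the usage note are hypothetical.

```python
def s3_gateway_endpoint_params(vpc_id, route_table_ids, region):
    """Build keyword arguments for the EC2 create_vpc_endpoint call (gateway endpoint for S3)."""
    return {
        "VpcEndpointType": "Gateway",
        "VpcId": vpc_id,
        "ServiceName": f"com.amazonaws.{region}.s3",
        # Route tables of the subnets the connector runs in.
        "RouteTableIds": list(route_table_ids),
    }


def allow_all_egress_params(security_group_id):
    """Build keyword arguments for authorize_security_group_egress: all traffic to 0.0.0.0/0."""
    return {
        "GroupId": security_group_id,
        "IpPermissions": [
            # IpProtocol "-1" means all protocols and all ports.
            {"IpProtocol": "-1", "IpRanges": [{"CidrIp": "0.0.0.0/0"}]}
        ],
    }


def apply_s3_access(ec2, vpc_id, route_table_ids, security_group_id,
                    region="ap-southeast-2"):
    """Create the S3 endpoint, then open outbound traffic on the connector's security group.

    `ec2` is assumed to be an EC2 client, e.g. boto3.client("ec2").
    If the group already has an identical egress rule (new security groups
    do by default), the second call is rejected as a duplicate by EC2.
    """
    ec2.create_vpc_endpoint(**s3_gateway_endpoint_params(vpc_id, route_table_ids, region))
    ec2.authorize_security_group_egress(**allow_all_egress_params(security_group_id))
```

With boto3 installed and credentials configured, a call might look like `apply_s3_access(boto3.client("ec2"), "vpc-0abc", ["rtb-0abc"], "sg-0abc")`, where the IDs are placeholders for your own resources.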