Apache Flink - Error writing stream to S3 - null uri host
I have a Flink data pipeline that transforms log files downloaded from S3 and writes them back in Parquet format to another S3 bucket. I have configured the S3 key and secret in flink-conf.yaml with:
s3.access-key: "key"
s3.secret-key: "secret"
Additionally, I copied flink-s3-fs-hadoop-1.15.0.jar and aws-java-sdk-1.12.217.jar to the FLINK_HOME/plugins/s3-fs-presto directory.
When the job is submitted to the cluster with the flink run command, I get the exception below:
Caused by: java.io.IOException: null uri host.
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:166)
at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createBucketWriter(FileSink.java:669)
at org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer(FileSink.java:660)
at org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
... 35 more
Caused by: java.lang.NullPointerException: null uri host.
at java.base/java.util.Objects.requireNonNull(Objects.java:246)
at org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:73)
at org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:486)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:246)
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:127)
... 41 more
The code that writes the data to S3 is below:
public static FileSink<Recc> getFileSink() {
    Item<Recc> item = new Item<>(Recc.class);
    ParquetWriterFactory<Recc> factory = AvroParquetWriters.forReflectRecord(item.getT());
    Path path = new Path("s3a:", "log-backup/", "vvra/logs"); // "s3:" was also tried
    return FileSink.forBulkFormat(path, factory).build();
}
I even tried MinIO, but I still get the same error. How do I resolve this? What else has to be configured?
Comments (2)
The problem was the ":" (colon) in the scheme name. It was not clearly documented.

You just need to remove the colon ":" from the scheme name, i.e. pass "s3a" rather than "s3a:". As the JavaDocs of java.net.URI describe, a URI has the form [scheme:]scheme-specific-part[#fragment], so the colon is a separator and not part of the scheme itself. org.apache.flink.core.fs.Path relies on java.net.URI under the hood.
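To see why the colon produces "null uri host", here is a minimal sketch that uses only java.net.URI. It assumes, as the answer says, that Flink's Path builds its URI from scheme, authority and path, here via the five-argument URI constructor; the class S3PathCheck and the helper build are hypothetical names for illustration, not Flink API.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class S3PathCheck {

    // Hypothetical helper: builds a URI from scheme, authority and path with the
    // five-argument java.net.URI constructor (query and fragment left null).
    static URI build(String scheme, String authority, String path) {
        try {
            return new URI(scheme, authority, path, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // With "s3a:" the constructor appends its own ':' and emits
        // "s3a:://log-backup/vvra/logs". The parser reads "s3a" as the scheme, then
        // sees ':' instead of '/', so the rest becomes an opaque part with no
        // authority: getHost() returns null.
        URI broken = build("s3a:", "log-backup", "/vvra/logs");
        System.out.println(broken + " opaque=" + broken.isOpaque() + " host=" + broken.getHost());

        // Without the colon the URI is hierarchical and the bucket name is the host.
        URI fixed = build("s3a", "log-backup", "/vvra/logs");
        System.out.println(fixed + " opaque=" + fixed.isOpaque() + " host=" + fixed.getHost());
    }
}
```

Running it prints `s3a:://log-backup/vvra/logs opaque=true host=null` followed by `s3a://log-backup/vvra/logs opaque=false host=log-backup`. A null host is exactly what S3xLoginHelper.buildFSURI rejects in the stack trace above. Passing the whole location as a single string, e.g. new Path("s3a://log-backup/vvra/logs"), avoids splitting scheme and authority by hand.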