Apache Flink-将流写入S3错误-NULL URI主机

发布于 2025-01-28 05:49:52 字数 2181 浏览 1 评论 0原文

我有一个flink数据管道,该管道将下载的日志文件从S3转换,并以Parquet文件格式写回到另一个S3存储桶中。我配置了S3键& flink-conf.yaml中的Secret in另外

s3.access-key: "key"
s3.secret-key: "secret"

复制了flink-s3-fs-hadoop-1.15.0.jar& AWS-JAVA-SDK-1.12.217.JAR to FLINK_HOME/plugins/s3-fs-presto目录。

当作业使用flink Run命令将作业提交到群集时,我得到以下例外情况,

Caused by: java.io.IOException: null uri host.
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:166)
    at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createBucketWriter(FileSink.java:669)
    at org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer(FileSink.java:660)
    at org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
    ... 35 more
Caused by: java.lang.NullPointerException: null uri host.
    at java.base/java.util.Objects.requireNonNull(Objects.java:246)
    at org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:73)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:486)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:246)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:127)
    ... 41 more

将数据写入S3的代码在下面,

public static FileSink<Recc> getFileSink() {
        Item<Recc> item = new Item<>(Recc.class);
        ParquetWriterFactory<Recc> factory = AvroParquetWriters.forReflectRecord(item.getT());
        Path path = new Path("s3a:", "log-backup/", "vvra/logs"); //s3 also tried
        return FileSink.forBulkFormat(
                path,
                factory)
                .build();
    }

我什至尝试了Minio,但我仍然会遇到相同的错误。如何解决这个问题?还必须配置什么?

I have a Flink data pipeline that transforms the log file downloaded from S3 and write back in parquet file format to another S3 bucket. I have configured the S3 key & secret in flink-conf.yaml with the

s3.access-key: "key"
s3.secret-key: "secret"

Additionally copied the flink-s3-fs-hadoop-1.15.0.jar & aws-java-sdk-1.12.217.jar to FLINK_HOME/plugins/s3-fs-presto directory.

When the job is submitted to the cluster using flink run command, I get the below exception

Caused by: java.io.IOException: null uri host.
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:166)
    at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createBucketWriter(FileSink.java:669)
    at org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer(FileSink.java:660)
    at org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
    ... 35 more
Caused by: java.lang.NullPointerException: null uri host.
    at java.base/java.util.Objects.requireNonNull(Objects.java:246)
    at org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:73)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:486)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:246)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:127)
    ... 41 more

The code to write the data to s3 is below

public static FileSink<Recc> getFileSink() {
        Item<Recc> item = new Item<>(Recc.class);
        ParquetWriterFactory<Recc> factory = AvroParquetWriters.forReflectRecord(item.getT());
        Path path = new Path("s3a:", "log-backup/", "vvra/logs"); //s3 also tried
        return FileSink.forBulkFormat(
                path,
                factory)
                .build();
    }

I even tried the minio but still I get the same error. How do resolve this? What more has to be configured?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

一场春暖 2025-02-04 05:49:52

问题是(COLON)。它没有清楚地记录下来

The problem was : (colon). It was not clearly documented

倾`听者〃 2025-02-04 05:49:52

您只需要在方案名称中删除“:”。作为java.net.uri的Javadocs状态:

...如果给出了一个方案,则将其附加到结果,遵循
由结肠角色(':')。 ...

org.apache.flink.core.fs.path依赖于java.net.uri

You just need to remove the colon ":" in the scheme name. As the JavaDocs of java.net.URI state:

... If a scheme is given then it is appended to the result, followed
by a colon character (':'). ...

org.apache.flink.core.fs.Path relies on java.net.URI under the hood.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文