创建作业时使用另一个文件系统配置
总结
我们目前面临 Flink 中文件系统抽象的问题。我们有一个可以动态连接到 S3 源的作业(意味着它是在运行时定义的)。 我们在代码中发现了一个错误,这可能是由于对文件系统工作方式的错误假设造成的。
Bug 解释
在作业初始化期间(因此在作业管理器中),我们操作 FS 来检查某些文件是否存在,以便在执行作业之前优雅地失败。 在我们的例子中,我们需要动态设置 FS。它可以是 HDFS、AWS 上的 S3 或 MinIO 上的 S3。 我们希望 FS 配置特定于作业,并且与集群配置不同(不同的访问密钥、不同的端点等)。
以下是我们用于执行此操作的代码摘录:
private void validateFileSystemAccess(Configuration configuration) throws IOException {
// Create a plugin manager from the configuration
PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
// Init the FileSystem from the configuration
FileSystem.initialize(configuration, pluginManager);
// Validate the FileSystem: an exception is thrown if FS configuration is wrong
Path archiverPath = new Path(this.archiverPath);
archiverPath.getFileSystem().exists(new Path("/"));
}
开始该特定类型的作业后,我们注意到:
- 检查点不适用于该作业,它会引发凭据错误。
- 作业管理器无法上传历史服务器所需的所有已运行的所有作业(不仅是这种特定类型的作业)所需的工件。
如果我们不部署此类作业,则工件的上传和检查点将按预期在集群上工作。
我们认为这个问题可能来自于覆盖所有文件系统配置的FileSystem.initialize()
。我们认为,正因为如此,下一次调用 FileSystem.get()
将返回我们在 validateFileSystemAccess
中配置的文件系统,而不是集群配置的文件系统。
问题
我们的假设正确吗?如果是这样,我们如何在不影响整个集群的情况下为文件系统提供特定的配置?
Summary
We are currently facing an issue with the FileSystem abstraction in Flink. We have a job that can dynamically connect to an S3 source (meaning it's defined at runtime).
We discovered a bug in our code, and it could be due to a wrong assumption on the way the FileSystem works.
Bug explanation
During the initialization of the job, (so in the job manager) we manipulate the FS to check that some files exist in order to fail gracefully before the job is executed.
In our case, we need to set dynamically the FS. It can be either HDFS, S3 on AWS or S3 on MinIO.
We want the FS configuration to be specific for the job, and different from the cluster one (different access key, different endpoint, etc.).
Here is an extract of the code we are using to do so:
private void validateFileSystemAccess(Configuration configuration) throws IOException {
// Create a plugin manager from the configuration
PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
// Init the FileSystem from the configuration
FileSystem.initialize(configuration, pluginManager);
// Validate the FileSystem: an exception is thrown if FS configuration is wrong
Path archiverPath = new Path(this.archiverPath);
archiverPath.getFileSystem().exists(new Path("/"));
}
After starting that specific kind of job, we notice that:
- the checkpointing does not work for this job, it throws a credential error.
- the job manager cannot upload the artifacts needed by the history server for all jobs already running of all kind (not only this specific kind of job).
If we do not deploy that kind of job, the upload of artifacts and the checkpointing work as expected on the cluster.
We think that this issue might come from the FileSystem.initialize()
that overrides the configuration for all the FileSystems. We think that because of this, the next call to FileSystem.get()
returns the FileSystem we configured in validateFileSystemAccess
instead of the cluster configured one.
Questions
Could our hypothesis be correct? If so, how could we provide a specific configuration for the FileSystem without impacting the whole cluster?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论