Apache Flink StreamingFileSink makes multiple HEAD requests when writing to S3, causing rate limiting
I have an Apache Flink application that I have deployed on Kinesis Data Analytics.
This application reads from Kafka and writes to S3. The S3 bucket structure it writes to is computed using a BucketAssigner. A stripped-down version of the BucketAssigner is here.
The problem I have is this: let us say we have to write to the directory structure s3://myBucket/folder1/folder2/folder3/myFile.json
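To make the sink's role concrete, here is a minimal sketch of the kind of computation the BucketAssigner performs: the real class implements Flink's BucketAssigner interface, whose getBucketId(element, context) returns the sub-path under the sink's base path. The folder values here are the placeholders from the example path, not real fields:

```java
// Hypothetical stand-in for a BucketAssigner's getBucketId() logic.
// The real implementation would derive these components from the element.
public class BucketPath {
    static String bucketId(String folder1, String folder2, String folder3) {
        return folder1 + "/" + folder2 + "/" + folder3;
    }

    public static void main(String[] args) {
        // Combined with the base path s3://myBucket, this yields the
        // directory structure from the example above.
        System.out.println(bucketId("folder1", "folder2", "folder3"));
    }
}
```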
Before making the PUT request, it makes the following HEAD requests:
HEAD /folder1
HEAD /folder1/folder2
HEAD /folder1/folder2/folder3/
And then it makes the PUT request.
It does this for each and every request, which causes S3 rate limiting and thereby backpressure in my Flink application.
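Some back-of-the-envelope arithmetic shows why the extra HEAD requests, not the PUTs, are what hit the limit. The write rate below is an invented example; the 3-HEADs-per-PUT pattern is what was observed above, and the per-prefix limits (~3500 PUT/s, ~5500 GET/HEAD/s) are S3's documented request-rate guidelines:

```java
// Request amplification estimate. filesPerSecond is a hypothetical write
// rate; pathDepth is the number of path components probed with HEAD.
public class RequestAmplification {
    static int headRequestsPerSecond(int filesPerSecond, int pathDepth) {
        return filesPerSecond * pathDepth; // one HEAD per path component
    }

    public static void main(String[] args) {
        int filesPerSecond = 2000; // hypothetical
        int heads = headRequestsPerSecond(filesPerSecond, 3);
        // 6000 HEAD/s exceeds S3's ~5500 GET/HEAD per second per prefix,
        // even though 2000 PUT/s would fit under the ~3500 PUT/s limit.
        System.out.println(heads + " HEAD/s, " + filesPerSecond + " PUT/s");
    }
}
```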
I found that someone had a similar issue with BucketingSink: https://lists.apache.org/thread/rbp2gdbxwdrk7zmvwhd2bw56mlwokpzz
The solution mentioned there was to switch to StreamingFileSink, which is what I am doing.
Any ideas on how to fix this in StreamingFileSink?
My SinkConfig is as follows:
StreamingFileSink
    .forRowFormat(new Path(s3Bucket), new JsonEncoder<>())
    .withBucketAssigner(bucketAssigner)
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withRolloverInterval(60000) // roll part files every 60 s (value in ms)
        .build())
    .build()
The JsonEncoder takes the object, converts it to JSON, and writes out the bytes like this.
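For readers unfamiliar with row-format sinks, this is the shape of the work such an encoder does: serialize one element per line into the part file's OutputStream. The real JsonEncoder implements Flink's org.apache.flink.api.common.serialization.Encoder<T> and presumably uses a JSON library; the hand-written JSON string below is only for illustration:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Sketch of newline-delimited JSON encoding, one record per line.
public class JsonLineSketch {
    static void encode(String json, OutputStream out) throws IOException {
        out.write(json.getBytes(StandardCharsets.UTF_8));
        out.write('\n');
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        encode("{\"id\":1}", out);
        encode("{\"id\":2}", out);
        System.out.print(out.toString(StandardCharsets.UTF_8));
    }
}
```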
I have described more details about how the whole pipeline works in this question, if that helps in any way: Heavy back pressure and huge checkpoint size
Comments (1)
The Hadoop S3 file system tries to imitate a real filesystem on top of S3. This means that before it creates a file, it checks whether each parent "directory" exists, which is where the HEAD requests you are seeing come from.
As a result, the Hadoop S3 file system has very high "create file" latency and it hits request rate limits very quickly (HEAD requests have very low request rate limits on S3). As a consequence, it's best to find ways to write to fewer distinct files.
You might also explore using entropy injection. Entropy injection happens at the file system level, so it should work with the FileSink. However, I'm not sure how it will interact with the partitioning/bucketing done by the sink, so you may or may not find it usable in practice. If you try it, please report back!
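For reference, entropy injection is configured on the S3 filesystem via Flink's `s3.entropy.*` options (documented primarily for checkpoint paths); the key string and length below are example values:

```yaml
# flink-conf.yaml
s3.entropy.key: _entropy_   # marker substring to be replaced
s3.entropy.length: 4        # number of random characters injected
```

Paths that contain the marker (e.g. a base path like `s3://myBucket/_entropy_/output`) get it replaced with random characters, spreading objects across S3 prefixes and hence across separate request-rate limits; paths without the marker are left unchanged.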