Apache Flink StreamingFileSink makes multiple HEAD requests when writing to S3, which causes rate limiting

Posted 2025-01-14 15:42:15


I have an Apache Flink application deployed on Kinesis Data Analytics.

This application reads from Kafka and writes to S3. The S3 key structure it writes to is computed using a BucketAssigner. A stripped-down version of the BucketAssigner is here.
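(The linked assigner isn't reproduced in this post; as a rough sketch of what such an assigner looks like, with MyEvent and its getters standing in for whatever fields the real code uses, a BucketAssigner that produces a nested path like the one below might be:)

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class NestedBucketAssigner implements BucketAssigner<MyEvent, String> {

  @Override
  public String getBucketId(MyEvent element, BucketAssigner.Context context) {
    // Produces e.g. "folder1/folder2/folder3", resolved against the sink's base path.
    return element.getFolder1() + "/" + element.getFolder2() + "/" + element.getFolder3();
  }

  @Override
  public SimpleVersionedSerializer<String> getSerializer() {
    return SimpleVersionedStringSerializer.INSTANCE;
  }
}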

The problem I have is, let us say we have to write to this directory structure: s3://myBucket/folder1/folder2/folder3/myFile.json

Before making the PUT request, it makes the following HEAD requests:

  • HEAD /folder1
  • HEAD /folder1/folder2
  • HEAD /folder1/folder2/folder3/

And then it makes the PUT request.

It does this for each and every request, which is causing S3 rate limiting and thereby backpressure in my Flink application.

I found that someone had a similar issue with BucketingSink: https://lists.apache.org/thread/rbp2gdbxwdrk7zmvwhd2bw56mlwokpzz

The solution mentioned there was to switch to StreamingFileSink, which is what I am doing.

Any ideas on how to fix this in StreamingFileSink?

My SinkConfig is as follows:

StreamingFileSink
  .forRowFormat(new Path(s3Bucket), new JsonEncoder<>())
  .withBucketAssigner(bucketAssigner)
  .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(60000)   // roll part files every 60 seconds
                .build())
  .build()

The JsonEncoder takes the object, converts it to JSON, and writes out the bytes like this.
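(The linked encoder isn't shown either; a typical row-format Encoder that serializes each record as one JSON line with Jackson would look roughly like this sketch, not necessarily the exact code from the post:)

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.Encoder;

import java.io.IOException;
import java.io.OutputStream;

public class JsonEncoder<T> implements Encoder<T> {

  // Encoder instances must be serializable, so the ObjectMapper is created lazily on the task.
  private transient ObjectMapper mapper;

  @Override
  public void encode(T element, OutputStream stream) throws IOException {
    if (mapper == null) {
      mapper = new ObjectMapper();
    }
    stream.write(mapper.writeValueAsBytes(element));
    stream.write('\n');
  }
}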

I have described more details about how the whole pipeline works in this question, if that helps in any way: Heavy back pressure and huge checkpoint size


Comments (1)

萧瑟寒风 2025-01-21 15:42:15


The Hadoop S3 file system tries to imitate a filesystem on top of S3. This means that:

  • before writing a key it checks if the "parent directory" exists by checking for a key with the prefix up to the last "/"
  • it creates empty marker files to mark the existence of such a parent directory
  • all these "existence" requests are S3 HEAD requests which are both expensive and start to violate consistent read-after-create visibility

As a result, the Hadoop S3 file system has very high "create file" latency and it hits request rate limits very quickly (HEAD requests have very low request rate limits on S3). As a consequence, it's best to find ways to write to fewer distinct files.
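One way to act on that advice with the config from the question is to roll part files less aggressively, so each bucket accumulates fewer, larger files; a hedged variant of the builder above (the values are arbitrary examples, not recommendations):

DefaultRollingPolicy.builder()
  .withRolloverInterval(15 * 60 * 1000L)    // roll at most every 15 minutes instead of every minute
  .withMaxPartSize(128 * 1024 * 1024L)      // ...or once a part file reaches 128 MB
  .build()

Coarsening the BucketAssigner so that fewer distinct nested prefixes are written per interval works toward the same goal.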

You might also explore using entropy injection. Entropy injection is happening at the file system level, so it should work with the FileSink. Except I'm not sure how it will interact with the partitioning/bucketing being done by the sink, so you may or may not find it useable in practice. If you try it, please report back!
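For reference, and with the same caveat that its interaction with the sink's bucketing is unverified here, entropy injection is switched on through the Flink configuration, and the chosen marker then appears in the base path handed to the sink (the path below is just an example):

// In flink-conf.yaml (assuming your deployment lets you set these options):
//   s3.entropy.key: _entropy_
//   s3.entropy.length: 4
// Flink's S3 filesystems replace the marker with random characters on paths
// where entropy injection applies, and strip it from the path otherwise.
Path basePath = new Path("s3://myBucket/_entropy_/output");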
