Spark Structured Streaming - checkpoint metadata grows indefinitely

Posted 2025-01-25 03:31:17

I use Spark Structured Streaming 3.1.2. I need to use S3 for storing checkpoint metadata (I know it's not the optimal storage for checkpoint metadata). The compaction interval is 10 (the default) and I set spark.sql.streaming.minBatchesToRetain=5. After the job had been running for a few weeks, checkpointing time increased significantly (causing a delay of a few minutes in processing). I looked at the checkpoint metadata structure. There is one heavy path there: checkpoint/source/0. A single .compact file weighs 25 GB. I looked into its content and it contains all entries since batch 0 (the current batch is around 25000).
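For context, here is a minimal sketch of the kind of job and settings described above. The input format, schema, paths, and sink are placeholders; the two config keys are the standard spark.sql.streaming.minBatchesToRetain and spark.sql.streaming.fileSource.log.compactInterval:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Sketch only: placeholder paths, schema, and formats.
val spark = SparkSession.builder()
  .appName("file-source-stream")
  // Keep only the last 5 batches of checkpoint metadata (default is 100).
  .config("spark.sql.streaming.minBatchesToRetain", "5")
  // Compaction interval of the file-source metadata log (10 is the default).
  .config("spark.sql.streaming.fileSource.log.compactInterval", "10")
  .getOrCreate()

val schema = new StructType().add("id", LongType).add("value", StringType)

val input = spark.readStream
  .format("json")
  .schema(schema)
  .load("s3a://bucket/input/")

val query = input.writeStream
  .format("parquet")
  .option("path", "s3a://bucket/output/")
  // The metadata discussed in this question lives under this location.
  .option("checkpointLocation", "s3a://bucket/checkpoints/job/")
  .start()
```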

I tried a few parameters to remove already processed data from the compact file, namely (set as sketched after this list):
spark.cleaner.referenceTracking.cleanCheckpoints=true - does not work. As far as I can see in the code, it relates to the previous version of streaming, doesn't it?
spark.sql.streaming.fileSource.log.deletion=true and spark.sql.streaming.fileSink.log.deletion=true don't work either.
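For reference, a sketch of how those flags were set (on the session config; they could equally be passed with --conf). As noted, none of them reduced the size of the source .compact file:

```scala
import org.apache.spark.sql.SparkSession

// The flags tried above; in this case they did not shrink checkpoint/source/0.
val spark = SparkSession.builder()
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .config("spark.sql.streaming.fileSource.log.deletion", "true")
  .config("spark.sql.streaming.fileSink.log.deletion", "true")
  .getOrCreate()
```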

The compact file stores the full history even though all the data (except the most recent checkpoints) has already been processed, so I would expect most of the entries to be removed. Is there any parameter to remove entries from the compact file, or to gracefully remove the compact file from time to time?

Now I am testing a scenario where I stop the job, delete most of the checkpoint/source/0/* files, keeping just a few recent checkpoints (not compacted), and rerun the job. The job recovers correctly from the most recent checkpoint. But when it comes to compaction of the checkpoint, it fails because the recent compact file is missing. I would probably need to edit the recent compact file (instead of deleting it) and keep only a few recent records in it. That looks like a possible workaround for my problem, but this scenario with manual deletion of checkpoint files looks ugly, so I would prefer something managed by Spark.
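Purely as an illustration of that manual workaround (not a recommendation), a sketch that keeps only the newest non-compacted batch files in the source metadata log and deletes everything else while the job is stopped. The directory and the number of files to keep are placeholders, it assumes the directory contains only batch files and .compact files, and, as described above, the next compaction will still fail if the latest .compact file is missing:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustration only: run while the streaming job is stopped.
// Keeps the newest non-compacted batch files and deletes everything else
// (including .compact files), which is the scenario described above.
val sourceLogDir = new Path("s3a://bucket/checkpoints/job/source/0") // placeholder
val keepLast = 5                                                     // placeholder

val fs = FileSystem.get(sourceLogDir.toUri, new Configuration())

// Batch files are named by batch id ("0", "1", ...); compacted ones end with ".compact".
def batchId(name: String): Long = name.stripSuffix(".compact").toLong

val files = fs.listStatus(sourceLogDir).map(_.getPath).sortBy(p => batchId(p.getName))
val keep = files.filter(p => !p.getName.endsWith(".compact")).takeRight(keepLast).toSet

files.filterNot(keep).foreach { p =>
  println(s"deleting $p")
  fs.delete(p, false)
}
```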

Comments (2)

倾城花音 2025-02-01 03:31:17

For posterity: the problem was the FileStreamSourceLog class. I needed to override the shouldRetain method, which by default returns true; its doc says:

Default implementation retains all log entries. Implementations should override the method to change the behavior.
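A rough sketch of the kind of override meant here. FileStreamSourceLog is internal to Spark (org.apache.spark.sql.execution.streaming), so in practice this means patching the class or building a custom Spark; the exact shouldRetain signature varies between Spark versions, and the retention period below is a made-up example:

```scala
// Illustration only: inside a patched FileStreamSourceLog, drop old entries
// at compaction time instead of retaining everything forever.
// The signature is an assumption; check CompactibleFileStreamLog in your Spark version.
override def shouldRetain(entry: FileStreamSource.FileEntry): Boolean = {
  val maxAgeMs = 7L * 24 * 60 * 60 * 1000 // hypothetical 7-day retention
  entry.timestamp >= System.currentTimeMillis() - maxAgeMs
}
```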

人│生佛魔见 2025-02-01 03:31:17

Refer to the Databricks doc. Mainly, .option('retention', retention) solved the very same issue.
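For completeness, a minimal sketch of where that option goes, assuming (as in the Databricks/Spark file-sink documentation) that retention is a sink option giving a TTL after which entries can be dropped from the metadata log at compaction time; paths, schema, and the TTL value are placeholders:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("retention-sketch").getOrCreate()
val schema = new StructType().add("id", LongType).add("value", StringType)

val df = spark.readStream.format("json").schema(schema).load("s3a://bucket/input/")

val query = df.writeStream
  .format("parquet")
  .option("path", "s3a://bucket/output/")
  .option("checkpointLocation", "s3a://bucket/checkpoints/job/")
  .option("retention", "168h") // placeholder TTL for metadata-log entries
  .start()
```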
