Spark Structured Streaming - checkpoint metadata grows indefinitely
I use Spark Structured Streaming 3.1.2. I need to use S3 for storing checkpoint metadata (I know it's not the optimal storage for checkpoint metadata). The compaction interval is 10 (the default) and I set spark.sql.streaming.minBatchesToRetain=5. After the job had been running for a few weeks, checkpointing time increased significantly (causing a few minutes' delay in processing). I looked at the checkpoint metadata structure. There is one heavy path there: checkpoint/source/0. A single .compact file weighs 25 GB. I looked into its content and it contains all entries since batch 0 (the current batch is around 25000).
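For context, a minimal sketch of the kind of job layout described (bucket names, formats, and app name are placeholders, not from the original job):

    import org.apache.spark.sql.SparkSession

    // Minimal streaming job with checkpoint metadata kept on S3, as described above.
    object CheckpointedJob {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("checkpointed-job") // placeholder
          // Keep metadata for the 5 most recent batches (as in the question);
          // the compaction interval is left at its default of 10.
          .config("spark.sql.streaming.minBatchesToRetain", "5")
          .getOrCreate()

        spark.readStream
          .format("text")
          .load("s3a://my-bucket/input/")
          .writeStream
          .format("parquet")
          // The heavy path from the question lives under this location.
          .option("checkpointLocation", "s3a://my-bucket/checkpoint/")
          .option("path", "s3a://my-bucket/output/")
          .start()
          .awaitTermination()
      }
    }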
I tried a few parameters to remove already-processed data from the compact file (set as in the sketch below), namely:
spark.cleaner.referenceTracking.cleanCheckpoints=true - does not work. As far as I can see in the code, it relates to the previous version of streaming, doesn't it?
spark.sql.streaming.fileSource.log.deletion=true and spark.sql.streaming.fileSink.log.deletion=true - do not work either.
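For completeness, this is roughly how those settings would be applied on the session (a sketch; the app name is a placeholder):

    import org.apache.spark.sql.SparkSession

    // Sketch of the three cleanup-related settings tried in the question.
    // None of them trimmed the source compact file, as described above.
    val spark = SparkSession.builder()
      .appName("checkpoint-cleanup-attempts") // placeholder
      // DStream-era cleaner flag; it targets RDD checkpoint files, not
      // Structured Streaming's compact metadata logs.
      .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
      // File source/sink metadata log deletion flags.
      .config("spark.sql.streaming.fileSource.log.deletion", "true")
      .config("spark.sql.streaming.fileSink.log.deletion", "true")
      .getOrCreate()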
The compact file stores the full history even though all the data has been processed (except for the most recent checkpoints), so I would expect most of the entries to be deleted. Is there any parameter to remove entries from the compact file, or to gracefully remove the compact file from time to time?
Now I am testing a scenario where I stop the job, delete most of the checkpoint/source/0/* files, keeping just a few recent checkpoints (not compacted), and rerun the job. The job recovers correctly from the recent checkpoint. But when it comes to compaction of the checkpoint, it fails because the recent compact file is missing. I would probably need to edit the recent compact file (instead of deleting it) and keep only a few recent records in it. That looks like a possible workaround for my problem, but this scenario with manual deletion of checkpoint files looks ugly, so I would prefer something managed by Spark.
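The cleanup step itself can be scripted against the Hadoop FileSystem API (so it also works on S3); below is a rough sketch of what I mean, with the path and the keep-last-N threshold as assumptions:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Sketch of the manual trim described above: with the job stopped, keep
    // only the most recent batch files under checkpoint/source/0 and delete
    // the rest. (As noted, a later compaction still fails because the recent
    // .compact file is gone.)
    object TrimSourceLog {
      def main(args: Array[String]): Unit = {
        val dir = "s3a://my-bucket/checkpoint/source/0" // placeholder path
        val keepLastN = 5

        val fs = FileSystem.get(new URI(dir), new Configuration())
        val files = fs.listStatus(new Path(dir))
          .filter(_.isFile)
          // Batch files are named by batch id, e.g. "24999" or "24989.compact".
          .sortBy(_.getPath.getName.stripSuffix(".compact").toLong)

        files.dropRight(keepLastN).foreach(f => fs.delete(f.getPath, false))
      }
    }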
2 Answers
For posterity: the problem was the FileStreamSourceLog class. I needed to override the shouldRetain method, which by default returns true (as its doc says).
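FileStreamSourceLog is an internal (private[sql]) class, so overriding it means compiling your own subclass into Spark's own package. The sketch below is a rough illustration based on the Spark 3.1 sources; the constructor and method signatures are assumptions to verify against your exact version:

    // Must be compiled into this package: FileStreamSourceLog is private[sql].
    package org.apache.spark.sql.execution.streaming

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry

    // Hypothetical subclass that retains only recent entries instead of the
    // full history. Signatures follow the Spark 3.1 sources as I recall them
    // and may differ between versions.
    class TrimmedFileStreamSourceLog(
        metadataLogVersion: Int,
        sparkSession: SparkSession,
        path: String)
      extends FileStreamSourceLog(metadataLogVersion, sparkSession, path) {

      // Assumed cutoff: drop entries older than 7 days at compaction time.
      private val retentionMs = 7L * 24 * 60 * 60 * 1000

      // The default implementation in CompactibleFileStreamLog returns true
      // for every entry, which is why the .compact file keeps batch 0 onward.
      override def shouldRetain(log: FileEntry, currentTime: Long): Boolean =
        log.timestamp >= currentTime - retentionMs
    }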
Refer to the Databricks doc. Mainly, .option('retention', retention) solved the very same issue.
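If the retention option is what applies in your case, usage would look roughly like this (a sketch: the "7d" value, and whether your Spark version honors the option on the file sink, are assumptions to check against the doc):

    import org.apache.spark.sql.SparkSession

    // Sketch: passing a retention period to the file sink so that old entries
    // become eligible for removal when the metadata log is compacted.
    object RetentionDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("retention-demo").getOrCreate()

        spark.readStream
          .format("text")
          .load("s3a://my-bucket/input/")
          .writeStream
          .format("parquet")
          .option("checkpointLocation", "s3a://my-bucket/checkpoint/")
          .option("path", "s3a://my-bucket/output/")
          // Assumed 7-day retention window for sink metadata entries.
          .option("retention", "7d")
          .start()
          .awaitTermination()
      }
    }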