Spark collect on a small collection
I've got a problem with Spark, its driver and an OoM issue.
Currently I have a dataframe that is built by joining several sources (actually different tables in Parquet format), and it contains thousands of tuples. Each tuple has a date representing when the record was created, and there are only a few distinct dates.
I do the following:
from pyspark.sql.functions import year, month
# ...
selectionRows = inputDataframe.select(year('registration_date').alias('year'), month('registration_date').alias('month')).distinct()
selectionRows.show() # correctly shows 8 tuples
selectionRows = selectionRows.collect() # goes heap space OoM
print(selectionRows)
Reading the memory consumption statistics shows that the driver does not exceed ~60% of its memory. I thought the driver should only have to load the distinct subset, not the entire dataframe.
Am I missing something? Is it possible to collect those few rows in a smarter way? I need them as a pushdown predicate to load a secondary dataframe.
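For reference, this is roughly how I intend to use those rows once collected; the secondary table path and its year/month columns below are placeholders, not my real schema:

from pyspark.sql.functions import col, lit

# Build one OR-of-(year, month) predicate from the handful of collected rows.
pairs = [(row['year'], row['month']) for row in selectionRows]
predicate = lit(False)
for y, m in pairs:
    predicate = predicate | ((col('year') == y) & (col('month') == m))

# 'spark' is the active SparkSession; assuming the secondary table is partitioned
# by year/month, this filter lets Spark prune the partitions it has to read.
secondaryDataframe = spark.read.parquet('secondary_table_path').filter(predicate)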
Thank you very much!
EDIT / SOLUTION
After reading the comments and thinking through my actual needs, I cached the dataframe at every "join/elaborate" step, so that in a timeline I do the following:
- Join with loaded table
- Queue required transformations
- Apply the cache transformation
- Print the count to keep track of cardinality (mainly for tracking/debugging purposes); this forces all queued transformations to run and fills the cache
- Unpersist the cache of the previous sibling step, if available (tick/tock paradigm); a sketch of one such step follows this list
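A minimal sketch of a single step, assuming a hypothetical join key 'key' and a simple filter as a stand-in for the real transformations:

from pyspark.sql.functions import col

def elaborate_step(previous_df, loaded_table):
    step_df = previous_df.join(loaded_table, on='key', how='left')  # join with the loaded table
    step_df = step_df.filter(col('key').isNotNull())                # queue the required transformations
    step_df = step_df.cache()                                       # apply the cache transformation
    print(step_df.count())                                          # count() runs the plan and fills the cache
    previous_df.unpersist()                                         # drop the previous sibling's cache (no-op if it was not cached)
    return step_df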
This brought some complex ETL jobs down to 20% of their original time (previously, each count was re-applying the transformations of every preceding step).
Lesson learned :)
1 Answer
After reading the comments, I worked out the solution for my use case.
As mentioned in the question, I join several tables one after another into a "target dataframe", and at each iteration I apply some transformations.
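Roughly, each iteration looked like the following sketch (the paths, the join key 'key' and the filter are placeholders for my real tables and transformations):

from pyspark.sql.functions import col

target_df = spark.read.parquet('/data/base_table')
for path in ['/data/table_a', '/data/table_b', '/data/table_c']:
    loaded_table = spark.read.parquet(path)
    target_df = target_df.join(loaded_table, on='key', how='left')
    target_df = target_df.filter(col('key').isNotNull())  # the step's transformations
    # This count re-executes every join/transformation since the very first table,
    # because nothing is cached yet.
    print(target_df.count())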
The problem of slowness / OoM was that Spark was forced to re-run all the transformations from start to finish for every table, due to the ending count(), making it slower and slower at each table/iteration.
The solution I found is to cache the dataframe at each iteration.
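Schematically, with the same placeholder names as above, each iteration now caches its result, materializes it with count(), and then releases the previous iteration's cache:

from pyspark.sql.functions import col

target_df = spark.read.parquet('/data/base_table').cache()
for path in ['/data/table_a', '/data/table_b', '/data/table_c']:
    loaded_table = spark.read.parquet(path)
    step_df = target_df.join(loaded_table, on='key', how='left')
    step_df = step_df.filter(col('key').isNotNull())  # the step's transformations
    step_df = step_df.cache()
    print(step_df.count())   # computes only this step on top of the cached previous result
    target_df.unpersist()    # the previous step's cache is no longer needed
    target_df = step_df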