Spark可以使用一个小收藏

发布于 2025-01-30 13:32:12 字数 951 浏览 1 评论 0原文

我对Spark,其驱动程序和OOM问题有问题。

目前,我有一个数据框架,该数据框架正在使用多个加入的来源(实际上是镶木式格式的不同表),并且有成千上万个单元。他们的日期代表了记录的创建日期,显然是其中的日期。

我做以下操作:

from pyspark.sql.functions import year, month

# ...

selectionRows = inputDataframe.select(year('registration_date').alias('year'), month('registration_date').alias('month')).distinct()
selectionRows.show() # correctly shows 8 tuples
selectionRows = selectionRows.collect() # goes heap space OoM
print(selectionRows)

阅读内存消耗统计信息表明,驱动程序不超过约60%。我认为驱动程序应仅加载独特的子集,而不是整个数据框架。

我想念什么吗?是否可以以更聪明的方式收集这几行?我需要它们作为推高谓词来加载辅助数据框架。

非常感谢!

strong

阅读评论并详细说明我的个人需求后,我在每个“加入/详细”步骤中都捕获了数据

  • 编辑/解决方案< /
  • 应用缓存转换
  • 打印计数以跟踪基数(主要是用于跟踪 /调试目的),因此应用所有转换 +缓存
  • 不受欢迎的先前的浮雕步骤的缓存(如果可用)(tick / tock paradigm),

这降低了一些复杂的ETL作业降至原始时间的20%(如前所述,它在每个计数上都应用了上一步的转换)。

经验教训:)

I've got a problem with Spark, its driver and an OoM issue.

Currently I have a dataframe which is being built with several, joined sources (actually different tables in parquet format), and there are thousands of tuples. They have a date which represents the date of creation of the record, and distinctly they are a few.

I do the following:

from pyspark.sql.functions import year, month

# ...

selectionRows = inputDataframe.select(year('registration_date').alias('year'), month('registration_date').alias('month')).distinct()
selectionRows.show() # correctly shows 8 tuples
selectionRows = selectionRows.collect() # goes heap space OoM
print(selectionRows)

Reading the memory consumption statistics shows that the driver does not exceed ~60%. I thought that the driver should load only the distinct subset, not the entire dataframe.

Am I missing something? Is it possible to collect those few rows in a smarter way? I need them as a pushdown predicate to load a secondary dataframe.

Thank you very much!

EDIT / SOLUTION

After reading the comments and elaborating my personal needs, I cached the dataframe at every "join/elaborate" step, so that in a timeline I do the following:

  • Join with loaded table
  • Queue required transformations
  • Apply the cache transformation
  • Print the count to keep track of cardinality (mainly for tracking / debugging purposes) and thus apply all transformations + cache
  • Unpersist the cache of the previous sibiling step, if available (tick/tock paradigm)

This reduced some complex ETL jobs down to 20% of the original time (as previously it was applying the transformations of each previous step at each count).

Lesson learned :)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

榕城若虚 2025-02-06 13:32:12

阅读评论后,我详细介绍了用例。

如问题所述,我在“目标数据框架”中连接了几个表,在每次迭代中,我都会进行一些转换,例如:

# n-th table work
target = target.join(other, how='left')
target = target.filter(...)
target = target.withColumn('a', 'b')
target = target.select(...)
print(f'Target after table "other": {target.count()}')

慢速 / OOM的问题是,Spark被迫从由于结束<代码>计数< / code>,在每个表上开始完成,在每个表 /迭代时都会慢慢慢。

我发现的解决方案是在每次迭代中缓存数据框,就像这样:

cache: DataFrame = null

# ...

# n-th table work
target = target.join(other, how='left')
target = target.filter(...)
target = target.withColumn('a', 'b')
target = target.select(...)

target = target.cache()
target_count = target.count() # actually do the cache
if cache:
  cache.unpersist() # free the memory from the old cache version
cache = target

print(f'Target after table "other": {target_count}')

After reading the comments, I elaborated the solution for my use case.

As mentioned in the question, I join several tables one with each other in a "target dataframe", and at each iteration I do some transformations, like so:

# n-th table work
target = target.join(other, how='left')
target = target.filter(...)
target = target.withColumn('a', 'b')
target = target.select(...)
print(f'Target after table "other": {target.count()}')

The problem of slowliness / OoM was that Spark was forced to do all the transformations from start to finish at each table due to the ending count, making it slower and slower at each table / iteration.

The solution I found is to cache the dataframe at each iteration, like so:

cache: DataFrame = null

# ...

# n-th table work
target = target.join(other, how='left')
target = target.filter(...)
target = target.withColumn('a', 'b')
target = target.select(...)

target = target.cache()
target_count = target.count() # actually do the cache
if cache:
  cache.unpersist() # free the memory from the old cache version
cache = target

print(f'Target after table "other": {target_count}')
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文