Apache Spark writer partitionBy causes OOM
A dataset of Parquet files with a total size of more than 700 GB is available. The Parquet files consist of 2 columns, each containing a JSON document.
I would now like to convert these Parquet files and save them with partitions: read, transform and save. At the end there is a new folder with partitions and the corresponding Parquet files. So much for the idea.
Reading the source data is done with spark.read.parquet("/my/folder/**/*.parquet").
The transformation is done on this dataframe with some JSON helper methods. After it completes, a dataframe with several columns is available. Besides a date (YYYY-MM-DD) there are other columns, and the original data is also still available.
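The helper methods themselves are not shown here; purely for illustration, such a transformation could look roughly like the sketch below (the schema, the input column name json_doc and the derived date column are placeholder assumptions, not the actual helper code):

import org.apache.spark.sql.functions.{col, from_json, to_date}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

// Hypothetical schema -- the real JSON fields are not shown in the question.
val jsonSchema = StructType(Seq(
  StructField("startTime", TimestampType),
  StructField("uuid", StringType),
  StructField("payload", StringType)
))

val archiveDF = spark.read
  .parquet("/my/folder/**/*.parquet")
  .withColumn("parsed", from_json(col("json_doc"), jsonSchema)) // "json_doc" is an assumed column name
  .select(col("json_doc"), col("parsed.*"))                     // the original data stays available
  .withColumn("date", to_date(col("startTime")))                // the YYYY-MM-DD column used for partitioning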
For writing, I execute a repartitionByRange("date", "col1", "col2"), a sortWithinPartitions("date", "col1") and a write.partitionBy("date").
My small Spark cluster (6 workers, each with 4 cores and 2 GB RAM) is now busy for a few hours. When writing, however, there is always an OOM. My driver (spark-shell) is equipped with 24 GB RAM and the machine does not offer more.
The files can be processed well individually; my problem seems to be the amount of data. My guess: merging the partial results from the workers leads to an OOM on the driver. I have also experimented with the maxRecordsPerFile option, unfortunately without success.
What other possibilities are there to avoid the OOM?
archiveDF
.repartitionByRange($"xxxx", $"startTime", $"uuid") // !!! causes oom !!!
.sortWithinPartitions("xxxx","startTime")
.write
.mode("append")
.option("maxRecordsPerFile", 50000)
.partitionBy("xxxx")
.format("parquet")
.save("/long-term-archive/data-store")
When you use repartitionByRange (in Spark 3.2.1 anyway) without supplying a number of wanted partitions, Spark uses spark.sql.shuffle.partitions (which is 200 by default) as the number of partitions you end up with. If you divide your total data by the number of partitions, you get 700 GB / 200 partitions = 3.5 GB per partition. That is very large (partitions of roughly 100 MB are typically a good idea), and your executors have only 0.5 GB RAM per core. So in your case you could try 7000 partitions and see if that gives a better result.
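Something like the following, reusing the write from the question and only adding an explicit target partition count (7000 is just the rough estimate from above, not a tuned value):

archiveDF
  .repartitionByRange(7000, $"xxxx", $"startTime", $"uuid") // explicit number of range partitions
  .sortWithinPartitions("xxxx", "startTime")
  .write
  .mode("append")
  .option("maxRecordsPerFile", 50000)
  .partitionBy("xxxx")
  .format("parquet")
  .save("/long-term-archive/data-store")

Alternatively, if you keep repartitionByRange without an explicit count, raising spark.sql.shuffle.partitions (e.g. spark.conf.set("spark.sql.shuffle.partitions", 7000)) should have the same effect on the width of that shuffle.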
I'm assuming there is no huge data skew on those repartitioning keys. If there is a large skew, you will have to salt your keys.
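A minimal sketch of the salting idea, assuming the skew is on the partition column and using an arbitrary salt width of 16 (tune it to the actual skew):

import org.apache.spark.sql.functions.{concat_ws, floor, rand}

// Spread each skewed key over 16 artificial sub-keys so no single range partition gets too big.
val saltedDF = archiveDF
  .withColumn("salt", floor(rand() * 16).cast("string"))
  .withColumn("saltedKey", concat_ws("_", $"xxxx", $"salt"))

saltedDF
  .repartitionByRange(7000, $"saltedKey", $"startTime") // shuffle on the salted key
  .drop("salt", "saltedKey")                            // the salt is only needed for the shuffle
  .sortWithinPartitions("xxxx", "startTime")
  .write
  .mode("append")
  .option("maxRecordsPerFile", 50000)
  .partitionBy("xxxx")                                  // the written layout still uses the original column
  .format("parquet")
  .save("/long-term-archive/data-store")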
Another question: what is the use of repartitioning again when writing? Repartitioning requires a shuffle, which is typically one of the more expensive operations there is. You should aim to repartition as few times as possible.
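If the range repartition is only there to control how many files land in each date partition, a simpler variant worth trying is to skip it and let partitionBy together with maxRecordsPerFile split the output. This avoids the expensive shuffle, at the cost of potentially more and smaller files per date partition:

archiveDF
  .write
  .mode("append")
  .option("maxRecordsPerFile", 50000)
  .partitionBy("xxxx")
  .format("parquet")
  .save("/long-term-archive/data-store")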