I'm currently working on a Spark migration project that aims to migrate all Spark SQL pipelines to Spark 3.x and take advantage of all its performance improvements. My company is using Spark 2.4.0, but we are targeting 3.1.1 officially for all Spark SQL data pipelines, though without AQE enabled yet. The primary goal is to keep everything the same while using the newest version. Later on, we can easily enable AQE for all data pipelines.
For a specific case, right after the Spark version change, we faced the following error:
org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
We investigated this issue and, looking at the Spark UI logs, noticed a change in the query plan as follows:
Spark 2.4.0:

Spark 2.4.0 is using the default SortMergeJoin to do the join operation between tbl_a and tbl_b, but when we look at the query plan from the new Spark 3.1.1:

We can notice that instead of SortMergeJoin it is using BroadcastHashJoin to do the join between tbl_a and tbl_b. Not only this, but if I'm not wrong, the BroadcastExchange operation is occurring on the big table side, which seems strange from my perspective.
As additional information, we have the following properties regarding the execution of both jobs:
- spark.sql.autoBroadcastJoinThreshold = 10MB
- spark.sql.adaptive.enabled = false # AQE is disabled
- spark.sql.shuffle.partitions = 200
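For reference, these settings can be pinned in spark-defaults.conf (or passed as --conf flags to spark-submit) so both versions run under identical configuration; a sketch of the relevant fragment:

```
spark.sql.autoBroadcastJoinThreshold   10MB
spark.sql.adaptive.enabled             false
spark.sql.shuffle.partitions           200
```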
and other non-relevant properties.
Do you guys have any clue on why this is happening? My questions are:
- Why has Spark 3 changed the join approach in this situation, given that AQE is disabled and spark.sql.autoBroadcastJoinThreshold is much smaller than the dataset size?
- Is this the expected behavior, or could this represent a potential bug in Spark 3.x?
Please, let me know your thoughts. I appreciate all the help in advance.
UPDATE - 2022-07-27
After digging into the Spark code for some days and debugging it, I was able to understand what is happening. Basically, the retrieved statistics are the problem. Apparently, Spark 3 gets the statistics from a Hive table attribute called rawDataSize. If this isn't defined, then it looks for the totalSize table property, as we can see in the following source code:
https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala#L69
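The fallback described above can be sketched in plain Python (a simplification for illustration, not Spark's actual implementation; the function names and the huge default are illustrative):

```python
# Sketch of the statistics fallback: prefer rawDataSize when it is defined
# and positive, otherwise fall back to totalSize, otherwise a configured
# default (Spark uses a very large default so unknown tables are never broadcast).
def effective_table_size(raw_data_size, total_size, default_size=2**63 - 1):
    if raw_data_size is not None and raw_data_size > 0:
        return raw_data_size
    if total_size is not None and total_size > 0:
        return total_size
    return default_size

def planner_would_broadcast(size_in_bytes, threshold=10 * 1024 * 1024):
    # Broadcast hash join is considered when the estimated size fits the threshold.
    return size_in_bytes <= threshold

# A tiny, stale rawDataSize makes a huge relation look safely broadcastable:
stale = effective_table_size(raw_data_size=1024, total_size=50_000_000_000)
fixed = effective_table_size(raw_data_size=0, total_size=50_000_000_000)
```

With the stale statistic the planner sees ~1 KB and chooses a broadcast join; once rawDataSize is zeroed out, the 50 GB totalSize is used and the broadcast is correctly rejected.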
During my tests, this property presented a very small number (way lower than the autoBroadcastJoinThreshold property), making the Spark optimizer think it was safe to broadcast the right relation. But when the actual broadcast operation happened, it showed a much bigger size, approximately the same as shown in the picture for the right relation, causing the timeout error.
I fixed the issue for my test by running the following command on Hive for a specific partition set:
ANALYZE TABLE table_b PARTITION(ds='PARTITION_VALUE', hr='PARTITION_VALUE') COMPUTE STATISTICS;
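To confirm what the optimizer will see after the ANALYZE run, the partition-level properties can be inspected on the Hive side (partition values are placeholders, as above):

```sql
-- Inspect the partition statistics Spark will read; look for rawDataSize
-- and totalSize among the partition parameters in the output.
DESCRIBE FORMATTED table_b PARTITION (ds='PARTITION_VALUE', hr='PARTITION_VALUE');
```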
The rawDataSize is now zero, so Spark 3 uses the totalSize (which has a reasonable value) as the relation size and, consequently, does not use BHJ in this situation.
Now the issue is figuring out why the rawDataSize is so small, or even zero, in the first place, given that the Hive property hive.stats.autogather is true by default (it automatically computes statistics for every DML command), but that seems to be a separate problem.
Spark has made many improvements around joins.
One of them is:
AQE converts sort-merge join to broadcast hash join when the runtime statistics of any join side is smaller than the broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it's better than keep doing the sort-merge join, as we can save the sorting of both the join sides, and read shuffle files locally to save network traffic (if spark.sql.adaptive.localShuffleReader.enabled is true)
https://spark.apache.org/docs/3.1.1/sql-performance-tuning.html#converting-sort-merge-join-to-broadcast-join
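As a stopgap while the Hive statistics are being fixed, the two mitigations named in the original error message can also be applied per session; a sketch in Spark SQL:

```sql
-- Option 1: disable broadcast joins entirely, forcing sort-merge joins.
SET spark.sql.autoBroadcastJoinThreshold=-1;

-- Option 2: keep broadcast joins but give the exchange more time (seconds).
SET spark.sql.broadcastTimeout=600;
```

Option 1 restores the Spark 2.4.0 plan shape at the cost of losing legitimate broadcast joins; Option 2 only helps if the broadcast eventually succeeds, which it won't for a genuinely oversized relation.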