从Spark 2.4.0到Spark 3.1.1迁移导致SortMergeJoin更改为BroadcasthashJoin

发布于 2025-02-11 12:49:32 字数 2464 浏览 1 评论 0 原文

我目前正在制作一个Spark Migration项目,该项目旨在迁移所有Spark SQL Pipelines的Spark 3.X版本,并利用其所有性能改进。我的公司正在使用SPARK 2.4.0,但我们的目标是正式使用3.1.1用于所有SPARK SQL数据管道,但没有启用AQE 。主要目标是保持所有内容相同,但使用最新版本。稍后,我们可以轻松地为所有数据管道启用AQE。

对于特定情况,在Spark版本更改后,我们面临以下错误:

org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1

我们研究了此问题并查看Spark UI日志,我们注意到查询计划的更改如下:

Spark 2.4.0:

spark 2.4.0使用默认 sortmergejoin tbl_a tbl_b 之间进行加入操作,但是当我们查看查询时来自新Spark 3.1.1的计划:

​tbl_a 和 tbl_b 。不仅如此,如果我没有错,那么 broadcastExchange 操作在大桌子侧发生,从我的角度来看,这似乎很奇怪。

作为其他信息,我们具有以下有关执行这两个作业的属性:

  • spark.sql.autobroadcastjointhreshord = 10MB
  • spark.sql.sql.sql.aptive.enabled.enabled = false#aqe是禁用的
  • spark.sql.shuffle.partitions = 200 和其他非相关特性。

你们对为什么会发生这种情况有任何线索吗?我的问题是:

  • 鉴于AQE被禁用并且 spark.sql.autobroadcastjointhreshold 比数据集大小小得多,为什么Spark 3改变了这种情况下的加入方法?
  • 这是预期的行为,还是这代表Spark 3.X中的潜在错误?

拜托,让我知道你的想法。我感谢所有的帮助。

更新-2022-07-27

在挖掘火花代码几天后,调试它,我能够理解正在发生的事情。基本上,检索到的统计数据是问题。显然,Spark 3从蜂巢表属性中获取统计信息,称为 RAWDATASIZE 。如果未定义,则可以在以下源代码中看到 petalsize 表属性:

在我的测试期间,此属性的数量非常小(比非常小的数字(低于非常小的数字) AutoBroadcastThreshold属性)使Spark Optimizer认为可以安全地广播正确的关系,但是当实际的广播操作发生时,它显示出更大的尺寸,大致与正确的关系中的图片相同,从而导致超时错误。

我通过在Hive上为特定分区集运行以下命令来解决我的测试问题:

ANALYZE TABLE table_b PARTITION(ds='PARTITION_VALUE', hr='PARTITION_VALUE') COMPUTE STATISTICS;

RAWDATASIZE 现在为零,Spark 3使用 petalsize (具有合理的数字)作为关系大小,因此在这种情况下不使用BHJ。

现在,问题是弄清楚为什么 rawDatasize 首先是如此之小,甚至偶尔零,鉴于Hive属性 hive.stats.autogather true> true < /code>默认情况下(自动计算每个DML命令的统计信息),但这似乎是另一个问题。

I'm currently working on a Spark migration project that aims to migrate all Spark SQL pipelines for Spark 3.x version and take advantage of all performance improvements on it. My company is using Spark 2.4.0 but we are targeting to use officially the 3.1.1 for all Spark SQL data pipelines but without AQE enabled yet. The primary goal is to keep everything the same but use the newest version. Later on, we can easily enable AQE for all data pipelines.

For a specific case, right after the spark version change, we faced the following error:

org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1

We investigated this issue and looking at Spark UI logs, we noticed a change in the query plan as follows:

Spark 2.4.0:

enter image description here

Spark 2.4.0 is using the default SortMergeJoin to do the join operation between the tbl_a and tbl_b, but when we look at query plan from new Spark 3.1.1:

enter image description here

We can notice that instead of SortMergeJoin it is using the BroadcastHashJoin to do the join between tbl_a and tbl_b. Not only this, but if I'm not wrong, the BroadcastExchange operation is occurring on the big table side, which seems strange from my perspective.

As additional information, we have the following properties regarding the execution of both jobs:

  • spark.sql.autoBroadcastJoinThreshold = 10Mb
  • spark.sql.adaptive.enabled = false # AQE is disabled
  • spark.sql.shuffle.partitions = 200
    and other non-relevant properties.

Do you guys have any clue on why this is happening? My questions are:

  • Why Spark 3 has changed the join approach in this situation given that AQE is disabled and the spark.sql.autoBroadcastJoinThreshold is much smaller than the data set size?
  • Is this the expected behavior or could this represents a potential bug in Spark 3.x?

Please, let me know your thoughts. I appreciate all the help in advance.

UPDATE - 2022-07-27

After digging into Spark code for some days, and debugging it, I was able to understand what is happening. Basically, the retrieved statistics are the problem. Apparently, Spark 3 gets the statistics from a Hive table attribute called rawDataSize. If this isn't defined, than it looks for totalSize table property, as we can see in the following source code:

https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala#L69

During my tests, this property presented a very small number (way lower than the autoBroadcastThreshold property) making Spark Optimizer think it was safe to broadcast the right relation, but when the actual broadcast operation happened, it showed a bigger size, approximately the same as in the picture for the right relation, causing the timeout error.

I fixed the issue for my test by running the following command on Hive for a specific partition set:

ANALYZE TABLE table_b PARTITION(ds='PARTITION_VALUE', hr='PARTITION_VALUE') COMPUTE STATISTICS;

The rawDataSize now is zero and Spark 3 is using the totalSize (has a reasonable number) as the relation size and consequently, is not using BHJ for this situation.

Now the issue is figuring out why the rawDataSize is so small in the first place or even zero, given that the hive property hive.stats.autogather is true by default (auto calculates the statistics for every DML command) but it seems to be another problem.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

送你一个梦 2025-02-18 12:49:32

Spark在连接方面取得了许多进步。

其中之一是:

当任何联接方的运行时统计信息都小于广播哈希(Hash Join Join)阈值时,AQE转换式 - 合并连接到广播哈希。这并不像计划广播哈希(Hash)首先加入那样有效,但是比继续进行排序连接要好,因为我们可以保存两个加入侧面的排序,并在本地读取Shuffle Files以节省网络流量(如果spark.sql.Adaptive.localshufflereader.enabled为true)

https://spark.apache.org/docs/3.1.1/sql-performance-tuning.html#converting-converting-converting-sort-merge-join-join-join-join-to-broadcast-join

Spark has made many improvements around joins.

One of them is :

AQE converts sort-merge join to broadcast hash join when the runtime statistics of any join side is smaller than the broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it’s better than keep doing the sort-merge join, as we can save the sorting of both the join sides, and read shuffle files locally to save network traffic(if spark.sql.adaptive.localShuffleReader.enabled is true)

https://spark.apache.org/docs/3.1.1/sql-performance-tuning.html#converting-sort-merge-join-to-broadcast-join

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文