背景
本文基于Spark 3.5.3
在Spark引入了AQE以后,Spark在运行的时候能够拿到运行时候的Shuffle统计信息,这些信息可以更好的来调整join的策略,当下规则下这种策略的调整是通过增加hint来进行控制的, 规则的目的是防止负优化。
分析
这里会有三种优化场景:
1. 检测大量空分区 → 添加 NO_BROADCAST_HASH HINT
对应的代码如下:
1 private def hasManyEmptyPartitions(mapStats: MapOutputStatistics): Boolean = { 2 val partitionCnt = mapStats.bytesByPartitionId.length 3 val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0) 4 partitionCnt > 0 && nonZeroCnt > 0 && 5 (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin 6 } 7
这里有个配置spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin默认是0.2,
这种情况主要是在于AQE 中 等值join 转换 BroadcastHashJoinExec,防止 转化为BroadcastHashJoinExec,因为此时的shuffle数据已经有了,而且非空的分区数据比较少,直接shuffle join会更快,因为broadcast join还会有broadcast 的过程
2.分区数据小 → 添加 PREFER_SHUFFLE_HASH HINT
对应的代码如下:
1private def preferShuffledHashJoin(mapStats: MapOutputStatistics): Boolean = { 2 val maxShuffledHashJoinLocalMapThreshold = 3 conf.getConf(SQLConf.ADAPTIVE_MAX_SHUFFLE_HASH_JOIN_LOCAL_MAP_THRESHOLD) 4 val advisoryPartitionSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) 5 advisoryPartitionSize <= maxShuffledHashJoinLocalMapThreshold && 6 mapStats.bytesByPartitionId.forall(_ <= maxShuffledHashJoinLocalMapThreshold) 7 } 8
这里有spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold(默认是0),只要所有分区大小小于这个阈值,就使用hash join,因为如果使用SortMergeJoin的话,在join之前还会有Sort的操作,而hash join没有这个sort操作,数据量小的情况,hash join更快
* 3.两者都满足 → 添加 SHUFFLE_HASH HINT
如果以上两种情况都满足,则直接加 SHUFFLE_HASH HINT
注意:以上发生的转换是在用户没有手动指定HINT的前提下
总体的决策逻辑流程如下:
1 ┌─────────────────────────┐ 2 │ Join 节点 (ExtractEquiJoinKeys) │ 3 └───────────┬─────────────┘ 4 │ 5 ┌─────────────────────┼─────────────────────┐ 6 ▼ ▼ 7 检查 Left 子节点 检查 Right 子节点 8 │ │ 9 ▼ ▼ 10 ┌───────────────────────┐ ┌───────────────────────┐ 11 │ ShuffleQueryStageExec │ │ ShuffleQueryStageExec │ 12 │ 且 isMaterialized │ │ 且 isMaterialized │ 13 │ 且 mapStats.isDefined │ │ 且 mapStats.isDefined │ 14 └───────────┬───────────┘ └───────────┬───────────┘ 15 │ │ 16 ┌──────────┴──────────┐ ┌──────────┴──────────┐ 17 ▼ ▼ ▼ ▼ 18 Many Empty Partitions? All Partitions Many Empty Partitions? All Partitions 19 │ Small? │ Small? 20 ▼ ▼ ▼ ▼ 21 ───────────── ───────────── ───────────── ───────────── 22 │ │ │ │ 23 ▼ ▼ ▼ ▼ 24 demote BHJ: prefer shuffle demote BHJ: prefer shuffle 25 NO_BROADCAST_HASH hash: PREFER_ hash: PREFER_ hash: PREFER_ 26 SHUFFLE_HASH SHUFFLE_HASH SHUFFLE_HASH 27
《Spark DynamicJoinSelection 规则根据AQE统计信息动态调整Join策略》 是转载文章,点击查看原文。