相对于 Spark 2.3/2.4,Spark 3 提供了一些新特性,极大优化了超大数据量的查询。主要有三点:引入 Adaptive Query Execution 自适应执行,解决了热点数据倾斜的问题支持 Hint 提示,增加了SQL的可操控性增加了一些 set 参数,增加SQL的可操控性Adaptive Query Execution 默认配置 Spark 3 默认启用AQE,启用以后Spark会根据运行时的统计信息,动态优化查询计划。set spark.sql.adaptive.enabled=true; 针对数据倾斜,Spark3 默认启用了 AQE,并预设置了参数:set spark.sql.adaptive.skewJoin.enabled=true; set spark.sql.adaptive.skewJoin.skewedPartitionFactor=5; set spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB; set spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB; 一个分区被判定为数据倾斜,必须满足两个条件:分区数据的大小超过 skewedPartitionThresholdInBytes,默认256M分区数据的大小超过 所有分区大小的中位数*skewedPartitionFactor。 假如一个stage shuffle以后有200个分区,分区大小中位数是 128M,skewedPartitionFactor=5,那么超过640MB的分区会被判定为数据倾斜。 对于倾斜的分区进行拆分时,每个分区的大小接近 advisoryPartitionSizeInBytes。 在TB数据量级的Hive表上查询时,如果Yarn集群比较大,skewedPartitionThresholdInBytes 和 advisoryPartitionSizeInBytes 默认值就有点小了,可以根据情况自己情况调大。但skewedPartitionThresholdInBytes 理论上应该比 advisoryPartitionSizeInBytes 大。利用AEQ 合并分区 在 JOIN 或 Aggregate 以后,shuffle结果的分区可能会特别多,导致下一个Stage的任务数特别多,或者Hive表包含了太多的小文件。Spark默认启用了 coalescePartitions 功能用来优化这种情况:set spark.sql.adaptive.coalescePartitions.enabled=true; set spark.sql.adaptive.coalescePartitions.parallelismFirst=true; set spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB; 这个参数有两种工作模式: 高并发模式:coalescePartitions.parallelismFirst=true 时,通过 coalescePartitions.minPartitionSize 判断一个 partition 是否需要合并; 低并发模式:coalescePartitions.parallelismFirst=false 时,通过前面提到的 advisoryPartitionSizeInBytes=64MB 判断一个partition 是否需要合并; 可以看到,Spark默认选择了高并发模式,带来了结果是 shuffle后任务 (Task) 过多,如果每个任务都处理极少的数据量,调度带来的性能损耗会大于计算带来的效率增益。 由于这个参数是全局生效的,所有的 shuffle 里都会生效。在实际使用中,可以调高 coalescePartitions.minPartitionSize,以取得比较好的平衡。或者使用 Spark hints 对特定的环节进行合并。Spark Hints Spark Dataset API 提供了很多方法来精细化控制每个环节的执行,比如 repartition、coalesce、broadcast 等,SQL 基础的语法框架并不支持这些。在不改变 SQL 基本语法的前提下,Spark 引入了 Hint (提示)。 我们可以通过类似于注释的语法,实现对特定环节的精细化控制。 场景一:控制 Join 模式 从性能角度 Broadcast Join > Shuffle Hash Join > SortMerge Join。与TB级的表 JOIN 时,我们尽量用空间换时间,选择Broadcast Join。通常情况下,spark.sql.autoBroadcastJoinThreshold 默认值是10MB,我们可以调大到512MB:set spark.sql.autoBroadcastJoinThreshold=536870912; 10MB 可以容纳 131万个 BIGINT,但涉及到的字段比较多时,支持的数据条数会剧烈下降。增加这个阈值,上千万级别的大表都可以被 Broadcast,而TB级的表不需要做shuffle,这个空间损耗远远小于 TB级大表做Shuffle带来的时间和空间损耗。 在个别情况下,比如对表大小的统计不准确,autoBroadcastJoinThreshold 无法生效的情况,可以使用 spark hint 强制走 broadcast:SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key; 好的一点是:Broadcast hint 的优先级要高于autoBroadcastJoinThreshold参数。 对于JOIN 两侧表都比较大的情况,也可以通过 hint 触发 shuffle hash join:SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key; 场景二:Partitioning Hints 分区提示 分区提示支持四种模式:coalesce:合并分区,但不触发shufflerepartition:允许按照特定字段合并/拆分分区,触发shufflerepartition_by_range:和repartition类似rebalance:合并/拆分分区,但会保持输出分区大小尽量一致,倾斜的分区会被拆开 在写入 Hive表时,选择rebalance 模式能够比较好地保持输出文件大小的均衡。SELECT /*+ COALESCE(100) */ * FROM t; SELECT /*+ REBALANCE(c) */ * FROM t;其他常用的优化参数 场景一:Hive 表小文件读取优化 Spark 在读取大文件时,如果文件大小超过 hdfs block 大小,会默认拆分到多个partition里。如果是小文件,则会每个小文件对应一个Task。如果Task数量超过10w,Spark集群的性能会出现严重的衰减,绝大多数情况下是小文件问题: 对于小文件问题,如果上游能优化一下,比如通过 repartition hints 或 coalescePartitions 参数等,下游一般都不用考虑这个问题。如果上游解决不了,下游还可以通过 maxPartitionBytes 将多个文件合并到同一个分区下:set spark.sql.files.maxPartitionBytes=134217728; 值得注意的是:这个参数只对 parquet、orc 和 json 格式的表生效,对 text 格式的hive表无效。 场景二:控制 shuffle 输出的分区数set spark.sql.shuffle.partitions=200; 这是一个全局参数,如果hive表的数据量比较大,可以调大这个参数。否则可能会出现 executor oom 的情况。当然另外一种方式是 增大 executor memory:set spark.executor.memory=16g; 本期的 SQL 优化就到这里,想要查询更多资料的话:Spark hints: Google 搜索 "Spark 3.2.1 hint"Spark性能优化: Google 搜索 "Performance Tuning - Spark 3.2.1 Documentation"更多set 参数:参考 github apache/spark 下 sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala