Spark3.0主要特性(1)—— Adaptive Query Execution

举报
aloe 发表于 2020/08/25 10:41:46 2020/08/25
【摘要】 Spark目前依赖的基于代价的优化规则,根据估计Plan节点的大小决定优化策略,一定程度上优化了执行计划。 但是表的统计信息不完善时,或者预估的节点大小不精确时,仍然有些场景无法做到最优。AQE能够在此基础上,带来一定的改善。

众所周知,目前Spark的基于代价的优化策略,能够给SQL执行计划带来很大的优化,比如:调整Join顺序,决定Join类型(BroadcastHashJoin 或者 SortMergeJoin)等等。 但是该优化策略有一个明显的问题是:对于代价的估计是基于表的一些统计信息的,若这些统计信息不存在或者过期,则会对SQL的优化带来负面的影响。因此,本文介绍的Adaptive Query Execution就是针对这种问题,不依赖于统计信息进行优化。


AQE的一个难点就是在何时进行再次优化规则。Spark程序执行时,一般都是并行或者是管道式的,但是了解过Spark内核的人都知道,Spark作业有一个DAG Stage的划分,Stage之间会进行shuffle操作,所以每一个stage要等待其上一个stage作业全部完成才能开始,这就为AQE的执行提供了一个时机,因此此时已经能够知道前一个Stage的中间结果的大小、列数等统计信息,可以为我们的AQE执行提供所需的统计信息。

  1. 首先,对于第一层叶子节点的Stage(即不依赖于其他任何Stage的那些Stage)不需要执行AQE;

  2. 每一个Stage执行完成后,就标记该stage的状态为完成,同时收集统计信息,并更新对应的逻辑计划;

  3. 根据收集到的这些统计信息,重新执行指定的一些优化规则,再转为物理计划;

  4. 然后,基于这个新的优化后的Plan, 从之前已经完成的Stage向后继续执行,并重复上述步骤,直到整个sql执行完成


AQE有如下3个特征:


1、动态合并Shuffle分区

Spark执行查询过程中会有很多Shuffle操作,即Stage之间的数据传递,需要通过网络对数据进行传递并合并计算等操作。影响Shuffle的性能有很多因素,其中分区的个数就是一个很重要的因素。 分区的个数目前是用默认的配置项200来决定的,该值的选择对Shuffle影响很大:

  •     若分区个数太少,则每个分区需要处理的数据量很大,每个task处理一个分区的数据,可能会需要将数据溢写到磁盘,从而降低执行效率;

  •     若分区个数太大,则每个分区处理很少的数据,但是task个数很多,导致很多小的网络数据获取和传播,同样会因为IO瓶颈带来性能下降。

在AQE中,首先设置一个较大的分区个数,然后随着Stage任务的执行,在运行时根据metrics统计信息将小的数据量的分区进行合并,从而自动调整分区个数。以 SELECT max(i) FROM tbl GROUP BY j  为例,

原表很小,在group执行之前,只有两个分区; 

初始分区个数设置为5,则本地group之后会将数据划分为5个分区;

若没有AQE,则shuffle之后将分为5个task分别执行,其中有3个task的数据量很小,提交这样的task执行会浪费一定的资源;

但是开启AQE之后,会自动将小分区合并,如下图,合并之后剩余3个分区,且每个分区的数据量相近。



2、动态切换join策略

Spark中用的最多的Join方式为BroadcastHashJoin 和 SortMergeJoin,所有的Join类型中BroadcastHashJoin性能最好,因为避免了数据的shuffle。 所以Spark目前通过估计join两端表的大小与广播阈值的关系,来判断是否可以使用BroadcastHashJoin。 但是该值的估计常常是不准确的,比如:有一个过滤效率很高的filter,可能使得过滤后的数据可以广播,但是估计值却偏大; 或者是Join的一端是一个很复杂的操作时,估计的值就更加不准确,常常估计出一个很大的值导致使用SortMergeJoin,而实际执行后会发现该复杂的查询后的结果集很小且适合广播。

AQE在执行过程中,重新进行优化,可以利用前一个Stage执行结果的大小,直接的知道是否适合广播。如下例子:


在该例子中,两个表原始Join时,根据CBO估计的大小是SortMergeJoin,但是当stage2执行完成后,调用了AQE重新执行优化规则发现,实际结果小于广播阈值(默认10M),因此可以使用BroadcastHashJoin, 则会修改Join类型,从而节约Join的时间。

这里要注意,前两个Stage中shuffle写的操作此时已经完成,这部分的时间无法避免;能优化的是Join的Stage中Shuffle读以及Join的执行时间。



3、动态优化join数据倾斜

当每个分区的数据分布不均匀时,容易出现数据倾斜的问题,有些场景下尤其是Join时,若出现数据倾斜,可能会导致个别的task任务特别繁重,其他所有的task都执行完毕,executor处于空闲状态,等待这几个数据倾斜的task执行完成。AQE能够自动检测具有数据倾斜的分区,并将这些数据量很大的分区进行切分。 如下例: A和B表做Join,其中A0分区特别大,在没有AQE时如下图:


开启AQE之后,将A0分区划分为两个差不多大的分区(A0-0和A0-1),这两个分区同时与B0分区进行join,因为A0本来就是需要与B0做Join的。划分后,使用了5个数据量相近的task同时执行该任务,可以获得更好的性能。



参考:https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。