Spark Source Code Analysis (1): Subquery Handling in Spark SQL
I. The Overall Architecture of Spark SQL
Figure 1.1: Spark SQL overall architecture diagram
II. The Optimization Process from Optimized Logical Plan to ExecutedPlan
1. As Figure 1.1 shows, going from the Resolved Logical Plan to the Optimized Logical Plan requires applying the many optimization rules in SparkOptimizer to the Resolved Logical Plan.
lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData.clone())
2. For any subqueries the plan contains, the optimizer traverses the entire plan, finds each SubqueryExpression, and applies the full set of optimization rules to it once more; see the OptimizeSubqueries rule for the details.
object OptimizeSubqueries extends Rule[LogicalPlan] {
  private def removeTopLevelSort(plan: LogicalPlan): LogicalPlan = {
    plan match {
      case Sort(_, _, child) => child
      case Project(fields, child) => Project(fields, removeTopLevelSort(child))
      case other => other
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case s: SubqueryExpression =>
      val Subquery(newPlan) = Optimizer.this.execute(Subquery(s.plan))
      // At this point we have an optimized subquery plan that we are going to attach
      // to this subquery expression. Here we can safely remove any top level sort
      // in the plan as tuples produced by a subquery are un-ordered.
      s.withNewPlan(removeTopLevelSort(newPlan))
  }
}
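The effect of OptimizeSubqueries can be observed from a spark-shell. The sketch below (assuming Spark 2.4.x; the table name `t` is made up for illustration) runs a query whose scalar subquery carries a top-level ORDER BY, which the rule strips because a scalar subquery yields a single, un-ordered value:

```scala
// spark-shell sketch, Spark 2.4.x assumed; the view name "t" is hypothetical.
spark.range(10).toDF("id").createOrReplaceTempView("t")

// The scalar subquery has a top-level Sort (ORDER BY 1 over the aggregate).
// OptimizeSubqueries removes it, since the subquery's single-row output is un-ordered.
val df = spark.sql(
  "SELECT id FROM t WHERE id < (SELECT max(id) FROM t ORDER BY 1)")

// Compare the two plans: the Sort node inside the subquery should be absent
// from the optimized plan.
println(df.queryExecution.analyzed)
println(df.queryExecution.optimizedPlan)
```

Inspecting `queryExecution` this way is the easiest method to confirm which rules actually fired on a given query.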
3. The Query Planner then converts the Optimized Logical Plan into one or more Physical Plans.
lazy val sparkPlan: SparkPlan = planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
4. CBO uses the Cost Model to compute the cost of each Physical Plan and selects the one with the lowest cost as the final Physical Plan.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan.clone())

protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}

protected def preparations: Seq[Rule[SparkPlan]] =
  sparkSession.sessionState.preExecutionRules ++ Seq(
    // ...
  )
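Each of the lazy vals above corresponds to a stage that can be inspected directly on any Dataset's QueryExecution. A minimal spark-shell sketch (Spark 2.x assumed):

```scala
// spark-shell sketch, Spark 2.x assumed: walking the planning stages
// exposed by the lazy vals discussed above.
val qe = spark.range(100).toDF("id").filter("id > 10").queryExecution

println(qe.analyzed)      // Resolved Logical Plan
println(qe.optimizedPlan) // after the optimizer rules, including OptimizeSubqueries
println(qe.sparkPlan)     // the Physical Plan chosen by the planner
println(qe.executedPlan)  // after the prepareForExecution rules, e.g. PlanSubqueries
```

Because the vals are lazy, each stage is computed only when first accessed, which is why `executedPlan` implicitly triggers the whole chain.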
Among these preparation rules, PlanSubqueries further processes ScalarSubquery and InSubquery expressions.
case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
  def apply(plan: SparkPlan): SparkPlan = {
    plan.transformAllExpressions {
      case subquery: expressions.ScalarSubquery =>
        val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
        ScalarSubquery(
          SubqueryExec(s"scalar-subquery#${subquery.exprId.id}", executedPlan),
          subquery.exprId)
      case expressions.InSubquery(values, ListQuery(query, _, exprId, _)) =>
        val expr = if (values.length == 1) {
          values.head
        } else {
          CreateNamedStruct(
            values.zipWithIndex.flatMap { case (v, index) =>
              Seq(Literal(s"col_$index"), v)
            }
          )
        }
        val executedPlan = new QueryExecution(sparkSession, query).executedPlan
        InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)
    }
  }
}
The key line is the one below. `new QueryExecution(sparkSession, subquery.plan)` sends the subquery's plan through the analyzer and optimizer all over again, after which the Query Planner converts the resulting Optimized Logical Plan into multiple Physical Plans and CBO selects a suitable executedPlan.
val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
5. This concludes the walkthrough of subquery handling in Spark SQL. As shown above, subqueries such as ScalarSubquery and InSubquery are processed multiple times, and much of that repeated processing is likely redundant; in some cases a subquery cannot even be reused. For these reasons, Spark 3.0 reworked the subquery handling logic, which a later post will cover in detail.
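The redundancy described above is easy to provoke: when the same subquery appears twice in one query, PlanSubqueries spins up a separate `new QueryExecution` for each occurrence. A spark-shell sketch (Spark 2.4.x assumed; the view name `t` is made up):

```scala
// spark-shell sketch, Spark 2.4.x assumed. The identical scalar subquery
// appears twice, so PlanSubqueries plans it twice; the final plan shows a
// SubqueryExec per occurrence (a ReuseSubquery-style rule can deduplicate
// them only when the planned subtrees turn out identical).
spark.range(10).toDF("id").createOrReplaceTempView("t")

val df = spark.sql(
  """SELECT id FROM t
    |WHERE id > (SELECT avg(id) FROM t)
    |   OR id + 1 > (SELECT avg(id) FROM t)""".stripMargin)

// Look for duplicated Subquery nodes in the executed plan.
println(df.queryExecution.executedPlan)
```

Counting the SubqueryExec nodes in the printed plan makes the cost of the repeated planning concrete, and motivates the Spark 3.0 rework mentioned above.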