Spark Source Code Analysis (1): Subquery Handling in Spark SQL
I. The Overall Architecture of Spark SQL
Figure 1.1 Spark SQL overall architecture
II. From the Optimized Logical Plan to the ExecutedPlan
1. As Figure 1.1 shows, going from the Resolved Logical Plan to the Optimized Logical Plan means running the Resolved Logical Plan through the many optimization rules in SparkOptimizer:
lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData.clone())
2. For any subqueries the plan contains, Spark traverses the whole plan, finds each SubqueryExpression, and applies the full set of optimization rules to that SubqueryExpression again; see the OptimizeSubqueries rule:
object OptimizeSubqueries extends Rule[LogicalPlan] {
  private def removeTopLevelSort(plan: LogicalPlan): LogicalPlan = {
    plan match {
      case Sort(_, _, child) => child
      case Project(fields, child) => Project(fields, removeTopLevelSort(child))
      case other => other
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case s: SubqueryExpression =>
      val Subquery(newPlan) = Optimizer.this.execute(Subquery(s.plan))
      // At this point we have an optimized subquery plan that we are going to attach
      // to this subquery expression. Here we can safely remove any top level sort
      // in the plan as tuples produced by a subquery are un-ordered.
      s.withNewPlan(removeTopLevelSort(newPlan))
  }
}
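The effect of removeTopLevelSort can be seen on a toy plan tree. The sketch below uses hypothetical stand-in case classes (Sort, Project, Scan, Filter), not Spark's real operators, purely to show the recursion: a Sort at the top of the subquery plan is dropped (even when it sits under one or more Projects), while a Sort below any other node is left alone.

```scala
// Toy stand-ins for logical plan nodes (NOT Spark's actual classes).
sealed trait Plan
case class Sort(child: Plan) extends Plan
case class Project(fields: Seq[String], child: Plan) extends Plan
case class Filter(cond: String, child: Plan) extends Plan
case class Scan(table: String) extends Plan

// Same shape as the rule above: peel off Projects, drop one top-level Sort,
// and stop as soon as any other node is reached.
def removeTopLevelSort(plan: Plan): Plan = plan match {
  case Sort(child)            => child
  case Project(fields, child) => Project(fields, removeTopLevelSort(child))
  case other                  => other
}

// A Sort directly under the top-level Project is removed...
val pruned = removeTopLevelSort(Project(Seq("a"), Sort(Scan("t"))))
// ...but a Sort below a Filter is untouched, since the recursion only
// looks through Project nodes.
val kept = removeTopLevelSort(Filter("a > 1", Sort(Scan("t"))))
```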
3. The Query Planner then converts the Optimized Logical Plan into one or more candidate Physical Plans:
lazy val sparkPlan: SparkPlan = planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
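Note that planner.plan returns an Iterator of candidate physical plans and sparkPlan simply calls .next() on it. A minimal sketch of that selection, with strings standing in for SparkPlan and made-up candidate names:

```scala
// Strings stand in for SparkPlan; the planner yields candidates lazily.
def plan(optimized: String): Iterator[String] =
  Iterator(s"CandidateA($optimized)", s"CandidateB($optimized)")

// .next() takes the first candidate; later candidates in the iterator
// are never even materialized.
val sparkPlan = plan("optimizedPlan").next()
```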
4. The CBO computes the cost of each Physical Plan from the Cost Model and selects the lowest-cost one as the final Physical Plan; that plan is then run through a series of preparation rules to produce the executedPlan:
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan.clone())
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}

protected def preparations: Seq[Rule[SparkPlan]] =
  sparkSession.sessionState.preExecutionRules ++ Seq(
    ***
    ***)
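prepareForExecution is just a left fold of the preparation rules over the plan, applying each rule to the output of the previous one. A minimal sketch, with strings standing in for SparkPlan (PlanSubqueries and EnsureRequirements are two real preparation rules, but the plumbing here is simplified):

```scala
// Strings stand in for SparkPlan; a rule is just a plan-to-plan function.
type Plan = String
type Rule = Plan => Plan

// Fold each rule over the result of the previous one, exactly like
// preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }.
def prepareForExecution(preparations: Seq[Rule])(plan: Plan): Plan =
  preparations.foldLeft(plan) { case (sp, rule) => rule(sp) }

val preparations: Seq[Rule] = Seq(
  p => s"PlanSubqueries($p)",      // plans subquery expressions
  p => s"EnsureRequirements($p)")  // inserts exchanges/sorts where required

val executed = prepareForExecution(preparations)("sparkPlan")
```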
Among these rules, PlanSubqueries is the one that does the further handling of ScalarSubquery and InSubquery expressions.
case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
  def apply(plan: SparkPlan): SparkPlan = {
    plan.transformAllExpressions {
      case subquery: expressions.ScalarSubquery =>
        val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
        ScalarSubquery(
          SubqueryExec(s"subquery${subquery.exprId.id}", executedPlan),
          subquery.exprId)
      case expressions.InSubquery(values, ListQuery(query, _, exprId, _)) =>
        val expr = if (values.length == 1) {
          values.head
        } else {
          CreateNamedStruct(
            values.zipWithIndex.flatMap { case (v, index) =>
              Seq(Literal(s"col_$index"), v)
            }
          )
        }
        val executedPlan = new QueryExecution(sparkSession, query).executedPlan
        InSubqueryExec(expr, SubqueryExec(s"subquery${exprId.id}", executedPlan), exprId)
    }
  }
}
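For a multi-column IN predicate such as (a, b) IN (SELECT ...), the rule packs the left-hand values into a named struct with generated field names col_0, col_1, and so on. The zipWithIndex step can be isolated as follows (plain strings stand in for the value expressions and literals):

```scala
// Left-hand-side values of a multi-column IN predicate.
val values = Seq("a", "b")

// Interleave generated field names with the values, in the
// name0, value0, name1, value1, ... order that CreateNamedStruct expects.
val structArgs = values.zipWithIndex.flatMap { case (v, index) =>
  Seq(s"col_$index", v)
}
```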
The key line is the one below. new QueryExecution(sparkSession, subquery.plan) sends the subquery's plan once more through the analyzer and optimizer, then through the Query Planner to turn the Optimized Logical Plan into candidate Physical Plans, and finally selects a suitable executedPlan based on the CBO.
val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
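Because every ScalarSubquery/InSubquery expression spawns its own QueryExecution, an identical subquery appearing N times in the outer plan is re-analyzed, re-optimized, and re-planned N times. A toy counter makes the point; planSubquery here is a hypothetical stand-in for new QueryExecution(...).executedPlan, not a Spark API:

```scala
var planningPasses = 0

// Hypothetical stand-in: each call simulates building a full QueryExecution
// for one subquery expression.
def planSubquery(sql: String): String = {
  planningPasses += 1
  s"executedPlan[$sql]"
}

// The same subquery appears twice in the outer plan; each occurrence is
// planned independently, and nothing is shared between the two passes.
Seq("SELECT max(x) FROM t", "SELECT max(x) FROM t").foreach(planSubquery)
```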
5. That concludes the walkthrough of how Spark SQL handles subqueries. As we have seen, subqueries such as ScalarSubquery and InSubquery are processed multiple times; much of that repeated processing may be redundant, and in some cases a subquery cannot be reused at all. For these reasons, Spark 3.0 reworked the subquery handling logic, which a later post will cover in detail.