Spark源码分析(一):Spark-SQL中关于Subquery的处理

举报
那人好像一条狗~ 发表于 2020/07/27 16:48:10 2020/07/27
【摘要】 Spark-SQL中关于Subquery的处理

一、Spark SQL的整体架构图

image.png

                                                                                                        图1.1 Spark SQL整体架构图


二、Optimized Logical Plan到ExcutedPlan的优化过程

1、从图1.1可以看到从Resolved Logical Plan到Optimizer Logical Plan需要经过SparkOptimizer中的多个优化规则,对Resolved Logical Plan进行优化。

lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData.clone())


2、对于其中包含的Subqueries,会将遍历整个Plan,找到对应的SubqueryExpression,并对这些SubqueryExpression再次利用所有的优化规则,具体可见OptimizeSubqueries规则。

object OptimizeSubqueries extends Rule[LogicalPlan] {
    private def removeTopLevelSort(plan: LogicalPlan): LogicalPlan = {
      plan match {
        case Sort(_, _, child) => child
        case Project(fields, child) => Project(fields, removeTopLevelSort(child))
        case other => other
      }
    }
    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
      case s: SubqueryExpression =>
        val Subquery(newPlan) = Optimizer.this.execute(Subquery(s.plan))
        // At this point we have an optimized subquery plan that we are going to attach
        // to this subquery expression. Here we can safely remove any top level sort
        // in the plan as tuples produced by a subquery are un-ordered.
        s.withNewPlan(removeTopLevelSort(newPlan))
    }
  }


3、然后通过Query Planner 将 Optimized Logical Plan 转换成多个 Physical Plan。


lazy val sparkPlan: SparkPlan = planner.plan(ReturnAnswer(optimizedPlan.clone())).next()


4、CBO 根据 Cost Model 算出每个 Physical Plan 的代价并选取代价最小的 Physical Plan 作为最终的 Physical Plan


lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan.clone())

protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}
  
protected def preparations: Seq[Rule[SparkPlan]] =
  sparkSession.sessionState.preExecutionRules ++ Seq(
  ***
  
  ***)

其中在PlanSubqueries规则中,会对Scalarubquery以及InSubquery进一步处理。

PlanSubqueries(sparkSession: SparkSession) Rule[SparkPlan] {
  (plan: SparkPlan): SparkPlan = {
    plan.transformAllExpressions {
      subquery: expressions.ScalarSubquery =>
        val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
        ScalarSubquery(
          ({subquery.exprId.id}executedPlan)subquery.exprId)
      expressions.(values(query_exprId_)) =>
        expr = (values.length == ) {
          values.head
        } {
          CreateNamedStruct(
            values.zipWithIndex.flatMap { (vindex) =>
              ((index)v)
            }
          )
        }
        executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
        InSubqueryExec(expr({exprId.id}executedPlan)exprId)
    }
  }
}

关键是下面这行代码,在new QueryExecution(sparkSession, subquery.plan)会将subquery中的plan,再一次经过parser、analyzer、optimizer,以及通过Query Planner 将 Optimized Logical Plan 转换成多个 Physical Plan,基于CBO来选择合适的executedPlan。

val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan


5、至此关于Spark SQL中对subquery的处理过程讲解完毕,可以看到针对ScalarSubquery和InSubquery这些subquery,会被处理多次,很可能这些多次处理是冗余的,甚至会出现有些subquery无法reuse的问题,基于此Spark 3.0中修改了针对subquery的处理逻辑,后面的博文中会详情讲解。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。