基于Spark3.4.2源码分析SparkSQL执行过程

基本执行流程

Spark SQL运行架构简图

1.SqlParser
1.SqlParser
1.SqlParser
2.Analyzer
AnalysisRules
3.Optimizer
4.Strategies
5.Execution
SQL
DataFrame
Dataset
Unresolved
LogicalPlan
LogicalPlan
Optimized
LogicalPlan
Physical
Plans
CostModel
Selected
LogicalPlan
SessionCatalog
RDDs
  1. 将SQL语句解析成未绑定的逻辑计划(未解决的关系Relation、函数Function、属性Attribute)
  2. 配合元数据信息,完善未绑定的逻辑计划的属性转换成绑定的逻辑计划,将元数据信息与catalog绑定。
  3. 使用优化规则生成优化逻辑计划
  4. 将优化的逻辑计划转换成可执行的物理计划
  5. 使用preparations规则进行预处理后,使用SparkPlan的execute执行rdd

Spark SQL四大执行阶段简图

planning 阶段
规则应用到副本
准备物理计划规则
生成物理计划副本
optimization 阶段
校验优化结果
执行优化规则
analysis 阶段
校验分析结果
执行分析规则
parsing 阶段
结果处理
SQL语法解析
生成解析树
生成AST

Unresolved逻辑计划树相关类

继承
继承
继承
继承
实现
继承
继承
继承
Association
Association
继承
实现
«trait»
ParserInterface
parsePlan(sqlText: String)
parseExpression(sqlText: String)
parseTableIdentifier(sqlText: String)
parseFunctionIdentifier(sqlText: String)
parseMultipartIdentifier(sqlText: String)
parseTableSchema(sqlText: String)
parseDataType(sqlText: String)
parseQuery(sqlText: String)
«abstract»
AbstractSqlParser
CatalystSqlParser
SparkSqlParser
AstBuilder
SparkSqlAstBuilder
SqlBaseParserBaseVisitor
AbstractParseTreeVisitor
«interface»
ParseTreeVisitor<T>
T visit(ParseTree tree)
T visitChildren(RuleNode node)
T visitTerminal(TerminalNode node)
T visitErrorNode(ErrorNode node)
«interface»
SqlBaseParserVisitor
MyParser

RuleExector相关类

解析语法树绑定,使用rule绑定逻辑计划

继承
继承
继承
继承
继承
继承
Association
Association
Association
继承
Dependency
«abstract»
RuleExecutor
execute(plan: TreeType)
«abstract»
Strategy
maxIterations: Int
Analyzer
fixedPoint:FixedPoint
batches: Seq[Batch]
«abstract»
Optimizer
FixedPoint
Batch
name: String
strategy: Strategy
rules: Rule[TreeType]*
SparkOptimizer
SimpleAnalyzer
Once
CheckAnalysis
SessionCatalog
UserMain SparkSession SessionState AbstractSqlParser AstBuilder QueryExecution Analyzer RuleExcutor sql() 1.创建实例 返回 2.parsePlan() 3.parse() 4.visitSingleStatement() 5.创建实例 返回 assertAnalyzed executeAndCheck 6.executeAndTrack UserMain SparkSession SessionState AbstractSqlParser AstBuilder QueryExecution Analyzer RuleExcutor 流程
  1. 在SparkSession中创建SessionState实例对象

  2. 使用SessionState中的转换接口,调用AbstractSqlParser.parsePlan

  3. 基于基础 SQL 解析基础架构,使用到了ANTLR4进行此法解析。使用由插件分别生成词法解析器SqlBaseLexer和语法解析器SqlBaseParser,尝试使用Antlr4最快解析模式PredictionMode.SLL解析后,解析失败会在使用PredictionMode.LL模式解析,对解析结果进行函数柯理化。将sql的内容先由词法解析器解析成token序列,传入语法解析器解析。

  4. 使用Visitor模式进行遍历,参照SqlBase.g4的语法树进行匹配,产生逻辑计划(未绑定)。UnresolvedRelation语法树。

  5. 构建QueryExecution,通过查询执行器将Unresolved LogicalPlan一路变成costmodel。

    select * from people语句为例,其转化过程如下:

    == Parsed Logical Plan ==
    'Project [*]
    +- 'UnresolvedRelation [global_temp, people], [], false
    
    == Analyzed Logical Plan ==
    age: bigint, name: string
    Project [age#8L, name#9]
    +- SubqueryAlias global_temp.people
       +- View (`global_temp`.`people`, [age#8L,name#9])
          +- Relation [age#8L,name#9] json
    
    == Optimized Logical Plan ==
    Relation [age#8L,name#9] json
    
    == Physical Plan ==
    FileScan json [age#8L,name#9] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/D:/code/spark/spark-3.4.2/examples/src/main/resources/people.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<age:bigint,name:string>
    
  6. 由RuleExcutor.execute(plan),按照batches顺序和Batch内的rules顺序,对传入的树节点节点应用对应规则。转换成为完全类型的对象。

    Analyzer RuleExecutor Rule QueryExecutionMetering execute(plan) 创建QueryExecutionMetering queryExecutionMeter实例 用来记录解析优化的指标: 1.执行了哪些Rule与耗时 2.执行了哪些有效Rule与耗时 依次执行batchs中的每一个batch,每个batch类rules也是依次执行。 iteration = 1 iteration记录batch的执行次数 apply(plan: TreeType) result = appliedLogicalPlan incNumEffectiveExecution(rule.ruleName) 记录有效执行rule的名称 incTimeEffectiveExecutionBy(rule.ruleName, runTime) 记录有效执行rule的耗时 alt [!result.fastEquals(plan)] incExecutionTimeBy(rule.ruleName, runTime) 记录执行rule的名称与耗时 incNumExecution(rule.ruleName) 记录执行有效rule的个数 loop [batch.rules.foldLeft(curPlan)] iteration += 1 incNumEffectiveExecution(rule.ruleName) 记录有效执行rule的名称 incTimeEffectiveExecutionBy(rule.ruleName, runTime) 记录有效执行rule的耗时 alt [iteration > batch.strategy.maxIterations] 若batch执行的次数达到最大次数,则不再继续执行batch loop [while(continue)] loop [batches.foreach] analyzedLogicalPlan Analyzer RuleExecutor Rule QueryExecutionMetering 流程

详细代码

    spark.sql("SELECT * FROM global_temp.people").show();

SparkSession

  @Experimental
  def sql(sqlText: String, args: Map[String, Any]): DataFrame = withActive {
    // 查询计划追踪器
    val tracker = new QueryPlanningTracker
    val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
      // 解析计划--AbstractSqlParser来实现
      val parsedPlan = sessionState.sqlParser.parsePlan(sqlText)
      if (args.nonEmpty) {
        ParameterizedQuery(parsedPlan, args.mapValues(lit(_).expr).toMap)
      } else {
        parsedPlan
      }
    }
    // 将执行结果以DataFrame返回
    Dataset.ofRows(self, plan, tracker)
  }

  private[sql] def withActive[T](block: => T): T = {
    // 直接使用线程本地的活跃会话,以确保我们得到的会话实际上是事实上设置的而不是默认的会话。
    // 这是为了防止一旦我们都搞完之后把默认会话升级到了活跃会话。
    val old = SparkSession.activeThreadSession.get()
    SparkSession.setActiveSession(this)
    try block finally {
      SparkSession.setActiveSession(old)
    }
  }

AbstractSqlParser

  /** 为sql字符串创建逻辑计划 */
  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    val ctx = parser.singleStatement()
    withOrigin(ctx, Some(sqlText)) {
      // 获取构造器AstBuilder,将ParseTree转换为AST(visit模式)
      astBuilder.visitSingleStatement(ctx) match {
        case plan: LogicalPlan => plan
        case _ =>
          val position = Origin(None, None)
          throw QueryParsingErrors.sqlStatementUnsupportedError(sqlText, position)
      }
    }
  }

  protected def astBuilder: AstBuilder

  protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
    logDebug(s"Parsing command: $command")
    // 将sql内容转换成字符流,并且转换成大写形式。 词法解析器
    val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    // 清空识别错误的监听器
    lexer.removeErrorListeners()
    // ParseErrorListener将解析错误转换为 AnalysisException。 ParseException继承AnalysisException
    lexer.addErrorListener(ParseErrorListener)

    // 使用指定的令牌源和默认令牌通道 构造一个新的 CommonTokenStream
    val tokenStream = new CommonTokenStream(lexer)
    // SqlBaseParser进行语法解析
    val parser = new SqlBaseParser(tokenStream)
    // 语法解析器的后置处理监听器 专门用来验证并清理解析树
    parser.addParseListener(PostProcessor)
    // 语法解析器的后置处理监听器用来检查未闭合括号的注释的
    parser.addParseListener(UnclosedCommentProcessor(command, tokenStream))
    // 清空错误处理监听器
    parser.removeErrorListeners()
    // 添加自定义的编译错误监听器
    parser.addErrorListener(ParseErrorListener)
    // 错误处理器SparkParserErrorStrategy
    parser.setErrorHandler(new SparkParserErrorStrategy())
    // 配置项spark.sql.legacy.setopsPrecedence.enabled,默认false。
    // 控制解析计算顺序,顺序未指定时,true时,将按照查询中出现的顺序从左到右执行集合操作。
    // false时,INTERSECT操作将在任何UNION、EXCEPT和MINUS操作之前执行。
    parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced
    // 配置项 spark.sql.legacy.exponentLiteralAsDecimal.enabled,默认false
    // 控制指数数字的转换类型。为true时,带有指数的字面值(例如1E-30)将被解析为Decimal而不是Double。
    parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
    // 配置项spark.sql.ansi.enforceReservedKeywords,默认值false。
    // spark.sql.ansi.enabled ,若SPARK_ANSI_SQL_MODE为true则开启。 v3.0.0
    // 当true和ANSI_ENABLED为true时,Spark SQL解析器强制使用ANSI保留关键字,并禁止使用保留关键字作为表、视图、函数等的别名和/或标识符的SQL查询。
    parser.SQL_standard_keyword_behavior = conf.enforceReservedKeywords
    // 配置项 spark.sql.ansi.doubleQuotedIdentifiers,默认值false。
    // 当为true且ANSI_ENABLED为true时,Spark SQL读取用双引号(\")括起来的字面值作为标识符。当为false时,它们被读取为字符串字面值。
    parser.double_quoted_identifiers = conf.doubleQuotedIdentifiers

    try {
      try {
        // 首次解析,使用快速模式SSL模式
        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
        toResult(parser)
      }
      catch {
        case e: ParseCancellationException =>
          tokenStream.seek(0) // 重置输入流 准备重试
          parser.reset()

          //SLL模式解析失败,则转为LL模式再次尝试解析
          parser.getInterpreter.setPredictionMode(PredictionMode.LL)
          toResult(parser)
      }
    }
    catch {
      case e: ParseException if e.command.isDefined =>
        throw e
      case e: ParseException =>
        throw e.withCommand(command)
      case e: AnalysisException =>
        val position = Origin(e.line, e.startPosition)
        throw new ParseException(Option(command), e.message, position, position,
          e.errorClass, e.messageParameters)
    }
  }
}

Dataset

  /** ofRows的一种变体,它允许传入跟踪器,这样我们就可以跟踪查询解析时间。 */
  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)
    : DataFrame = sparkSession.withActive {
    // 使用sparksession,逻辑计划,跟踪器,构建查询查询执行
    val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
    // 断言式分析,进行逻辑计划的分析执行
    qe.assertAnalyzed()
    // 构建一个新的Dataset
    new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
  }

QueryExecution

  def assertAnalyzed(): Unit = analyzed

  lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) {
    // We can't clone `logical` here, which will reset the `_analyzed` flag.
    sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
  }

Analyzer

  def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
    if (plan.analyzed) return plan
    AnalysisHelper.markInAnalyzer {
      val analyzed = executeAndTrack(plan, tracker)
      try {
        // 检查是否可以正常分析,若不行,则异常抛出
        checkAnalysis(analyzed)
        analyzed
      } catch {
        case e: AnalysisException =>
          val ae = e.copy(plan = Option(analyzed))
          ae.setStackTrace(e.getStackTrace)
          throw ae
      }
    }
  }

RuleExecutor

  /**
   * 执行由子类定义的规则批次,并使用提供的跟踪器跟踪每个规则的计时信息。 跳转到Analyzer.execute方法,最后又跳转到RuleExecutor.execute方法
   */
  def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker): TreeType = {
    QueryPlanningTracker.withTracker(tracker) {
      execute(plan)
    }
  }

  /**
   * 执行由子类定义的规则批次。批处理使用定义的执行策略连续执行。在每个批处理中,规则也是连续执行的。Analyzer中的batches: Seq[Batch]
   */
  def execute(plan: TreeType): TreeType = {
    var curPlan = plan
    val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
    val planChangeLogger = new PlanChangeLogger[TreeType]()
    val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
    val beforeMetrics = RuleExecutor.getCurrentMetrics()

    val enableValidation = SQLConf.get.getConf(SQLConf.PLAN_CHANGE_VALIDATION)
    // 验证初始输入plan。
    if (Utils.isTesting || enableValidation) {
      validatePlanChanges(plan, plan) match {
        case Some(msg) =>
          val ruleExecutorName = this.getClass.getName.stripSuffix("$")
          throw new SparkException(
            errorClass = "PLAN_VALIDATION_FAILED_RULE_EXECUTOR",
            messageParameters = Map("ruleExecutor" -> ruleExecutorName, "reason" -> msg),
            cause = null)
        case _ =>
      }
    }

    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // 运行直到固定点(或策略中指定的最大迭代次数)。
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime
            val effective = !result.fastEquals(plan)

            if (effective) {
              queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
              queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
              planChangeLogger.logRule(rule.ruleName, plan, result)
              //在每个规则之后运行计划更改验证。
              if (Utils.isTesting || enableValidation) {
                validatePlanChanges(plan, result) match {
                  case Some(msg) =>
                    throw new SparkException(
                      errorClass = "PLAN_VALIDATION_FAILED_RULE_IN_BATCH",
                      messageParameters = Map(
                        "rule" -> rule.ruleName,
                        "batch" -> batch.name,
                        "reason" -> msg),
                      cause = null)
                  case _ =>
                }
              }
            }
            queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
            queryExecutionMetrics.incNumExecution(rule.ruleName)

            // 使用QueryPlanningTracker记录计时信息
            tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))

            result
        }
        iteration += 1
        if (iteration > batch.strategy.maxIterations) {
          // 只有当这条规则应该运行不止一次时才记录日志。
          if (iteration != 2) {
            val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
              "."
            } else {
              s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
            }
            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
              s"$endingMsg"
            if (Utils.isTesting || batch.strategy.errorOnExceed) {
              throw new RuntimeException(message)
            } else {
              logWarning(message)
            }
          }
          // 检查一次批次的幂等性
          if (batch.strategy == Once &&
            Utils.isTesting && !excludedOnceBatches.contains(batch.name)) {
            checkBatchIdempotence(batch, curPlan)
          }
          continue = false
        }

        if (curPlan.fastEquals(lastPlan)) {
          logTrace(
            s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
          continue = false
        }
        lastPlan = curPlan
      }

      planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
    }
    planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)

    curPlan
  }

CheckAnalysis

  def checkAnalysis(plan: LogicalPlan): Unit = {
    // 内联查询计划中的所有CTE(Common Table Expressions,公用表表达式)
    val inlineCTE = InlineCTE(alwaysInline = true)
    val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int)]
    inlineCTE.buildCTEMap(plan, cteMap)
    cteMap.values.foreach { case (relation, refCount) =>
      // 如果从未使用过CTE关系,它将在inline之后消失。这里我们显式地检查它的分析,以确保整个查询计划是有效的。
      if (refCount == 0) checkAnalysis0(relation.child)
    }
    // 内联计划中的所有cte,以帮助检查子查询中的查询计划结构。
    checkAnalysis0(inlineCTE(plan))
    // 递归地将此计划树中的所有节点标记为已分析
    plan.setAnalyzed()
  }

new Dataset[Row](qe, RowEncoder(qe.analyzed.schema)),其中RowEncoder用于构造将外部行与Spark SQL内部二进制表示进行转换的编码器。

附录

CTE简述

SQL解析器

在SparkSQL中词法解析器SqlBaseLexer和语法解析器SqlBaseParser,遍历节点有两种模式Listener和Visitor。

Listener模式是被动式遍历,antlr生成类ParseTreeListener,这个类里面包含了所有进入语法树中每个节点和退出每个节点时要进行的操作。我们只需要实现我们需要的节点事件逻辑代码即可,再实例化一个遍历类ParseTreeWalker,antlr会自上而下的遍历所有节点,以完成我们的逻辑处理。

Visitor则是主动遍历模式,需要我们显示的控制遍历的顺序。该模式可以实现在不改变各元素的类的前提下定义作用于这些元素的新操作。SparkSql用的就是此方式来遍历节点的。

QueryPlanningTracker

查询计划执行跟踪器

跟踪计划中阶段运行时间和相关统计信息的简单工具

  1. 阶段:这些是查询规划中的广泛阶段,如下所示,即analysis、optimization和physical planning(只是规划)。
  2. 规则:这些是我们跟踪的单个Catalyst规则。除执行耗时外,我们还跟踪调用次数和有效调用次数。

类的主要函数

特质类ParserInterface中主要方法

  • 解析成逻辑计划
  • 解析成表达式
  • 解析成表标识符
  • 解析成函数标识符
  • 解析成多分区标识符
  • 解析成表schema
  • 解析成数据类型
  • 将查询语句解析成逻辑计划

接口类ParseTreeVisitor中的主要访问解析树的方法

  • 访问解析树,并返回用户定义的操作结果。
  • 访问节点的子节点,并返回用户自定义的操作结果。
  • 访问叶子节点,并返回用户自定义的操作结果。
  • 访问错误节点,并返回用户自定义的操作结果。

QueryExecution转化语法树过程

阶段输出

  • == Parsed Logical Plan ==:构造QueryExecution时候传入的LogicalPlan,即AST(语法抽象树),也为Unresolved Logical Plan
  • == Analyzed Logical Plan ==:是通过QueryExecution#analyzed进行处理后得到的结果
  • == Optimized Logical Plan ==:优化逻辑计划
  • == Physical Plan ==:转换成物理计划

注解

  • @Experimental

    面向用户的实验性 API。实验性 API 可能会在 Spark 的次要版本中更改或移除,或被采用为第一类的 Spark API。

    注意:如果在此注释之前有 Scaladoc 注释,则注释的第一行必须是":: 实验:::",不留尾部空行。这是因为一个已知的问题,即 Scaladoc 只显示注释或注释,以先显示者为准

配置项

spark.sql.planChangeLog.level

配置日志级别,用于在应用规则或批处理后记录从原始计划到新计划的更改。取值为tracedebuginfowarnerror。默认的日志级别是trace

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐