【Spark源码分析】基于Spark3.4.2源码分析SparkSQL执行过程
** 为sql字符串创建逻辑计划 */// 获取构造器AstBuilder,将ParseTree转换为AST(visit模式)case _ =>command")// 将sql内容转换成字符流,并且转换成大写形式。词法解析器// 清空识别错误的监听器// ParseErrorListener将解析错误转换为 AnalysisException。ParseException继承AnalysisExc
基于Spark3.4.2源码分析SparkSQL执行过程
文章目录
基本执行流程
Spark SQL运行架构简图
- 将SQL语句解析成未绑定的逻辑计划(未解决的关系Relation、函数Function、属性Attribute)
- 配合元数据信息,完善未绑定的逻辑计划的属性转换成绑定的逻辑计划,将元数据信息与catalog绑定。
- 使用优化规则生成优化逻辑计划
- 将优化的逻辑计划转换成可执行的物理计划
- 使用preparations规则进行预处理后,使用SparkPlan的execute执行rdd
Spark SQL四大执行阶段简图
Unresolved逻辑计划树相关类
RuleExector相关类
解析语法树绑定,使用rule绑定逻辑计划
-
在SparkSession中创建SessionState实例对象
-
使用SessionState中的转换接口,调用
AbstractSqlParser.parsePlan
, -
基于基础 SQL 解析基础架构,使用到了ANTLR4进行此法解析。使用由插件分别生成词法解析器
SqlBaseLexer
和语法解析器SqlBaseParser
,尝试使用Antlr4最快解析模式PredictionMode.SLL
解析后,解析失败会在使用PredictionMode.LL
模式解析,对解析结果进行函数柯理化。将sql的内容先由词法解析器解析成token序列,传入语法解析器解析。 -
使用Visitor模式进行遍历,参照SqlBase.g4的语法树进行匹配,产生逻辑计划(未绑定)。
UnresolvedRelation
语法树。 -
构建QueryExecution,通过查询执行器将Unresolved LogicalPlan一路变成costmodel。
以
select * from people
语句为例,其转化过程如下:== Parsed Logical Plan == 'Project [*] +- 'UnresolvedRelation [global_temp, people], [], false == Analyzed Logical Plan == age: bigint, name: string Project [age#8L, name#9] +- SubqueryAlias global_temp.people +- View (`global_temp`.`people`, [age#8L,name#9]) +- Relation [age#8L,name#9] json == Optimized Logical Plan == Relation [age#8L,name#9] json == Physical Plan == FileScan json [age#8L,name#9] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/D:/code/spark/spark-3.4.2/examples/src/main/resources/people.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<age:bigint,name:string>
-
由RuleExcutor.execute(plan),按照batches顺序和Batch内的rules顺序,对传入的树节点节点应用对应规则。转换成为完全类型的对象。
详细代码
spark.sql("SELECT * FROM global_temp.people").show();
SparkSession
@Experimental
def sql(sqlText: String, args: Map[String, Any]): DataFrame = withActive {
// 查询计划追踪器
val tracker = new QueryPlanningTracker
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
// 解析计划--AbstractSqlParser来实现
val parsedPlan = sessionState.sqlParser.parsePlan(sqlText)
if (args.nonEmpty) {
ParameterizedQuery(parsedPlan, args.mapValues(lit(_).expr).toMap)
} else {
parsedPlan
}
}
// 将执行结果以DataFrame返回
Dataset.ofRows(self, plan, tracker)
}
private[sql] def withActive[T](block: => T): T = {
// 直接使用线程本地的活跃会话,以确保我们得到的会话实际上是事实上设置的而不是默认的会话。
// 这是为了防止一旦我们都搞完之后把默认会话升级到了活跃会话。
val old = SparkSession.activeThreadSession.get()
SparkSession.setActiveSession(this)
try block finally {
SparkSession.setActiveSession(old)
}
}
AbstractSqlParser
/** 为sql字符串创建逻辑计划 */
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
val ctx = parser.singleStatement()
withOrigin(ctx, Some(sqlText)) {
// 获取构造器AstBuilder,将ParseTree转换为AST(visit模式)
astBuilder.visitSingleStatement(ctx) match {
case plan: LogicalPlan => plan
case _ =>
val position = Origin(None, None)
throw QueryParsingErrors.sqlStatementUnsupportedError(sqlText, position)
}
}
}
protected def astBuilder: AstBuilder
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
logDebug(s"Parsing command: $command")
// 将sql内容转换成字符流,并且转换成大写形式。 词法解析器
val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
// 清空识别错误的监听器
lexer.removeErrorListeners()
// ParseErrorListener将解析错误转换为 AnalysisException。 ParseException继承AnalysisException
lexer.addErrorListener(ParseErrorListener)
// 使用指定的令牌源和默认令牌通道 构造一个新的 CommonTokenStream
val tokenStream = new CommonTokenStream(lexer)
// SqlBaseParser进行语法解析
val parser = new SqlBaseParser(tokenStream)
// 语法解析器的后置处理监听器 专门用来验证并清理解析树
parser.addParseListener(PostProcessor)
// 语法解析器的后置处理监听器用来检查未闭合括号的注释的
parser.addParseListener(UnclosedCommentProcessor(command, tokenStream))
// 清空错误处理监听器
parser.removeErrorListeners()
// 添加自定义的编译错误监听器
parser.addErrorListener(ParseErrorListener)
// 错误处理器SparkParserErrorStrategy
parser.setErrorHandler(new SparkParserErrorStrategy())
// 配置项spark.sql.legacy.setopsPrecedence.enabled,默认false。
// 控制解析计算顺序,顺序未指定时,true时,将按照查询中出现的顺序从左到右执行集合操作。
// false时,INTERSECT操作将在任何UNION、EXCEPT和MINUS操作之前执行。
parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced
// 配置项 spark.sql.legacy.exponentLiteralAsDecimal.enabled,默认false
// 控制指数数字的转换类型。为true时,带有指数的字面值(例如1E-30)将被解析为Decimal而不是Double。
parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
// 配置项spark.sql.ansi.enforceReservedKeywords,默认值false。
// spark.sql.ansi.enabled ,若SPARK_ANSI_SQL_MODE为true则开启。 v3.0.0
// 当true和ANSI_ENABLED为true时,Spark SQL解析器强制使用ANSI保留关键字,并禁止使用保留关键字作为表、视图、函数等的别名和/或标识符的SQL查询。
parser.SQL_standard_keyword_behavior = conf.enforceReservedKeywords
// 配置项 spark.sql.ansi.doubleQuotedIdentifiers,默认值false。
// 当为true且ANSI_ENABLED为true时,Spark SQL读取用双引号(\")括起来的字面值作为标识符。当为false时,它们被读取为字符串字面值。
parser.double_quoted_identifiers = conf.doubleQuotedIdentifiers
try {
try {
// 首次解析,使用快速模式SSL模式
parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
toResult(parser)
}
catch {
case e: ParseCancellationException =>
tokenStream.seek(0) // 重置输入流 准备重试
parser.reset()
//SLL模式解析失败,则转为LL模式再次尝试解析
parser.getInterpreter.setPredictionMode(PredictionMode.LL)
toResult(parser)
}
}
catch {
case e: ParseException if e.command.isDefined =>
throw e
case e: ParseException =>
throw e.withCommand(command)
case e: AnalysisException =>
val position = Origin(e.line, e.startPosition)
throw new ParseException(Option(command), e.message, position, position,
e.errorClass, e.messageParameters)
}
}
}
Dataset
/** ofRows的一种变体,它允许传入跟踪器,这样我们就可以跟踪查询解析时间。 */
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)
: DataFrame = sparkSession.withActive {
// 使用sparksession,逻辑计划,跟踪器,构建查询查询执行
val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
// 断言式分析,进行逻辑计划的分析执行
qe.assertAnalyzed()
// 构建一个新的Dataset
new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
}
QueryExecution
def assertAnalyzed(): Unit = analyzed
lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) {
// We can't clone `logical` here, which will reset the `_analyzed` flag.
sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
}
Analyzer
def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
if (plan.analyzed) return plan
AnalysisHelper.markInAnalyzer {
val analyzed = executeAndTrack(plan, tracker)
try {
// 检查是否可以正常分析,若不行,则异常抛出
checkAnalysis(analyzed)
analyzed
} catch {
case e: AnalysisException =>
val ae = e.copy(plan = Option(analyzed))
ae.setStackTrace(e.getStackTrace)
throw ae
}
}
}
RuleExecutor
/**
* 执行由子类定义的规则批次,并使用提供的跟踪器跟踪每个规则的计时信息。 跳转到Analyzer.execute方法,最后又跳转到RuleExecutor.execute方法
*/
def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker): TreeType = {
QueryPlanningTracker.withTracker(tracker) {
execute(plan)
}
}
/**
* 执行由子类定义的规则批次。批处理使用定义的执行策略连续执行。在每个批处理中,规则也是连续执行的。Analyzer中的batches: Seq[Batch]
*/
def execute(plan: TreeType): TreeType = {
var curPlan = plan
val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
val planChangeLogger = new PlanChangeLogger[TreeType]()
val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
val beforeMetrics = RuleExecutor.getCurrentMetrics()
val enableValidation = SQLConf.get.getConf(SQLConf.PLAN_CHANGE_VALIDATION)
// 验证初始输入plan。
if (Utils.isTesting || enableValidation) {
validatePlanChanges(plan, plan) match {
case Some(msg) =>
val ruleExecutorName = this.getClass.getName.stripSuffix("$")
throw new SparkException(
errorClass = "PLAN_VALIDATION_FAILED_RULE_EXECUTOR",
messageParameters = Map("ruleExecutor" -> ruleExecutorName, "reason" -> msg),
cause = null)
case _ =>
}
}
batches.foreach { batch =>
val batchStartPlan = curPlan
var iteration = 1
var lastPlan = curPlan
var continue = true
// 运行直到固定点(或策略中指定的最大迭代次数)。
while (continue) {
curPlan = batch.rules.foldLeft(curPlan) {
case (plan, rule) =>
val startTime = System.nanoTime()
val result = rule(plan)
val runTime = System.nanoTime() - startTime
val effective = !result.fastEquals(plan)
if (effective) {
queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
planChangeLogger.logRule(rule.ruleName, plan, result)
//在每个规则之后运行计划更改验证。
if (Utils.isTesting || enableValidation) {
validatePlanChanges(plan, result) match {
case Some(msg) =>
throw new SparkException(
errorClass = "PLAN_VALIDATION_FAILED_RULE_IN_BATCH",
messageParameters = Map(
"rule" -> rule.ruleName,
"batch" -> batch.name,
"reason" -> msg),
cause = null)
case _ =>
}
}
}
queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
queryExecutionMetrics.incNumExecution(rule.ruleName)
// 使用QueryPlanningTracker记录计时信息
tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))
result
}
iteration += 1
if (iteration > batch.strategy.maxIterations) {
// 只有当这条规则应该运行不止一次时才记录日志。
if (iteration != 2) {
val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
"."
} else {
s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
}
val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
s"$endingMsg"
if (Utils.isTesting || batch.strategy.errorOnExceed) {
throw new RuntimeException(message)
} else {
logWarning(message)
}
}
// 检查一次批次的幂等性
if (batch.strategy == Once &&
Utils.isTesting && !excludedOnceBatches.contains(batch.name)) {
checkBatchIdempotence(batch, curPlan)
}
continue = false
}
if (curPlan.fastEquals(lastPlan)) {
logTrace(
s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
continue = false
}
lastPlan = curPlan
}
planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
}
planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)
curPlan
}
CheckAnalysis
def checkAnalysis(plan: LogicalPlan): Unit = {
// 内联查询计划中的所有CTE(Common Table Expressions,公用表表达式)
val inlineCTE = InlineCTE(alwaysInline = true)
val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int)]
inlineCTE.buildCTEMap(plan, cteMap)
cteMap.values.foreach { case (relation, refCount) =>
// 如果从未使用过CTE关系,它将在inline之后消失。这里我们显式地检查它的分析,以确保整个查询计划是有效的。
if (refCount == 0) checkAnalysis0(relation.child)
}
// 内联计划中的所有cte,以帮助检查子查询中的查询计划结构。
checkAnalysis0(inlineCTE(plan))
// 递归地将此计划树中的所有节点标记为已分析
plan.setAnalyzed()
}
new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
,其中RowEncoder
用于构造将外部行与Spark SQL内部二进制表示进行转换的编码器。
附录
CTE简述
SQL解析器
在SparkSQL中词法解析器SqlBaseLexer和语法解析器SqlBaseParser,遍历节点有两种模式Listener和Visitor。
Listener模式是被动式遍历,antlr生成类ParseTreeListener,这个类里面包含了所有进入语法树中每个节点和退出每个节点时要进行的操作。我们只需要实现我们需要的节点事件逻辑代码即可,再实例化一个遍历类ParseTreeWalker,antlr会自上而下的遍历所有节点,以完成我们的逻辑处理。
Visitor则是主动遍历模式,需要我们显示的控制遍历的顺序。该模式可以实现在不改变各元素的类的前提下定义作用于这些元素的新操作。SparkSql用的就是此方式来遍历节点的。
QueryPlanningTracker
查询计划执行跟踪器
跟踪计划中阶段运行时间和相关统计信息的简单工具
- 阶段:这些是查询规划中的广泛阶段,如下所示,即analysis、optimization和physical planning(只是规划)。
- 规则:这些是我们跟踪的单个Catalyst规则。除执行耗时外,我们还跟踪调用次数和有效调用次数。
类的主要函数
特质类ParserInterface
中主要方法
- 解析成逻辑计划
- 解析成表达式
- 解析成表标识符
- 解析成函数标识符
- 解析成多分区标识符
- 解析成表schema
- 解析成数据类型
- 将查询语句解析成逻辑计划
接口类ParseTreeVisitor
中的主要访问解析树的方法
- 访问解析树,并返回用户定义的操作结果。
- 访问节点的子节点,并返回用户自定义的操作结果。
- 访问叶子节点,并返回用户自定义的操作结果。
- 访问错误节点,并返回用户自定义的操作结果。
QueryExecution转化语法树过程
阶段输出
== Parsed Logical Plan ==
:构造QueryExecution时候传入的LogicalPlan,即AST(语法抽象树),也为Unresolved Logical Plan== Analyzed Logical Plan ==
:是通过QueryExecution#analyzed进行处理后得到的结果== Optimized Logical Plan ==
:优化逻辑计划== Physical Plan ==
:转换成物理计划
注解
-
@Experimental
面向用户的实验性 API。实验性 API 可能会在 Spark 的次要版本中更改或移除,或被采用为第一类的 Spark API。
注意:如果在此注释之前有 Scaladoc 注释,则注释的第一行必须是":: 实验:::",不留尾部空行。这是因为一个已知的问题,即 Scaladoc 只显示注释或注释,以先显示者为准
配置项
spark.sql.planChangeLog.level
配置日志级别,用于在应用规则或批处理后记录从原始计划到新计划的更改。取值为trace
、debug
、info
、warn
、error
。默认的日志级别是trace
。
更多推荐
所有评论(0)