前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >sparksql源码系列 | 一文搞懂with one count distinct 执行原理

sparksql源码系列 | 一文搞懂with one count distinct 执行原理

作者头像
数据仓库践行者
发布于 2022-06-09 13:34:49
发布于 2022-06-09 13:34:49
1.2K00
代码可运行
举报
运行总次数:0
代码可运行

今天下午的源码课,主要是对上两次课程中留的作业的讲解,除了几个逻辑执行计划的优化器外, 重点是planAggregateWithOneDistinct(有一个count distinct情况下生成物理执行计划的原理)。

在面试时,或多或少会被问到有关count distinct的优化,现在离线任务用到的基本就是hivesql和sparksql,那sparksql中有关count distinct做了哪些优化呢?

实际上sparksql中count distinct执行原理可以从两个点来说明:

  • with one count distinct
  • more than one count distinct

这篇文章主要聊一聊 with one count distinct,如果你正好也想了解这块,就点赞、收藏吧

本文基于spark 3.2

本文大纲

1、Aggregate函数的几种mode2、生成WithOneDistinct物理执行计划的几个阶段3、除了count distinct,没有其他非distinct聚合函数的情况的执行原理4、除了count distinct,有其他非distinct聚合函数的情况的执行原理5、关键点调试6、总结

1、Aggregate函数的几种mode

Partial: 局部数据的聚合。会根据读入的原始数据更新对应的聚合缓冲区,当处理完所有的输入数据后,返回的是局部聚合的结果

PartialMerge: 主要是对Partial返回的聚合缓冲区(局部聚合结果)进行合并,但此时仍不是最终结果,还要经过Final才是最终结果(count distinct 类型)

Final: 起到的作用是将聚合缓冲区的数据进行合并,然后返回最终的结果

Complete: 不进行局部聚合计算,应用在不支持Partial模式的聚合函数上(比如求百分位percentile_approx)

非distinct类的聚合函数的路线Partial --> Final

distinct类的聚合函数的路线:Partial --> PartialMerge --> Partial --> Final

2、生成WithOneDistinct物理执行计划的几个阶段

  • partialAggregate
  • partialMergeAggregate
  • partialDistinctAggregate
  • finalAggregate

3、没有其他非distinct聚合函数的情况下执行原理

sql:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
select a,count(distinct b )  from testdata2 group by a

Optimized Logical Plan-->Physical Plan-->executedPlan:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
== Optimized Logical Plan ==
Aggregate [a#3], [a#3, count(distinct b#4) AS count(DISTINCT b)#11L]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
   +- ExternalRDD [obj#2]

== Physical Plan ==
HashAggregate(keys=[a#3], functions=[count(distinct b#4)], output=[a#3, count(DISTINCT b)#11L])
+- HashAggregate(keys=[a#3], functions=[partial_count(distinct b#4)], output=[a#3, count#16L])
   +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
      +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
         +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
            +- Scan[obj#2]     

== executedPlan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#3], functions=[count(distinct b#4)], output=[a#3, count(DISTINCT b)#11L])
   +- Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [id=#28]
      +- HashAggregate(keys=[a#3], functions=[partial_count(distinct b#4)], output=[a#3, count#16L])
         +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
            +- Exchange hashpartitioning(a#3, b#4, 5), ENSURE_REQUIREMENTS, [id=#24]
               +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
                  +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
                     +- Scan[obj#2]

四个阶段的运行原理:

4、有其他非distinct聚合函数的情况下执行原理

sql:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
select a,count(distinct b),max(b) from testdata2 group by a

Optimized Logical Plan-->Physical Plan-->executedPlan:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
== Optimized Logical Plan ==
Aggregate [a#3], [a#3, count(distinct b#4) AS count(DISTINCT b)#12L, max(b#4) AS max(b)#13]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
   +- ExternalRDD [obj#2]

== Physical Plan ==
HashAggregate(keys=[a#3], functions=[max(b#4), count(distinct b#4)], output=[a#3, count(DISTINCT b)#12L, max(b)#13])
+- HashAggregate(keys=[a#3], functions=[merge_max(b#4), partial_count(distinct b#4)], output=[a#3, max#18, count#21L])
   +- HashAggregate(keys=[a#3, b#4], functions=[merge_max(b#4)], output=[a#3, b#4, max#18])
      +- HashAggregate(keys=[a#3, b#4], functions=[partial_max(b#4)], output=[a#3, b#4, max#18])
         +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
            +- Scan[obj#2]

== executedPlan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#3], functions=[max(b#4), count(distinct b#4)], output=[a#3, count(DISTINCT b)#12L, max(b)#13])
   +- Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [id=#28]
      +- HashAggregate(keys=[a#3], functions=[merge_max(b#4), partial_count(distinct b#4)], output=[a#3, max#18, count#21L])
         +- HashAggregate(keys=[a#3, b#4], functions=[merge_max(b#4)], output=[a#3, b#4, max#18])
            +- Exchange hashpartitioning(a#3, b#4, 5), ENSURE_REQUIREMENTS, [id=#24]
               +- HashAggregate(keys=[a#3, b#4], functions=[partial_max(b#4)], output=[a#3, b#4, max#18])
                  +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
                     +- Scan[obj#2]

四个阶段的运行原理:

5、关键点调试

distinctAggregateExpressions-->带distinct聚合函数的表达式

distinctAggregateAttributes-->带distinct聚合函数的引用

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val (distinctAggregateExpressions, distinctAggregateAttributes) =
        rewrittenDistinctFunctions.zipWithIndex.map { case (func, i) =>
          // We rewrite the aggregate function to a non-distinct aggregation because
          // its input will have distinct arguments.
          // We just keep the isDistinct setting to true, so when users look at the query plan,
          // they still can see distinct aggregations.
          val expr = AggregateExpression(func, Partial, isDistinct = true)
          // Use original AggregationFunction to lookup attributes, which is used to build
          // aggregateFunctionToAttribute
          val attr = functionsWithDistinct(i).resultAttribute
          (expr, attr)
      }.unzip

debug结果:

6、总结

我们对hive的count(distinct)做优化,怎么做? 先group by,再count

Sparksql with one count(distinct) 的情况,相比于hive来说,做了优化

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制

select a,count(distinct b)  from testdata2 group by a 
等价于
select a,count(b) from (
select a,b  from testdata2 group by a,b
) tmp  group by a

HashAggregate(keys=[a#3], functions=[count(distinct b#4)], output=[a#3, count(DISTINCT b)#11L])
+- HashAggregate(keys=[a#3], functions=[partial_count(distinct b#4)], output=[a#3, count#16L])
   +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
      +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
         +- SerializeFromObject 
            +- Scan[obj#2]  
----------------------------------------------------------------------------
HashAggregate(keys=[a#3], functions=[count(1)], output=[a#3, count(b)#11L])
+- HashAggregate(keys=[a#3], functions=[partial_count(1)], output=[a#3, count#15L])
   +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3])   
      +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
         +- SerializeFromObject 
            +- Scan[obj#2]

大家一定要觉醒一件事,那就是,我们一定要有一个提升自己的办法。

业务能力的提升、自身软能力的提升、技术能力的提升等。

精读源码,是一种有效的修炼技术内功的方式~~

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-05-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据仓库践行者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
再来说说sparksql中count(distinct)原理和优化手段吧~
元旦前一周到现在总共接到9个sparksql相关的优化咨询,这些案例中,有4个和count(distinct)有关。
数据仓库践行者
2024/01/16
2.1K0
再来说说sparksql中count(distinct)原理和优化手段吧~
SparkSql不同写法的一些坑(性能优化)
这种情况也是我经常会遇到的一个场景,之前也有同学拿着sql来问,说这样写会不会影响运行效率:
数据仓库践行者
2022/11/25
8461
SparkSql全代码生成规则梳理-CollapseCodegenStages
火山模型(迭代器模型), 是1994年 Goetz Graefe 在他的论文 《Volcano, An Extensible and Parallel Query Evaluation System》中提出的概念。
数据仓库践行者
2022/11/25
1.4K0
SparkSql全代码生成规则梳理-CollapseCodegenStages
sparksql源码系列 | 一文搞懂Distribution源码体系(spark3.2)
这篇文章主要介绍sparksql中Distribution的源码体系,Distribution是我们理解Physical Plan、executed Plan、shuffle、SparkSQL的AQE机制等的一个比较基础的知识点。
数据仓库践行者
2022/06/09
1.3K0
sparksql源码系列 | 一文搞懂Distribution源码体系(spark3.2)
SparkSql窗口函数源码分析(第一部分)
WindowExpression :描述该expression是一个windowExpression,继承BinaryLike,是一个二元树。
数据仓库践行者
2022/11/25
1.2K0
SparkSql窗口函数源码分析(第一部分)
Spark Aggregations execution
包含 distinct 关键字的 aggregation 由 4 个物理执行步骤组成。我们使用以下 query 来介绍:
codingforfun
2020/03/26
2.9K0
sparksql源码系列 | ResolveReferences规则count(*)详解
主要看Project [*] 是怎么转化为 Project [a#3, b#4] 的,ResolveReferences 规则的作用在源码共读分享上说过了:
数据仓库践行者
2022/06/09
5680
sparksql源码系列 | ResolveReferences规则count(*)详解
spark sql多维分析优化——细节是魔鬼
这是一张广告竞价的业务表,每一条请求 request_id 都会产生一条数据,一天下来,数据量是很大的(几十亿)。 然而,又要对 7个维度做成22个组合,分别求 count(distinct request_id) , count(distinct deviceid), count(distinct case when bid_response_nbr=10001 then bid_response_id else null end) ,count(distinct case when bid_response_nbr=10001 then deviceid else null end) 。 只能说,需求好无耻啊 啊 啊 啊
数据仓库践行者
2020/04/20
4.4K0
Spark sql 生成PhysicalPlan(源码详解)
QueryExecution.createSparkPlan -> (SparkPlanner.plan)SparkStrategies.plan ->QueryPlanner.plan
数据仓库践行者
2020/11/09
1.1K0
Spark sql 生成PhysicalPlan(源码详解)
额,关于笛卡尔积CartesianProduct
如果这样理解的话,就会很矛盾,笛卡尔积的依赖中,一个父RDD的分区明明被多个子RDD的分区消费了,可它是窄依赖
数据仓库践行者
2022/11/25
5860
额,关于笛卡尔积CartesianProduct
sparksql比hivesql优化的点(窗口函数)
有时候,一个 select 语句中包含多个窗口函数,它们的窗口定义(OVER 子句)可能相同、也可能不同。
数据仓库践行者
2020/04/18
1.5K0
TiSpark 原理之下推丨TiDB 工具分享
TiSpark 是 PingCAP 为解决用户复杂 OLAP 需求而推出的产品。它通过 Spark 提供的拓展机制与内置的 TiKV Client Java,在 Spark 之上直连 TiKV 进行读写,具有事务性读取、事务性写入与删除等能力。其中在事务性读取中基于 Spark Extension 实现了下推(详情可见 TiSpark 用户指南)。
PingCAP
2022/08/11
4580
spark、hive中窗口函数实现原理复盘
这篇文章从一次业务中遇到的问题出发,深入聊了聊hsql中窗口函数的数据流转原理,在文章最后针对这个问题给出解决方案。
数据仓库践行者
2020/04/20
3.2K0
TiDB 源码阅读系列文章(二十二)Hash Aggregation
在 SQL 中,聚合操作对一组值执行计算,并返回单个值。TiDB 实现了 2 种聚合算法:Hash Aggregation 和 Stream Aggregation。
PingCAP
2018/12/21
2.4K0
SparkSql序列化时列的ID是在哪里生成的呢?
sparksql生成解析后的逻辑执行计划时,会通过catalog把各个字段和元数据库绑定,也就说在ResolveLogical的阶段的字段是带了id的:
数据仓库践行者
2022/11/25
8110
SparkSql序列化时列的ID是在哪里生成的呢?
面试 | 你真的了解count(*)和count(1)嘛?
先给结论,在spark sql中count(*)不管在运行效率方面,还是在最终展示结果方面 都等同于count(1)。
数据仓库践行者
2022/04/18
6770
面试 | 你真的了解count(*)和count(1)嘛?
SparkSql源码成神之路
快来加入我的源码学习社群吧,在社群的长期陪伴下,解决你在学习路上遇到的点点滴滴的问题~~
数据仓库践行者
2022/11/24
1.1K0
SparkSql源码成神之路
SparkSQL的两种UDAF的讲解
Spark的dataframe提供了通用的聚合方法,比如count(),countDistinct(),avg(),max(),min()等等。然而这些函数是针对dataframe设计的,当然sparksql也有类型安全的版本,java和scala语言接口都有,这些就适用于强类型Datasets。本文主要是讲解spark提供的两种聚合函数接口:
Spark学习技巧
2018/08/01
2.7K0
Spark SQL 快速入门系列(6) | 一文教你如何自定义 SparkSQL 函数
强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数
不温卜火
2020/10/28
1.5K0
Spark SQL 快速入门系列(6) | 一文教你如何自定义 SparkSQL 函数
Spark源码系列(九)Spark SQL初体验之解析过程详解
好久没更新博客了,之前学了一些R语言和机器学习的内容,做了一些笔记,之后也会放到博客上面来给大家共享。一个月前就打算更新Spark Sql的内容了,因为一些别的事情耽误了,今天就简单写点,Spark1.2马上就要出来了,不知道变动会不会很大,据说添加了很多的新功能呢,期待中... 首先声明一下这个版本的代码是1.1的,之前讲的都是1.0的。 Spark支持两种模式,一种是在spark里面直接写sql,可以通过sql来查询对象,类似.net的LINQ一样,另外一种支持hive的HQL。不管是哪种方式,下面提到
岑玉海
2018/02/28
1.8K0
Spark源码系列(九)Spark SQL初体验之解析过程详解
相关推荐
再来说说sparksql中count(distinct)原理和优化手段吧~
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验