文章大纲
在《20张图详解 Spark SQL 运行原理及数据抽象》的第 5 节“SparkSession”中,我们知道了 Spark SQL 就是基于 SparkSession 作为入口实现的。
在 Spark 2.0 版本之后,SparkSession 封装了 SQLContext 及 HiveContext,实现了后两者的所有功能,并可以获取到 SparkConetxt。
那 Spark SQL 具体的实现方式是怎样的?如何进行使用呢?
下面就带大家一起来认识 Spark SQL 的使用方式,并通过十步操作实战,轻松拿下 Spark SQL 的使用。
1
DataSet 及 DataFrame 的创建
在《20张图详解 Spark SQL 运行原理及数据抽象》的第 4 节“Spark SQL 数据抽象”中,我们认识了 Spark SQL 中的两种数据抽象:DataSet 及 DataFrame。
而在《带你理解 Spark 中的核心抽象概念:RDD》的 2.1 节中,我们认识了如何在 Spark 中创建 RDD,那 DataSet 及 DataFrame 在 Spark SQL 中又是如何进行创建的呢?
DataSet 及 DataFrame 的创建方式有两种:
1.1
使用 Spark 创建函数进行创建
手动定义数据集合,然后通过 Spark 的创建操作函数 createDataset()
、createDataFrame()
, 创建 DataSet、DataFrame:
DataSet:
//DataSet
case class Person(name:String, age:Int, height:Int)
val seq1 = Seq(Person("Michael", 25, 176), Person("Jack", 15, 165))
val ds1 = spark.createDataset(seq1)
ds1.show
使用 Spark 创建操作函数创建 DataSet
DataFrame:
//DataFrame
val seq2 = Seq(("Michael", 25, 176), ("Jack", 15, 165))
val df1 = spark.createDataFrame(seq2).toDF("name", "age", "height")
df1.show
使用 Spark 创建操作函数创建 DataFrame
由于这种方式需要手动定义数据,实际操作中并不常用。
1.2
读取数据源进行创建
Spark SQL 支持的数据源包括:文件、数据库、Hive 等。
1.2.1. 读取文件数据源
Spark SQL 支持的文件类型包括:parquet、text、csv、json、orc 等。
例如读取 Spark 自带的 text 文件:
val sc = spark.sparkContext
val textRDD1 = sc.textFile("file:///opt/modules/spark/examples/src/main/resources/people.txt")
textRDD1.take(5)
Spark SQL 读取文件数据源方式一
或:
val textRDD2 = spark.read.text("file:///opt/modules/spark/examples/src/main/resources/people.txt").rdd
textRDD2.take(5)
Spark SQL 读取文件数据源方式二
两种用法的区别在于返回的数据集类型不一样
sc.textFile(path:String)
返回的数据集类型是:RDD[String]spark.read.text(path:String)
返回的数据集类型是:DataFrame(DataSet[Row])1.2.2. 读取数据库数据源
Spark SQL 支持通过 JDBC 读取外部数据库的数据作为数据源。
以读取 Oracle 数据库为例:
启动 Spark Shell 时,指定 Oracle 数据库的驱动:
spark-shell --master spark://hadoop101:7077 \
--jars /root/temp/ojdbc6.jar \
--driver-class-path /root/temp/ojdbc6.jar
连接数据库,以读取数据库中的数据:
val oracleDF = spark.read.format("jdbc").
option("url","jdbc:oracle:thin:@192.168.100.1:1521/orcl.example.com").
option("dbtable","scott.emp").
option("user","scott").
option("password","test").
load
1.2.3. 使用 Hive 中的数据
Spark SQL 是由 Shark 发展而来的,Shark 其实就是 Hive on Spark。Spark 1.0 版本发布后,才引入了 Spark SQL。
2014 年 7 月 1 日之后,Databricks 宣布终止对 Shark 的开发,将重点放到 Spark SQL 上。
Spark SQL 的具体发展史详见下图:
Spark SQL 发展历史
可见,Spark 原生就对 Hive 的兼容十分友好,且其还内置了 Hive 组件,Spark SQL 可以通过内置 Hive 或者外部 Hive 两种方式读取 Hive 库中的数据。
Spark SQL 具体使用和操作 Hive 数据源的方法将在后续的 Hive 专栏中进行介绍。
2
RDD、DataFrame、DataSet 的共性与转换
在 Spark 中,RDD、DataFrame、DataSet 三种类型的数据集是有一定的共同特性的,因此它们三者之间可以相互进行转换,而且需要转换的场景也较为常见。
2.1
RDD、DataFrame、DataSet 的共性
spark.implicits._
进行支持。2.2
RDD、DataFrame、DataSet 的转换
RDD、DataFrame、DataSet 之间的转换
2.2.1. DataFrame/DataSet 转 RDD
这个转换比较简单,直接调用 rdd 即可将 DataFrame/DataSet 转换为 RDD:
val rdd1 = testDF.rdd
val rdd2 = testDS.rdd
2.2.2. RDD 转 DataFrame
a. 通过编程的方式来设置 Schema,适用于编译器不能确定列的情况:
val peopleRDD = spark.sparkContext.textFile("file:///opt/modules/spark/examples/src/main/resources/people.txt")
val schemaString = "name age"
val filed = schemaString.split(" ").map(filename => org.apache.spark.sql.types.StructField(filename, org.apache.spark.sql.types.StringType, nullable = true))
val schema = org.apache.spark.sql.types.StructType(filed)
peopleRDD.map(_.split(",")).map(para => org.apache.spark.sql.Row(para(0).trim, para(1).trim))
val peopleDF = spark.createDataFrame(res6, schema)
peopleDF.show
b. 用元组把一行的数据写在一起,然后在 toDF()
中指定字段名:
val peopleDF2 = rdd.map(para(para(0).trim(), para(1).trim().toInt)).toDF("name", "age")
peopleDF2.show
c. 定义 case class,通过反射来设置 Schema,使用 toDF
进行转换:
case class Person(name:String, age:Int)
val peopleDF3 = spark.sparkContext.textFile("file:///opt/modules/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(para => Person(para(0).trim, para(1).trim.toInt)).toDF
peopleDF3.show
RDD 转 DataFrame(case class 方式)
2.2.3. RDD 转 DataSet
定义 case class,通过反射来设置 Schema,使用 toDS
进行转换:
case class Person(name:String, age:Int)
val peopleDS = spark.sparkContext.textFile("file:///opt/modules/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(para => Person(para(0).trim, para(1).trim.toInt)).toDS
peopleDS.show
RDD 转 DataSet
2.2.4. DataSet 转 DataFrame
直接调用 toDF
,即可将 DataSet 转换为 DataFrame:
val peopleDF4 = peopleDS.toDF
peopleDF4.show
DataSet 转 DataFrame
2.2.5. DataFrame 转 DataSet
使用 as
方法,as
方法后面跟的是 case class:
val peopleDS2 = peopleDF3.as[Person]
peopleDS2.show
DataFrame 转 DataSet
DataFrame 与 DataSet 均支持 Spark SQL 的算子操作,同时也能进行 SQL 语句操作,下面的实战中会进行演示。
3
Spark SQL 查询方式
Spark SQL 支持两种查询方式:一种是DSL 风格,另外一种是SQL 风格。
3.1
DSL 风格
Spark SQL 提供了一种 DSL(Domain Specified Language,领域专用语言,在语义上与 SQL 关系查询非常相近),以方便操作结构化数据。
使用前需要引入 spark.implicits._
这个隐式转换,以将 DataFrame 隐式转换成 RDD。
3.2
SQL 风格
Spark SQL 的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用 spark.sql()
来执行 SQL 查询,并返回结果数据集。
使用前需要将 DataFrame/DataSet 注册成一张表,注册方式分两种:
1. Local Temporary View
使用 createOrReplaceTempView()
或 createTempView()
方法可以将表注册成 Local Temporary View(局部临时视图),这种方式注册的表只对当前生命周期中的 Session 有效,不能与其它 Session 共享。
2. Global Temporary View
使用 createGlobalTempView()
方法可以将表注册成 Global Temporary View(全局临时视图),这种方式注册的表可以在不同的 Session 中共享,即跨 Session 有效,而且在 Application 的运行周期内可用。
需要注意的是,使用 SQL 语句访问该表时,要加上 global_temp
作为前缀来引用,因为全局临时视图是绑定到系统保留的数据库 global_temp 上的。
下面的实战中会有注册不同类型表区别的实例操作演示。
3.3
Spark SQL 算子
DSL 支持 Spark SQL 算子,且算子十分丰富,下面列举部分算子:
3.3.1. select 相关
a. 列的数据展示有多种表示方法:""
、$""
、'
、col()
、df("")
,注意不要混合使用:
// select
df1.select($"ename", $"age", $"sal").show
df1.select("ename", "age", "sal").show
df1.select('ename, 'age, 'sal).show
df1.select(col("ename"), col("age"), col("sal")).show
df1.select(df1("ename"), df1("age"), df1("sal")).show
b. expr
表达式可以对列进行操作,注意 expr
里面只能使用引号:
// expr表达式
df1.select(expr("age + 1"), expr("sal + 100"), expr("ename")).show
df1.selectExpr("ename as name").show
df1.selectExpr("round(sal, -3) as newsal", "sal", "ename").show
3.3.2. 更改相关
a. drop
可删除一个或多个列,得到新的 DataFrame:
// drop
df1.drop("age").show
df1.drop("age", "sal").show
b. withColumn
可对列值进行更改:
// withColumn
df1.withColumn("sal", $"sal" + 100).show
c. withColumnRenamed
可对列名进行更改:
// withColumnRenamed
df1.withColumnRenamed("sal", "newsal").show
注意:以上操作后,返回的数据集的类型是 DataFrame。
3.3.3. 筛选过滤相关
筛选、过滤的操作可以使用 filter
或 where
算子:
// filter
df1.filter("sal > 10000").show
df1.filter("sal > 10000 and job == 'MANAGER'").show
// where
df1.where("sal > 10000").show
df1.where("sal > 10000 and job == 'MANAGER'").show
3.3.4. 聚集统计相关
使用 groupBy
算子搭配统计方式或 agg
可进行数据统计操作:
// groupBy with sum, min, max, avg, count
df1.groupBy("age").sum("sal").show
df1.groupBy("age").min("sal").show
df1.groupBy("age").max("sal").show
df1.groupBy("age").avg("sal").show
df1.groupBy("age").count.show
// agg
df1.groupBy("age").agg("sal" -> "sum", "sal" -> "min", "sal" -> "max", "sal" -> "avg", "sal" -> "count").show
df1.groupBy("age").agg(sum("sal"), min("sal"), max("sal"), avg("sal"), count("sal")).show
df1.groupBy("age").agg(sum("sal").as("sum1"), min("sal").as("min2"), max("sal").as("max3"), avg("sal").as("avg4"), count("sal").as("count5")).show
3.3.5. 排序相关
使用 orderBy
或 sort
算子可进行排序操作:
// orderBy
df1.orderBy("sal").show
df1.orderBy($"sal").show
df1.orderBy($"sal".asc).show
// 降序
df1.orderBy($"sal".desc).show
df1.orderBy(-'sal).show
df1.orderBy(-'age, -'sal).show
// sort
df1.sort("sal").show
df1.sort($"sal").show
df1.sort($"sal".asc).show
df1.sort($"sal".desc).show
df1.sort(-'sal).show
df1.sort(-'age, -'sal).show
3.3.6. 集合(并、交、差)相关
使用 union
(unionAll
)、intersect
、except
算子可对数据进行并集、交集、差集操作:
// union, unionAll, intersect, except
val ds3 = ds1.select("ename")
val ds4 = ds2.select("ename")
// union(求并集,不去重)
ds3.union(ds4).show
// unionAll(求并集,去重,过期方法)
ds3.unionAll(ds4).show
// intersect(求交集)
ds3.intersect(ds4).show
// except(求差集)
ds3.except(ds4).show
3.3.7. 连接相关
与 SQL 类似,连接类型有:内连接、左(外)连接、右(外)连接、全(外)连接、半连接、反连接、笛卡尔积等:
// join
// inner join(内连接)
ds1.join(ds2, "empno").show
ds1.join(ds2, Seq("empno"), "inner").show
// left join(左连接), left outer join(左外连接)
ds1.join(ds2, Seq("empno"), "left").show
ds1.join(ds2, Seq("empno"), "left_outer").show
// right join(右连接), right outer join(右外连接)
ds1.join(ds2, Seq("empno"), "right").show
ds1.join(ds2, Seq("empno"), "right_outer").show
// outer join(外连接), full join(全连接), full outer join(全外连接)
ds1.join(ds2, Seq("empno"), "outer").show
ds1.join(ds2, Seq("empno"), "full").show
ds1.join(ds2, Seq("empno"), "full_outer").show
//semi join(半连接), anti join(反连接)
ds1.join(ds2, Seq("empno"), "left_semi").show
ds1.join(ds2, Seq("empno"), "left_anti").show
注意:跟更改相关的算子一样,连接操作后,返回的数据集的类型是 DataFrame。
4
Spark SQL 使用实战
有了上面及之前介绍的理论知识为基础,下面手把手带大家十步轻松拿下 Spark SQL 使用操作,用实战的形式实践学习到的理论知识,以加深对 Spark SQL 的印象与理解。
4.1
创建数据源文件
这里使用《如何快速获取并分析自己所在城市的房价行情?》中获取到的广州二手房 csv 格式的数据作为数据源文件。
数据源文件(广州二手房信息)
另外再创建一个户型信息相关的数据源文件,以进行连接操作使用。
数据源文件(户型信息)
注意数据文件的编码格式要采用中文编码,否则中文会显示乱码。
4.2
上传数据源文件至 HDFS
这里使用《万字+50图,详解 Hadoop HA 完全分布式部署配置及运行调试》中搭建的 Hadoop 中的 HDFS 作为数据文件的存储系统,因此需要将创建的数据源文件上传至 HDFS 中,供 Spark SQL 进行读取。
上传数据源文件至 HDFS:
hdfs dfs -put /opt/data/ershouHousePrice_lianjia_gz_hdfs.csv /input
hdfs dfs -put /opt/data/huxing_lianjia_gz_hdfs.csv /input
打开 HDFS 的 Web 页面查看:
通过 HDFS Web 页面查看上传数据文件是否成功
可以看到,两个数据源文件已经成功上传至 HDFS 中。
4.3
定义 case class(表的 schema)
打开 SparkSession,定义 case class,即表的 Schema 信息:
case class House(totalprice:Float, positioninfo:String, huxing:String, chaoxiang:String, zhuangxiu:String, louceng:String, louling:String, louxing:String, danjia:Int, mianji:Float, guanzhu:Int)
定义 case class(House)
这里按照数据文件中的字段名称及对应的数据类型,对 Schema 进行定义。
4.4
读取数据源,加载数据(RDD 转 DataFrame)
读取上传到 HDFS 中的广州二手房信息数据文件,分隔符为逗号,将数据加载到上面定义的 Schema 中,并转换为 DataFrame 数据集:
val houseDF = spark.sparkContext.textFile("hdfs://hadoop100:8020/input/ershouHousePrice_lianjia_gz_hdfs.csv").map(_.split(",")).map(para => House(para(0).trim.toFloat, para(1).trim, para(2).trim, para(3).trim, para(4).trim, para(5).trim, para(6).trim, para(7).trim, para(8).trim.toInt, para(9).trim.toFloat, para(10).trim.toInt)).toDF
houseDF.show
读取并加载数据源文件
展示加载的数据集结果
由于数据加载到 Schema 中为 RDD 数据集,需要用 toDF
转换为 DataFrame 数据集,以使用 Spark SQL 进行查询。
4.5
使用 DSL 风格查询数据
使用 Spark SQL 的 DSL 风格查询方式,对 houseDF
数据集进行查询,包括 select、筛选过滤、聚集统计:
houseDF.select("positioninfo").show
houseDF.filter($"totalprice" > 1000).show
houseDF.groupBy($"huxing").count.show
DSL 风格 - 使用 select 算子
DSL 风格 - 使用筛选过滤算子
DSL 风格 - 使用聚集统计算子
大家还可以尝试使用上面介绍的其它 Spark SQL 算子进行查询。
4.6
注册表
为 houseDF
数据集注册两种不同类型的表:Local Temporary View、Global Temporary View:
houseDF.createOrReplaceTempView("houseDF")
houseDF.createGlobalTempView("houseDF_gl")
下面对这两种类型的表进行查询,观察两者之间的区别。
4.7
使用 SQL 风格查询数据
使用 Spark SQL 的 SQL 风格查询方式,对上面注册的两种不同类型表进行查询:
spark.sql("select * from houseDF").show
SQL 风格 - 查询 Local Temporary View
spark.sql("select * from global_temp.houseDF_gl").show
SQL 风格 - 查询 Global Temporary View
注意查询 Global Temporary View 类型表时,需要加上 global_temp
前缀。
在不同的 Session 中,对上面注册的两种表进行查询:
spark.newSession.sql("select * from houseDF").show
在新的 Session 中查询 Local Temporary View
spark.newSession.sql("select * from global_temp.houseDF_gl").show
在新的 Session 中查询 Global Temporary View
通过操作实践,可以看到:
Local Temporary View 只对当前 Session 有效;而 Global Temporary View 可以在不同 Session 间共享,支持跨 Session 查询。
4.8
DataFrame 转 DataSet
将 DataFrame 数据集 houseDF
转换成 DataSet 数据集 houseDS
:
val houseDS = houseDF.as[House]
houseDS.show
DataFrame 转 DataSet 实战
使用 DSL 风格查询方式,对 houseDS
数据集进行查询操作:
houseDS.filter(_.totalprice > 1000).show
houseDS.filter(_.huxing == "3室2厅").show
houseDS.groupBy($"huxing").count.show
对 DataSet 进行 DSL 风格查询
将 houseDS
数据集转换成 Array 类型结构数据:
houseDS.collect
对 DataSet 转换为 Array 类型结构数据
可见,DataFrame 转换为 DataSet 后,同样支持 Spark SQL 的算子操作。
4.
RDD 转 DataSet
重新读取并加载广州二手房信息数据源文件,将其转换为 DataSet 数据集:
val houseRdd = spark.sparkContext.textFile("hdfs://hadoop100:8020/input/ershouHousePrice_lianjia_gz_hdfs.csv").map(_.split(","))
val houseDS2 = houseRdd.map(para => House(para(0).trim.toFloat,para(1).trim,para(2).trim,para(3).trim,para(4).trim,para(5).trim,para(6).trim,para(7).trim,para(8).trim.toInt,para(9).trim.toFloat,para(10).trim.toInt)).toDS
houseDS2.show
RDD 转 DataSet 实战
将 houseDS2
数据集注册成表,并使用 SQL 风格查询方式进行查询:
houseDS2.createOrReplaceTempView("houseDS2")
spark.sql("select * from houseDS2").show
注册表并进行 SQL 风格查询
spark.sql("select totalprice, positioninfo, huxing, danjia, mianji from houseDS2 where totalprice > 1000 and mianji < 150 order by mianji").show
对 DataSet 进行 SQL 风格查询
SQL 风格查询方式更适合进行复杂的数据查询。
4.10
使用 SQL 风格进行连接查询
读取上传到 HDFS 中的户型信息数据文件,分隔符为逗号,将数据加载到定义的 Schema 中,并转换为 DataSet 数据集:
case class Huxing(huxing:String, rooms:String)
val huxingRdd = spark.sparkContext.textFile("hdfs://hadoop100:8020/input/huxing_lianjia_gz_hdfs.csv").map(_.split(","))
val huxingDS = huxingRdd.map(para => Huxing(para(0).trim, para(1).trim)).toDS
huxingDS.show
加载户型信息数据源文件,并转换为 DataSet
将 huxingDS
数据集注册成表,并使用 SQL 风格查询方式进行查询:
huxingDS.createOrReplaceTempView("huxingDS")
spark.sql("select * from huxingDS").show
注册表并进行 SQL 风格查询
对 houseDS2
与 huxingDS
两个 DataSet 数据集采用 SQL 风格查询方式进行连接查询,统计所有二房和三房房子的总价格:
spark.sql("select sum(totalprice) from (select houseDS2.totalprice, huxingDS.rooms from houseDS2 join huxingDS where houseDS2.huxing = huxingDS.huxing and huxingDS.rooms in ('二房','三房')) t").show
使用 SQL 风格进行连接查询
至此,Spark SQL 的使用操作实战暂告一段落,大家可以继续深入摸索研究,发掘 Spark SQL 的精髓所在!
版权信息:© Evgeny Vasenev / Aurora Photos