在老的版本中,SparkSQL 提供两种 SQL 查询起始点:一个叫SQLContext,用于Spark 自己提供的 SQL 查询;一个叫 HiveContext,用于连接 Hive 的查询。
从2.0开始, SparkSession是 Spark 最新的 SQL 查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的 API 在SparkSession上同样是可以使用的。
SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。
当我们使用 spark-shell 的时候, spark 会自动的创建一个叫做spark的SparkSession, 就像我们以前可以自动获取到一个sc来表示SparkContext
Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式.
DataFrame API 既有 transformation操作也有action操作. DataFrame的转换从本质上来说更具有关系, 而 DataSet API 提供了更加函数式的 API
With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.
有了 SparkSession 之后, 通过 SparkSession有 3 种方式来创建DataFrame:
// 读取 json 文件
scala> val df = spark.read.json("file:///opt/module/spark/examples/src/main/resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
// 展示结果
scala> df.show
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
关于通过 RDD 进行转换和通过查询 Hive 表创建,博主会在后面专门探讨
SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询.
这种风格的查询必须要有临时视图或者全局视图来辅助
// 读取 json 文件
scala> val df = spark.read.json("file:///opt/module/spark/examples/src/main/resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select * from people").show
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
注意:
scala> val df = spark.read.json("file:///opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.createGlobalTempView("people")
scala> spark.sql("select * from global_temp.people")
res31: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> res5.show
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
scala> spark.newSession.sql("select * from global_temp.people")
res33: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> res7.show
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据. 可以在 Scala, Java, Python 和 R 中使用 DSL
使用 DSL 语法风格不必去创建临时视图了.
scala> val df = spark.read.json("file:///opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.select($"name").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala> df.select("name").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala> df.select("name", "age").show
+-------+----+
| name| age|
+-------+----+
|Michael|null|
| Andy| 30|
| Justin| 19|
+-------+----+
// 设计到运算的时候, 每列都必须使用$
scala> df.select($"name", $"age" + 1).show
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
涉及到RDD, DataFrame, DataSet之间的操作时, 需要导入:import spark.implicits._ 这里的spark不是包名, 而是表示SparkSession 的那个对象. 所以必须先创建SparkSession对象再导入. implicits是一个内部object
首先创建一个RDD
scala> val rdd1 = sc.textFile("/opt/module/spark-local/examples/src/main/resources/people.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/examples/src/main/resources/people.txt MapPartitionsRDD[10] at textFile at <console>:24
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); (paras(0), paras(1).toInt)})
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:26
// 转换为 DataFrame 的时候手动指定每个数据字段名
scala> rdd2.toDF("name", "age").show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
// 1.创建样例类
scala> case class People(name :String, age: Int)
defined class People
// 2.使用样例把 RDD 转换成DataFrame
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); People(paras(0), paras(1).toInt) })
rdd2: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[6] at map at <console>:28
scala> rdd2.toDF.show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
package day05
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object DataFrameDemo2 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName("Word Count")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
val rdd: RDD[(String, Int)] = sc.parallelize(Array(("lisi", 10), ("zs", 20), ("zhiling", 40)))
// 映射出来一个 RDD[Row], 因为 DataFrame其实就是 DataSet[Row]
val rowRdd: RDD[Row] = rdd.map(x => Row(x._1, x._2))
// 创建 StructType 类型
val types = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
val df: DataFrame = spark.createDataFrame(rowRdd, types)
df.show
}
}
直接调用DataFrame的rdd方法就完成了从转换.
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at rdd at <console>:25
scala> rdd.collect
res0: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
说明:
得到的RDD中存储的数据类型是:Row.
本次的分享就到这里了