前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark SQL 快速入门系列(2) | SparkSession与DataFrame的简单介绍

Spark SQL 快速入门系列(2) | SparkSession与DataFrame的简单介绍

作者头像
不温卜火
发布2020-10-28 17:40:55
2.2K0
发布2020-10-28 17:40:55
举报
文章被收录于专栏:不温卜火

一. SparkSession

  在老的版本中,SparkSQL 提供两种 SQL 查询起始点:一个叫SQLContext,用于Spark 自己提供的 SQL 查询;一个叫 HiveContext,用于连接 Hive 的查询。

  从2.0开始, SparkSession是 Spark 最新的 SQL 查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的 API 在SparkSession上同样是可以使用的。

  SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。

  当我们使用 spark-shell 的时候, spark 会自动的创建一个叫做spark的SparkSession, 就像我们以前可以自动获取到一个sc来表示SparkContext

二. 使用 DataFrame 进行编程

  Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式.

  DataFrame API 既有 transformation操作也有action操作. DataFrame的转换从本质上来说更具有关系, 而 DataSet API 提供了更加函数式的 API

2.1 创建 DataFrame

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.

有了 SparkSession 之后, 通过 SparkSession有 3 种方式来创建DataFrame:

  1. 通过 Spark 的数据源创建
  2. 通过已知的 RDD 来创建
  3. 通过查询一个 Hive 表来创建.

1. 通过 Spark 数据源创建

  • 1. 查看Spark数据源进行创建的文件格式
  • 2. 读取json文件创建DataFrame
代码语言:javascript
复制
// 读取 json 文件
scala> val df = spark.read.json("file:///opt/module/spark/examples/src/main/resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
  • 3. 展示结果
代码语言:javascript
复制
// 展示结果
scala> df.show
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

关于通过 RDD 进行转换和通过查询 Hive 表创建,博主会在后面专门探讨

2.2 DataFrame 语法风格

1. SQL 语法风格(主要)

  SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询.

  这种风格的查询必须要有临时视图或者全局视图来辅助

  • 1. 读取json文件创建DataFrame
代码语言:javascript
复制
// 读取 json 文件
scala> val df = spark.read.json("file:///opt/module/spark/examples/src/main/resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
  • 2. 对DataFrame创建一个临时表
代码语言:javascript
复制
scala> df.createOrReplaceTempView("people")
  • 3. 通过SQL语句实现查询全表
代码语言:javascript
复制
scala> spark.sql("select * from people").show
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

注意

  1. 临时视图只能在当前 Session 有效, 在新的 Session 中无效.
  2. 可以创建全局视图. 访问全局视图需要全路径:如global_temp.xxx
  • 4. 对于DataFrame创建一个全局表
代码语言:javascript
复制
scala> val df = spark.read.json("file:///opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.createGlobalTempView("people")
  • 5. 通过SQL语句实现查询全表
代码语言:javascript
复制
scala> spark.sql("select * from global_temp.people")
res31: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> res5.show
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+



scala> spark.newSession.sql("select * from global_temp.people")
res33: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> res7.show
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

2. DSL 语法风格(了解)

  DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据. 可以在 Scala, Java, Python 和 R 中使用 DSL

  使用 DSL 语法风格不必去创建临时视图了.

1. 查看 Schema 信息
代码语言:javascript
复制
scala> val df = spark.read.json("file:///opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
2. 使用 DSL 查询
  • 1. 只查询name列数据
代码语言:javascript
复制
scala> df.select($"name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+


scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
  • 2. 查询name和age
代码语言:javascript
复制
scala> df.select("name", "age").show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+
  • 3. 查询name和age + 1
代码语言:javascript
复制
// 设计到运算的时候, 每列都必须使用$
scala> df.select($"name", $"age" + 1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
  • 4. 查询age大于20的数据
代码语言:javascript
复制
scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
  • 5. 按照age分组,查看数据条数
代码语言:javascript
复制
scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

2.3 RDD 和 DataFrame 的交互

1. 从 RDD 到 DataFrame

  涉及到RDD, DataFrame, DataSet之间的操作时, 需要导入:import spark.implicits._ 这里的spark不是包名, 而是表示SparkSession 的那个对象. 所以必须先创建SparkSession对象再导入. implicits是一个内部object

  首先创建一个RDD

代码语言:javascript
复制
scala> val rdd1 = sc.textFile("/opt/module/spark-local/examples/src/main/resources/people.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/examples/src/main/resources/people.txt MapPartitionsRDD[10] at textFile at <console>:24
  • 1. 手动转换
代码语言:javascript
复制
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); (paras(0), paras(1).toInt)})
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:26

// 转换为 DataFrame 的时候手动指定每个数据字段名
scala> rdd2.toDF("name", "age").show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
  • 2. 通过样例类反射转换(最常用)
代码语言:javascript
复制
// 1.创建样例类
scala> case class People(name :String, age: Int)
defined class People

// 2.使用样例把 RDD 转换成DataFrame
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); People(paras(0), paras(1).toInt) })
rdd2: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[6] at map at <console>:28

scala> rdd2.toDF.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
  • 3. 通过 API 的方式转换
代码语言:javascript
复制
package day05

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object DataFrameDemo2 {
    def main(args: Array[String]): Unit = {

        val spark: SparkSession = SparkSession.builder()
            .master("local[*]")
            .appName("Word Count")
            .getOrCreate()
        val sc: SparkContext = spark.sparkContext
        val rdd: RDD[(String, Int)] = sc.parallelize(Array(("lisi", 10), ("zs", 20), ("zhiling", 40)))
        // 映射出来一个 RDD[Row], 因为 DataFrame其实就是 DataSet[Row]
        val rowRdd: RDD[Row] = rdd.map(x => Row(x._1, x._2))
        // 创建 StructType 类型
        val types = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
        val df: DataFrame = spark.createDataFrame(rowRdd, types)
        df.show

    }
}

2. 从 DataFrame到RDD

直接调用DataFrame的rdd方法就完成了从转换.

代码语言:javascript
复制
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at rdd at <console>:25

scala> rdd.collect
res0: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])

说明

得到的RDD中存储的数据类型是:Row.

  本次的分享就到这里了

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/08/07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. SparkSession
  • 二. 使用 DataFrame 进行编程
    • 2.1 创建 DataFrame
      • 1. 通过 Spark 数据源创建
    • 2.2 DataFrame 语法风格
      • 1. SQL 语法风格(主要)
      • 2. DSL 语法风格(了解)
    • 2.3 RDD 和 DataFrame 的交互
      • 1. 从 RDD 到 DataFrame
      • 2. 从 DataFrame到RDD
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档