前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark SQL 快速入门系列(7) | SparkSQL如何实现与多数据源交互

Spark SQL 快速入门系列(7) | SparkSQL如何实现与多数据源交互

作者头像
不温卜火
发布2020-10-28 17:42:08
1.4K0
发布2020-10-28 17:42:08
举报
文章被收录于专栏:不温卜火

  Spark SQL 的DataFrame接口支持操作多种数据源. 一个 DataFrame类型的对象可以像 RDD 那样操作(比如各种转换), 也可以用来创建临时表.

  把DataFrame注册为一个临时表之后, 就可以在它的数据上面执行 SQL 查询.

一. 通用加载和保存函数

1.1 保存到HDFS上

1.1.1 通用写法

df.write.format("json").save("路径")

  • 1.定义一个DF
代码语言:javascript
复制
// 把scala集合转换成DF,隐式转换不需要自己导
val df = (1 to 10).toDF("num")
  • 2. write
代码语言:javascript
复制
df.write
  • 3. 保存想要保存的文件到指定位置
代码语言:javascript
复制
df.write.format("json").save("./0804json")
  • 4. 查看存放位置(确定是否成功)

  保存操作可以使用 SaveMode, 用来指明如何处理数据. 使用mode()方法来设置.

  有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作. 还有, 如果你执行的是 Overwrite 操作, 在写入新的数据之前会先删除旧的数据.

  下列为此图实例

  • 5. 如果已经保存过,再次保存相同的文件会出现报错【erroe(模式)】
  • 6. 如果不想出现错误,可以使用overwrite(覆盖)
代码语言:javascript
复制
scala> df.write.format("json").mode("overwrite").save("./0804json")
  • 7. append(追加)
代码语言:javascript
复制
scala> df.write.format("json").mode("overwrite").save("./0804json")
  • 8. ignore(忽略,即没有任何变化)
代码语言:javascript
复制
scala> df.write.format("json").mode("ignore").save("./0804json")

1.1.2 专用写法

代码语言:javascript
复制
scala> df.write.format("json").mode("append").save("./0804json")

1.2 保存到本地

  默认数据源是parquet, 我们也可以通过使用:spark.sql.sources.default这个属性来设置默认的数据源.

代码语言:javascript
复制
val usersDF = spark.read.load("file:///opt/module/spark/ examples/src/main/resources/users.parquet")

usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

说明:

  1. spark.read.load 是加载数据的通用方法.
  2. df.write.save 是保存数据的通用方法.
  • 1. 手动指定选项

  也可以手动给数据源指定一些额外的选项. 数据源应该用全名称来指定, 但是对一些内置的数据源也可以使用短名称:json, parquet, jdbc, orc, libsvm, csv, text

代码语言:javascript
复制
val peopleDF = spark.read.format("json").load("file:///opt/module/spark/examples/src/main/resources/people.json")

peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
  • 2.在文件上直接运行 SQL

  我们前面都是使用read API 先把文件加载到 DataFrame, 然后再查询. 其实, 我们也可以直接在文件上进行查询

代码语言:javascript
复制
scala> spark.sql("select * from json. `file:///opt/module/spark/examples/src/main/resources/people.json`")

说明: json表示文件的格式. 后面的文件具体路径需要用反引号括起来.

二. API读取数据

2.1 加载JSON 文件

  Spark SQL 能够自动推测 JSON数据集的结构,并将它加载为一个Dataset[Row].

  可以通过SparkSession.read.json()去加载一个JSON 文件。 也可以通过SparkSession.read.format(“json”).load()来加载.

  • 1. 源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.sql.day02

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 **
 *
 * @author 不温卜火
 *         *
 * @create 2020-08-04 14:23
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object DataSourceDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("DataSourceDemo")
      .getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read.json("d:/user.json")
    val ds: Dataset[User] = df.as[User]
    ds.foreach(user => println(user.friends(0)))

  }
}
case class User(name:String, age: Long, friends: Array[String])
  • 2. 运行结果

2.2 读取Parquet 文件

  Parquet 是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet 格式经常在 Hadoop 生态圈中被使用,它也支持 Spark SQL 的全部数据类型。Spark SQL 提供了直接读取和存储 Parquet 格式文件的方法

  • 1. 源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.sql.day02
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
/**
 **
@author 不温卜火
 **
 * @create 2020-08-04 14:28
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object DataSourceDemo1 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("DataSourceDemo1 ")
      .getOrCreate()
    import spark.implicits._

    val jsonDF: DataFrame = spark.read.json("d:/user/user.json")
    jsonDF.write.mode(SaveMode.Overwrite).parquet("d:/user/user.parquet")

    val parDF: DataFrame = spark.read.parquet("d:/user/user.parquet")
    val userDS: Dataset[User] = parDF.as[User]
    userDS.map(user => {user.name="bwbh"; user.friends(0)="不温卜火";user}).show()
  }
}
case class User(var name:String, age: Long, friends: Array[String])
  • 2. 运行结果

注意:   Parquet格式的文件是 Spark 默认格式的数据源.所以, 当使用通用的方式时可以直接保存和读取.而不需要使用format

  spark.sql.sources.default 这个配置可以修改默认数据源

三. JDBC

3.1 从 jdbc 读数据

  可以使用通用的load方法, 也可以使用jdbc方法

3.1.1 使用通用的load方法加载

  • 1. 源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.sql.day02.jdbc

import org.apache.spark.sql.SparkSession

/**
 **
 *
 * @author 不温卜火
 *         *
 * @create 2020-08-04 15:12
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object JDBCRead {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("JDBCRead")
      .getOrCreate()
    import spark.implicits._

    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop002:3306/rdd")
      .option("user", "root")
      .option("password", "199712")
      .option("dbtable", "user")
      .load()
    jdbcDF.show
  }

}
  • 2. 运行结果

3.1.2 使用 jdbc方法加载

  • 1. 源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.sql.day02.jdbc
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 **
 *
 * @author 不温卜火
 *         *
 * @create 2020-08-04 15:12
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object JDBCRead1 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("JDBCRead1")
      .getOrCreate()

    val props: Properties = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "199712")
    val df: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop002:3306/rdd", "user", props)
    df.show
  }


}
  • 2. 运行结果

3.2 从 jdbc 读数据

  也分两种方法: 通用write.save和write.jdbc

3.2.1 write.save

  • 1. 源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.sql.day02.jdbc
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ Dataset, SaveMode, SparkSession}


/**
 **
 * @author 不温卜火
 * @create 2020-08-04 15:12
 *  MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object JDBCWrite {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("JDBCWrite")
      .getOrCreate()
    import spark.implicits._
    val rdd: RDD[User1] = spark.sparkContext.parallelize(Array(User1(7,"瞎子",20), User1(8,"zs", 30)))
    val ds: Dataset[User1] = rdd.toDS
    ds.write
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop002:3306/rdd")
      .option("user", "root")
      .option("password", "199712")
      .option("dbtable", "user")
      .mode(SaveMode.Append)
      .save()

  }
}


case class User1(id: Long,name: String, age: Long)
  • 2. 运行结果

3.2.1 write.jdbc

  • 1. 源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.sql.day02.jdbc
import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

/**
 **
 * @author 不温卜火
 * @create 2020-08-04 15:12
 *  MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object JDBCWrite1 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("JDBCWrite1")
      .getOrCreate()
    import spark.implicits._
    val rdd: RDD[User2] = spark.sparkContext.parallelize(Array(User2(9,"wuwuw",20)))
    val ds: Dataset[User2] = rdd.toDS
    val props: Properties = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "199712")
    ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop002:3306/rdd", "user", props)
  }

}


case class User2(id:Long,name: String, age: Long)
  • 2. 运行结果

  本次的分享就到这里了

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/08/13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. 通用加载和保存函数
    • 1.1 保存到HDFS上
      • 1.1.1 通用写法
      • 1.1.2 专用写法
    • 1.2 保存到本地
    • 二. API读取数据
      • 2.1 加载JSON 文件
        • 2.2 读取Parquet 文件
        • 三. JDBC
          • 3.1 从 jdbc 读数据
            • 3.1.1 使用通用的load方法加载
            • 3.1.2 使用 jdbc方法加载
          • 3.2 从 jdbc 读数据
            • 3.2.1 write.save
            • 3.2.1 write.jdbc
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档