Spark SQL 的DataFrame接口支持操作多种数据源. 一个 DataFrame类型的对象可以像 RDD 那样操作(比如各种转换), 也可以用来创建临时表.
把DataFrame注册为一个临时表之后, 就可以在它的数据上面执行 SQL 查询.
df.write.format("json").save("路径")
// 把scala集合转换成DF,隐式转换不需要自己导
val df = (1 to 10).toDF("num")
df.write
df.write.format("json").save("./0804json")
保存操作可以使用 SaveMode, 用来指明如何处理数据. 使用mode()方法来设置.
有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作. 还有, 如果你执行的是 Overwrite 操作, 在写入新的数据之前会先删除旧的数据.
下列为此图实例
scala> df.write.format("json").mode("overwrite").save("./0804json")
scala> df.write.format("json").mode("overwrite").save("./0804json")
scala> df.write.format("json").mode("ignore").save("./0804json")
scala> df.write.format("json").mode("append").save("./0804json")
默认数据源是parquet, 我们也可以通过使用:spark.sql.sources.default这个属性来设置默认的数据源.
val usersDF = spark.read.load("file:///opt/module/spark/ examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
说明:
spark.read.load
是加载数据的通用方法.df.write.save
是保存数据的通用方法. 也可以手动给数据源指定一些额外的选项. 数据源应该用全名称来指定, 但是对一些内置的数据源也可以使用短名称:json
, parquet
, jdbc
, orc
, libsvm
, csv
, text
val peopleDF = spark.read.format("json").load("file:///opt/module/spark/examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
我们前面都是使用read API 先把文件加载到 DataFrame, 然后再查询. 其实, 我们也可以直接在文件上进行查询
scala> spark.sql("select * from json. `file:///opt/module/spark/examples/src/main/resources/people.json`")
说明: json表示文件的格式. 后面的文件具体路径需要用反引号括起来.
Spark SQL 能够自动推测 JSON数据集的结构,并将它加载为一个Dataset[Row].
可以通过SparkSession.read.json()去加载一个JSON 文件。 也可以通过SparkSession.read.format(“json”).load()来加载.
package com.buwenbuhuo.spark.sql.day02
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
**
*
* @author 不温卜火
* *
* @create 2020-08-04 14:23
**
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
object DataSourceDemo {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("DataSourceDemo")
.getOrCreate()
import spark.implicits._
val df: DataFrame = spark.read.json("d:/user.json")
val ds: Dataset[User] = df.as[User]
ds.foreach(user => println(user.friends(0)))
}
}
case class User(name:String, age: Long, friends: Array[String])
Parquet 是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet 格式经常在 Hadoop 生态圈中被使用,它也支持 Spark SQL 的全部数据类型。Spark SQL 提供了直接读取和存储 Parquet 格式文件的方法
package com.buwenbuhuo.spark.sql.day02
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
/**
**
@author 不温卜火
**
* @create 2020-08-04 14:28
**
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
object DataSourceDemo1 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("DataSourceDemo1 ")
.getOrCreate()
import spark.implicits._
val jsonDF: DataFrame = spark.read.json("d:/user/user.json")
jsonDF.write.mode(SaveMode.Overwrite).parquet("d:/user/user.parquet")
val parDF: DataFrame = spark.read.parquet("d:/user/user.parquet")
val userDS: Dataset[User] = parDF.as[User]
userDS.map(user => {user.name="bwbh"; user.friends(0)="不温卜火";user}).show()
}
}
case class User(var name:String, age: Long, friends: Array[String])
注意: Parquet格式的文件是 Spark 默认格式的数据源.所以, 当使用通用的方式时可以直接保存和读取.而不需要使用format
spark.sql.sources.default 这个配置可以修改默认数据源
可以使用通用的load方法, 也可以使用jdbc方法
package com.buwenbuhuo.spark.sql.day02.jdbc
import org.apache.spark.sql.SparkSession
/**
**
*
* @author 不温卜火
* *
* @create 2020-08-04 15:12
**
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
object JDBCRead {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("JDBCRead")
.getOrCreate()
import spark.implicits._
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://hadoop002:3306/rdd")
.option("user", "root")
.option("password", "199712")
.option("dbtable", "user")
.load()
jdbcDF.show
}
}
package com.buwenbuhuo.spark.sql.day02.jdbc
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
**
*
* @author 不温卜火
* *
* @create 2020-08-04 15:12
**
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
object JDBCRead1 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("JDBCRead1")
.getOrCreate()
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "199712")
val df: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop002:3306/rdd", "user", props)
df.show
}
}
也分两种方法: 通用write.save和write.jdbc
package com.buwenbuhuo.spark.sql.day02.jdbc
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ Dataset, SaveMode, SparkSession}
/**
**
* @author 不温卜火
* @create 2020-08-04 15:12
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
object JDBCWrite {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("JDBCWrite")
.getOrCreate()
import spark.implicits._
val rdd: RDD[User1] = spark.sparkContext.parallelize(Array(User1(7,"瞎子",20), User1(8,"zs", 30)))
val ds: Dataset[User1] = rdd.toDS
ds.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop002:3306/rdd")
.option("user", "root")
.option("password", "199712")
.option("dbtable", "user")
.mode(SaveMode.Append)
.save()
}
}
case class User1(id: Long,name: String, age: Long)
package com.buwenbuhuo.spark.sql.day02.jdbc
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
/**
**
* @author 不温卜火
* @create 2020-08-04 15:12
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
object JDBCWrite1 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("JDBCWrite1")
.getOrCreate()
import spark.implicits._
val rdd: RDD[User2] = spark.sparkContext.parallelize(Array(User2(9,"wuwuw",20)))
val ds: Dataset[User2] = rdd.toDS
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "199712")
ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop002:3306/rdd", "user", props)
}
}
case class User2(id:Long,name: String, age: Long)
本次的分享就到这里了