Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >SparkSql之编程方式

SparkSql之编程方式

作者头像
用户1483438
发布于 2022-07-26 13:34:32
发布于 2022-07-26 13:34:32
94300
代码可运行
举报
文章被收录于专栏:大数据共享大数据共享
运行总次数:0
代码可运行

什么是SparkSql?

  • SparkSql作用 主要用于用于处理结构化数据,底层就是将SQL语句转成RDD执行
  • SparkSql的数据抽象 1.DataFrame 2.DataSet

SparkSession

在老的版本中,SparkSQL提供两种SQL查询起始点:

  • 一个叫SQLContext,用于Spark自己提供的SQL查询;
  • 一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。

引入依赖

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    <dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

    </dependencies>

创建SparkSession

导包

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.spark.sql.SparkSession

SparkSession 构造器

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Stable
class SparkSession private(
    @transient val sparkContext: SparkContext,
    @transient private val existingSharedState: Option[SharedState],
    @transient private val parentSessionState: Option[SessionState],
    @transient private[sql] val extensions: SparkSessionExtensions)
  extends Serializable with Closeable with Logging {...}

SparkSession 主构造器已被私有化,无法通过常规的new创建对象。在SparkSession伴生对象中,有个Builder类及builder方法

第一种方式: 创建Builder 对象获取SparkSession 实例

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 创建Builder实例
val builder = new spark.sql.SparkSession.Builder
// 调用getOrCreate获取 SparkSession 实例
val session: SparkSession = builder.getOrCreate()

第二种方式: 通过SparkSession调用builder()函数获取Builder的实例

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 通过调用 builder() 获取 Builder实例
val builder: SparkSession.Builder = SparkSession.builder()
// 调用getOrCreate获取 SparkSession 实例
val session: SparkSession = builder.getOrCreate()

在使用SparkContext时 可以在SparkConf指定masterappName 如:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)

Builder也是可以

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val builder: SparkSession.Builder = SparkSession.builder()
builder.master("local[4]")
builder.appName("test")

创建好SparkSession就可以开始下面的工作了。


spark sql 编程有两种方式

  • 声明式:SQL
  • 命令式:DSL

声明式:SQL

使用声明式,需要注册成表注册成表的四种方式

  • createOrReplaceTempView:创建临时视图,如果视图已经存在则覆盖[只能在当前sparksession中使用] 【重点】
  • createTempView: 创建临时视图,如果视图已经存在则报错[只能在当前sparksession中使用]

示例: 注册成表;viewName指定表名

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 df.createGlobalTempView(viewName="表名")

编写sql

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sparksession.sql("sql语句")

案例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Test
  def sparkSqlBySql(): Unit ={
    val female=List(
      Student(2,"绣花",16,"女",1),
      Student(5,"翠花",19,"女",2),
      Student(9,"王菲菲",20,"女",1),
      Student(11,"小惠",23,"女",1),
      Student(12,"梦雅",25,"女",3)
    )

    val boys=List(
      Student(1,"张三",18,"男",3),
      Student(3,"李四",18,"男",2),
      Student(4,"王五",18,"男",2),
      Student(7,"张鹏",14,"男",1),
      Student(8,"刘秀",13,"男",2),
      Student(10,"乐乐",21,"男",1)
    )

    // 导入隐式转换
    import sparkSession.implicits._

    val femaleDf: DataFrame = female.toDF()
    val boysDf: DataFrame = boys.toDF()

    //合并
    val unionAll=femaleDf.unionAll(boysDf)


    // 注册成表
    unionAll.createOrReplaceTempView(viewName = "student")


    //编写sql

    // 统计男女人数
    sparkSession.sql(
      """
        |select sex,count(*) sex_count from student
        |group by sex
        |""".stripMargin).show()

  }
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
+---+---------+
|sex|sex_count|
+---+---------+
||        6|
||        5|
+---+---------+

也可以支持开窗

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    // 统计男女人数
    sparkSession.sql(
      """
        |select *,row_number() over(partition by sex order by age)as rn from student
        |""".stripMargin).show()
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
+---+------+---+---+-------+---+
| id|  name|age|sex|classId| rn|
+---+------+---+---+-------+---+
|  8|  刘秀| 13||      2|  1|
|  7|  张鹏| 14||      1|  2|
|  1|  张三| 18||      3|  3|
|  3|  李四| 18||      2|  4|
|  4|  王五| 18||      2|  5|
| 10|  乐乐| 21||      1|  6|
|  2|  绣花| 16||      1|  1|
|  5|  翠花| 19||      2|  2|
|  9|王菲菲| 20||      1|  3|
| 11|  小惠| 23||      1|  4|
| 12|  梦雅| 25||      3|  5|
+---+------+---+---+-------+---+

  • createOrReplaceGlobalTempView: 创建全局视图,如果视图已经存在则覆盖[能够在多个sparksession中使用]
  • createGlobalTempView: 创建全局视图,如果视图已经存在则报错[能够在多个sparksession中使用]

注意:使用createOrReplaceGlobalTempViewcreateGlobalTempView创建的表后续查询的时候必须通过 global_temp.表名 方式使用

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    // 统计男女人数
    sparkSession.sql(
      """
        |select *,row_number() over(partition by sex order by age)as rn from global_temp.student
        |""".stripMargin).show()

    // 获取一个新的sparkSession
    val sparkSession2: SparkSession = sparkSession.newSession()
    sparkSession2.sql(
      """
        |select *,row_number() over(partition by sex order by age)as rn from global_temp.student
        |""".stripMargin).show()

结果都是一样,略...


命令式:DSL

通过算子操作数据 参考:https://blog.csdn.net/dabokele/article/details/52802150

DataFrame对象上Action操作

  1. show:展示数据
  2. collect:获取所有数据到数组
  3. collectAsList:获取所有数据到List
  4. describe(cols: String*):获取指定字段的统计信息
  5. first, head, take, takeAsList:获取若干行记录

DataFrame对象上的条件查询和join等操作

  • where条件相关 1.where(conditionExpr: String):SQL语言中where关键字后的条件 2.filter:根据字段进行筛选
  • 查询指定字段 1.select:获取指定字段值 2.electExpr:可以对指定字段进行特殊处理 3.col:获取指定字段 4.apply:获取指定字段 5.drop:去除指定字段,保留其他字段
  • limit limit方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和take与head不同的是,limit方法不是Action操作。
  • order by 1.orderBy和sort:按指定字段排序,默认为升序 2.sortWithinPartitions   和上面的sort方法功能类似,区别在于sortWithinPartitions方法返回的是按Partition排好序的DataFrame对象。
  • group by 1.groupBy:根据字段进行group by操作 2.cube和rollup:group by的扩展 3.GroupedData对象   该方法得到的是GroupedData类型对象,在GroupedData的API中提供了group by之后的操作,比如, max(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段 min(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段 mean(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段 sum(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段 count()方法,获取分组中的元素个数
  • distinct 1.distinct:返回一个不包含重复记录的DataFrame 2.dropDuplicates:根据指定字段去重
  • 聚合 1.聚合操作调用的是agg方法,该方法有多种调用方式。一般与groupBy方法配合使用。
  • union 1.unionAll方法:对两个DataFrame进行组合
  • join 1.笛卡尔积 2.using一个字段形式 3.using多个字段形式 4.指定join类型 5.使用Column类型来join 6.在指定join字段同时指定join类型
  • 获取指定字段统计信息 1.stat方法可以用于计算指定字段或指定字段之间的统计信息,比如方差,协方差等。这个方法返回一个DataFramesStatFunctions类型对象。
  • 获取两个DataFrame中共有的记录 1.intersect方法可以计算出两个DataFrame中相同的记录,
  • 获取一个DataFrame中有另一个DataFrame中没有的记录 1.使用 except
  • 操作字段名 1.withColumnRenamed:重命名DataFrame中的指定字段名   如果指定的字段名不存在,不进行任何操作 2.withColumn:往当前DataFrame中新增一列   whtiColumn(colName: String , col: Column)方法根据指定colName往DataFrame中新增一列,如果colName已存在,则会覆盖当前列。
  • 行转列 1.有时候需要根据某个字段内容进行分割,然后生成多行,这时可以使用explode方法
  • 其他操作 API中还有na, randomSplit, repartition, alias, as方法。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
SparkSql之DataFrame
为了方便测试,单独把sparkSession 提出去,使用它 Junit的方式进行测试运行。
用户1483438
2022/07/26
7590
SparkSql学习笔记一
1.简介     Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。     为什么要学习Spark SQL?     我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!同时Spark SQL也支持从Hive中读取数据。 2.特点     *容易整合     *统一的数据访问方式     *兼容Hive     *标准的数据连接 3.基本概念     *DataFrame         DataFrame(表) = schema(表结构) + Data(表结构,RDD)             就是一个表 是SparkSql 对结构化数据的抽象             DataFrame表现形式就是RDD         DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化。DataFrames可以从各种来源构建,         DataFrame多了数据的结构信息,即schema。         RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。         DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化     *Datasets         Dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,是DataFrame之上更高一级的抽象。它提供了RDD的优点(强类型化,使用强大的lambda函数的能力)以及Spark SQL优化后的执行引擎的优点。一个Dataset 可以从JVM对象构造,然后使用函数转换(map, flatMap,filter等)去操作。 Dataset API 支持Scala和Java。 Python不支持Dataset API。 4.创建表 DataFrame     方式一 使用case class 定义表         val df = studentRDD.toDF     方式二 使用SparkSession直接生成表         val df = session.createDataFrame(RowRDD,scheme)     方式三 直接读取一个带格式的文件(json文件)         spark.read.json("") 5.视图(虚表)     普通视图         df.createOrReplaceTempView("emp")             只对当前对话有作用     全局视图         df.createGlobalTempView("empG")             在全局(不同会话)有效             前缀:global_temp 6.操作表:     两种语言:SQL,DSL      spark.sql("select * from t ").show     df.select("name").show 
曼路
2018/10/18
8820
SparkSQL
Hive on Spark:Hive既作为存储元数据又负责SQL的解析优化,语法是HQL语法,执行引擎变成了Spark,Spark负责采用RDD执行。
ha_lydms
2023/11/04
4780
SparkSQL
适合小白入门的IDEA开发SparkSQL详细教程
之前博主利用业余时间,梳理了一份《SparkSQL编程系列》,奈何当时考虑不周,写的不是很详细。于是在正式开始学习了之后,决定整理一篇适合像我一样的小白级别都能看得懂的IDEA操作SparkSQL教程,于是就有了下文…
大数据梦想家
2021/01/27
2K0
适合小白入门的IDEA开发SparkSQL详细教程
《SparkSql使用教程》--- 大数据系列
在Spark中,DataFrame是一种以RDD为基础的分布式数据据集,类似于传统数据库听二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。
用户3467126
2019/07/03
1K0
SparkSQL快速入门系列(6)
上一篇《SparkCore快速入门系列(5)》,下面给大家更新一篇SparkSQL入门级的讲解。
刘浩的BigDataPath
2021/04/13
2.4K0
SparkSQL快速入门系列(6)
2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
前面使用RDD封装数据,实现词频统计WordCount功能,从Spark 1.0开始,一直到Spark 2.0,建立在RDD之上的一种新的数据结构DataFrame/Dataset发展而来,更好的实现数据处理分析。DataFrame 数据结构相当于给RDD加上约束Schema,知道数据内部结构(字段名称、字段类型),提供两种方式分析处理数据:DataFrame API(DSL编程)和SQL(类似HiveQL编程),下面以WordCount程序为例编程实现,体验DataFrame使用。
Lansonli
2021/10/09
7770
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
     使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:
Lansonli
2021/10/09
1.5K0
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
Spark 2.0开始,SparkSQL应用程序入口为SparkSession,加载不同数据源的数据,封装到DataFrame/Dataset集合数据结构中,使得编程更加简单,程序运行更加快速高效。
Lansonli
2021/10/09
1.4K0
Spark SQL 快速入门系列(5) | 一文教你如何使用 IDEA 创建 SparkSQL 程序(小白入门!)
一. 添加 SparkSQL 依赖 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.1</version> </dependency> 二. 具体代码 2.1 DataFrameDemo 1. 源码 package com.buwenbuhuo.spark.sql.day01 import org.apache.spark.
不温卜火
2020/10/28
1.2K0
Spark SQL 快速入门系列(5) | 一文教你如何使用 IDEA 创建 SparkSQL 程序(小白入门!)
2021年大数据Spark(三十二):SparkSQL的External DataSource
在SparkSQL模块,提供一套完成API接口,用于方便读写外部数据源的的数据(从Spark 1.4版本提供),框架本身内置外部数据源:
Lansonli
2021/10/09
2.4K0
【Spark重点难点】SparkSQL YYDS(上)!
Spark 社区在 1.3 版本发布了 DataFrame。那么,相比 RDD,DataFrame 到底有何不同呢?
王知无-import_bigdata
2021/12/15
1K0
【Spark重点难点】SparkSQL YYDS(上)!
2021年大数据Spark(三十):SparkSQL自定义UDF函数
     无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在org.apache.spark.sql.functions中。SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。
Lansonli
2021/10/09
2.3K0
第三天:SparkSQL
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。 我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduc的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!
sowhat1412
2020/11/05
13.4K0
第三天:SparkSQL
大数据Kudu(九):Spark操作Kudu
使用SparkSQL操作Kudu,这里需要导入Kudu与SparkSQL整合的包和SparkSQL的包,在Maven中导入如下依赖:
Lansonli
2022/12/21
1.2K0
大数据Kudu(九):Spark操作Kudu
十年磨一剑,SparkSQL来一题!
之前推出过一期关于Spark的练习,反响还不错。而最近博主又写了关于SparkSQL,SparkStreaming,Structured Streaming的内容,为了巩固大家的基础,提升实战的能力,故备下了一道综合性比较全面的题,希望大家能够受用。
大数据梦想家
2021/01/27
9470
十年磨一剑,SparkSQL来一题!
Spark SQL 快速入门系列(2) | SparkSession与DataFrame的简单介绍
  在老的版本中,SparkSQL 提供两种 SQL 查询起始点:一个叫SQLContext,用于Spark 自己提供的 SQL 查询;一个叫 HiveContext,用于连接 Hive 的查询。
不温卜火
2020/10/28
2.3K0
Spark SQL 快速入门系列(2) | SparkSession与DataFrame的简单介绍
【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户
其中,spark-sql_2.12是Spark SQL的核心依赖,spark-core_2.12是Spark的核心依赖。注意,版本号可以根据实际情况进行调整。
大数据小禅
2023/03/30
6800
【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户
Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行针对性的优化,最终达到大幅提升运行时效率
Maynor
2021/12/07
2.4K0
Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
ShuffleMapTask要进行Shuffle,ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask。
Maynor
2022/02/17
8760
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
推荐阅读
相关推荐
SparkSql之DataFrame
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验