首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spark Shell笔记

Spark Shell笔记

作者头像
CBeann
发布于 2023-12-25 09:13:42
发布于 2023-12-25 09:13:42
37500
代码可运行
举报
文章被收录于专栏:CBeann的博客CBeann的博客
运行总次数:0
代码可运行

学习感悟

(1)学习一定要敲,感觉很简单,但是也要敲一敲,不要眼高手低

(2)一定要懂函数式编程,一定,一定

(3)shell中的方法在scala写的项目中也会有对应的方法

(4)sc和spark是程序的入口,直接用

SparkShell

启动SparkShell

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 ./bin/spark-shell

WordCount案例

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sc.textFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/README.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/out")

RDD创建(Shell)

从集合中创建RDD

parallelize和makeRDD

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd1246 = sc.parallelize(List("a","b","c"))
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
rdd1246.collect
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd1617=sc.makeRDD(List(1,List(("a","b","c")),(2,List("d","e","f"))))
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 rdd1617.collect
从外部存储创建RDD

由外部存储系统的数据集创建,包括本地文件系统,还有Hadoop支持的数据集,如HDFSHBase

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sc.textFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/README.txt")
从其他RDD转换

常用的Transformation和Action(Shell)

map(func):返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
scala> var rdd1638  = sc.parallelize(1 to 10)

scala> rdd1638.collect

scala> rdd1638.map(_*2).collect

filter(func):返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
scala> var rdd1643 =sc.parallelize(1 to 10)

scala> rdd1643.filter(_>5).collect

flatMap(func):类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

注意:func 必须是将一个数据映射为0或多个输出元素

通俗点说:一个数据通过func函数产生的集合压平

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd3=sc.makeRDD(List("hello1_hello2_hello3","hello4_hello5"))

scala> rdd3.flatMap(_.split("_")).collect

sample(withReplacement, fraction, seed):以指定的随机种子随机抽样出数量为 fraction 的数据,withReplacement 表示是抽 出的数据是否放回,true 为有放回的抽样, false 为无放回的抽样,seed 用于指定随机 数生成器种子。例子从 RDD 中随机且有放 回的抽出 50%的数据,随机种子值为 3(即 可能以 1 2 3 的其中一个起始值)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
scala> val rdd5 = sc.makeRDD(List(1,2,3,4,5,6,7))

scala> rdd5.sample(false,0.2,3).collect

takeSample:和 Sample 的区别是:takeSample 返回的是最终的结果集合。

union(otherDataset):对源 RDD 和参数 RDD 求并集后返回一个 新的 RDD

intersection(otherDataset):对源 RDD 和参数 RDD 求交集后返回一个 新的 RDD

intersection(otherDataset):对源 RDD 和参数 RDD 求交集后返回一个 新的 RDD

distinct([numTasks])):对源 RDD 进行去重后返回一个新的 RDD. 默认情况下,只有 8 个并行任务来操作, 但是可以传入一个可选的 numTasks 参数 改变它。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
rdd3 = sc.makeRDD(List(1,1,2,3,4,4,5))

rdd3.distinct(2).collect

reduceByKey(func, [numTasks]):在一个(K,V)的 RDD 上调用,返回一个 (K,V)的 RDD,使用指定的 reduce 函数, 将相同 key 的值聚合到一起,reduce 任务 的个数可以通过第二个可选的参数来设置

groupByKey:groupByKey 也是对每个 key 进行操作,但只生成 一个 sequence。

sortByKey([ascending], [numTasks]):在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序 的(K,V)的 RDD

sortBy(func,[ascending], [numTasks]):与 sortByKey 类似,但是更灵活,可以用 func 先对数据进行处理,按照处理后的数 据比较结果排序。

join(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返 回一个相同 key 对应的所有元素对在一起 的(K,(V,W))的 RDD

cogroup(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返 回一个(K,(Iterable,Iterable))类型 的 RDD

cartesian(otherDataset):笛卡尔积

coalesce(numPartitions):缩减分区数,用于大数据集过滤后,提高 小数据集的执行效率。

repartition(numPartitions):根据分区数,从新通过网络随机洗牌所有 数据。

glom:将每一个分区形成一个数组,形成新的 RDD 类型时 RDD[Array[T]]

subtract:计算差的一种函数去除两个 RDD 中相同的 元素,不同的 RDD 将保留下来

mapValues:针对于(K,V)形式的类型只对 V 进行操作

reduce(func):通过 func 函数聚集 RDD 中的所有元素, 这个功能必须是可交换且可并联的

collect():在驱动程序中,以数组的形式返回数据 集的所有元素

count():返回 RDD 的元素个数

first():返回 RDD 的第一个元素(类似于 take(1))

take(n);返回一个由数据集的前 n 个元素组成的 数组

takeOrdered(n):返回前几个的排序

saveAsTextFile(path):将数据集的元素以 textfile 的形式保存 到 HDFS 文件系统或者其他支持的文件 系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文 本

saveAsSequenceFile(path):将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录 下,可以使 HDFS 或者其他 Hadoop 支 持的文件系统。

saveAsObjectFile(path):用于将 RDD 中的元素序列化成对象, 存储到文件中。

countByKey();针对(K,V)类型的 RDD,返回一个 (K,Int)的 map,表示每一个 key 对应的 元素个数。

数据读取与保存主要方式(Shell)

文本文件输入输出
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd1 =sc.textFile("hdfs://Master:9000/cbeann/README.txt")
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 rdd.saveAsTextFile("hdfs://Master:9000/cbeann/README2.txt")
JSON 、CSV文件输入输出(Shell)

先通过文本文件读入,然后通过fastjson等第三方库解析字符串为自定义的类型

先将自定义的类型通过第三方库转换为字符串,在同文本文件的形式保存到RDD中

SequenceFile 文件输入输出(Shell)

SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的 一种平面文件(Flat File)。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
data.saveAsSequenceFile("hdfs://Master:9000/cbeann/seq")
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val sdata = sc.sequenceFile[Int,String]("hdfs://Master:9000/cbeann/seq/p*")
对象文件输入输出(Shell)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
data.saveAsObjectFile("hdfs://master01:9000/objfile")
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val objrdd:RDD[(Int,String)] = sc.objectFile[(Int,String)]("hdfs://master01:9000/objfile/p*")

Spark SQL(Shell)

启动SparkShell

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
./bin/spark-shell

读取数据,创建DataFrame

我的hdfs上/cbeann/person.json

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
{  "name": "王小二",   "age": 15}
{  "name": "王小三",   "age": 25}
{  "name": "王小四",   "age": 35}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 val df = spark.read.json("hdfs://Master:9000/cbeann/person.json")
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
df.show

将数据注册一张表,表名为 people

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
df.createOrReplaceTempView("people")

发送SQL

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql("select * from people where age > 16").show

或者

RDD、DataFrame、DataSet之间的转化(Shell)

RDD-》DataFrame
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd = sc.makeRDD(List(("zhangsan",11),("lisi",13)))
rdd.toDF("name","age").show

或者 

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
case class Person(name:String, age:Int)
 val df =  rdd.map(x=>Person(x._1,x._2.toInt)).toDF
DataFrame-》RDD
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd1 = df.rdd
RDD-》DataSet
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val ds = rdd.toDS

或者

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
case class Person(name:String, age:Int)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 rdd.map(x=>Person(x._1,x._2.toInt)).toDS
DataSet-》RDD
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ds.rdd
DataFrame》DataSet
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
scala> val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))

scala> val df = rdd.toDF("name","age")

scala> case class Person(name:String, age:Int)

scala> val ds = df.as[Person]

scala> ds.collect
DataSet-》DataFrame
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ds.toDF

SparkSQl输入输出(Shell)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val personDF= spark.read.format("json").load("hdfs://Master:9000/cbeann/person.json")

等价于 

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 val personDF1= spark.read.json("hdfs://Master:9000/cbeann/person.json")

相同的用法还有parquet,csv,text,jdbc

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
personDF1.write.format("json").save("hdfs://Master:9000/cbeann/person")

等价于与

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
personDF1.write.json("hdfs://Master:9000/cbeann/person1")

 相同的用法还有parquet,csv,text,jdbc

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-03-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
mysql中的慢查询日志
操作系统版本:CentOS Linux release 7.7.1908 (Core)
用户6715144
2019/12/17
3.6K0
mysql中的慢查询日志
MySQL慢查询日志
和大多数关系型数据库一样,日志文件是MySQL数据库的重要组成部分。MySQL有几种不同的日志文件,通常包括错误日志文件,二进制日志,通用日志,慢查询日志,等等。这些日志可以帮助我们定位mysqld内部发生的事件,数据库性能故障,记录数据的变更历史,用户恢复数据库等等。
端碗吹水
2020/09/23
1.5K0
MySQL慢查询日志
干货|MySQL性能优化的4个小技巧
  MySQL性能优化是一个老生常谈的问题,无论是在实际工作中还是面试中,都不可避免遇到相应的场景,下面博主就总结一些能够帮助大家解决这个问题的小技巧。
IT学习日记
2022/09/13
1.1K0
干货|MySQL性能优化的4个小技巧
MySQL优化之慢日志查询
对于SQL和索引的优化问题,我们会使用explain去分析SQL语句。但是真正的企业级项目有成千上万条SQL,我们不可能从头开始一条一条explain去分析。我们从什么地方可以获取那些运行时间长,耗性能的SQL??
终有救赎
2023/11/02
2520
MySQL优化之慢日志查询
Mysql慢日志查询
当前,如果不是调优需要的话,一般不建议启动该参数,因为开启慢查询日志会对性能造成一定的影响,慢查询日志支持将日志记录到文件中
大忽悠爱学习
2021/11/15
1.4K0
你的哪些SQL慢?看看MySQL慢查询日志吧
在项目里面,多多少少都隐藏着一些执行比较慢的SQL, 不同的开发测试人员在平时使用的过程中多多少少都能够遇到,但是无法立马有时间去排查解决。那么如果有一个文件能够将这些使用过程中比较慢的SQL记录下来,定期去分析排查,那该多美好啊。这种情况MySQL也替我们想到了,它提供了SQL慢查询的日志,本文就分享下如何使用吧。
闻说社
2023/02/22
7000
MySQL性能优化(四)-慢查询
它能记录下所有执行超过longquerytime时间的SQL语句,帮我们找到执行慢的SQL,方便我们对这些SQL进行优化。
码农小胖哥
2019/12/10
9430
MySQL性能优化(四)-慢查询
性能分析之MySQL慢查询日志分析(慢查询日志)
MySQL的慢查询日志是MySQL提供的一种日志记录,他用来记录在MySQL中响应的时间超过阈值的语句,具体指运行时间超过long_query_time(默认是10秒)值的SQL,会被记录到慢查询日志中。
小明爱吃火锅
2023/09/30
1.3K0
MYSQL高级篇-----查询截取分析,锁机制,主从复制
3、show Profile查询SQL在MySQL数据库中的执行细节和生命周期情况。
默 语
2024/11/20
2670
MYSQL高级篇-----查询截取分析,锁机制,主从复制
图文结合带你搞懂MySQL日志之Slow Query Log(慢查询日志)
MySQL 的慢查询日志,用来记录在 MySQL 中响应时间超过阀值的语句,具体指运行时间超过 long_query_time 值的SQL,则会被记录到慢查询日志中。long_query_time 的默认值为10,意思是运行10秒以上(不含10秒)的语句,认为是超出了我们的最大忍耐时间值。
GreatSQL社区
2023/02/22
4.3K0
Mysql性能优化之开启Mysql慢查询日志
查看当前服务器是否开启慢查询: 1、快速办法,运行sql语句show VARIABLES like "%slow%" 2、直接去my.conf中查看。 my.conf中的配置(放在[mysqld]下的下方加入) [mysqld] log-slow-queries = /data/mysql/10-9-138-42-slow.log long_query_time = 1 #单位是秒 log-queries-not-using-indexes 使用sql语句来修改:不能按照my.conf中的项来修改的。修
思梦php
2018/03/09
1K0
Mysql性能优化之开启Mysql慢查询日志
高性能MySQL(二):服务器性能剖析
在他们的技术咨询生涯中,最常碰到的三个性能相关的服务请求是:如何确认服务器是否达到了性能最佳的状态、找出某条语句为什么执行不够快,以及诊断被用户描述成“停顿”、“堆积”或“卡死”的某些间歇性疑难杂症。
看、未来
2021/09/18
8380
MySQL慢查询日志
MySQL 的慢查询日志是MySQL提供的一种日志记录,他用来记录在MySQL中响应时间超过阀值的语句。
万能青年
2019/09/25
1.4K0
MySQL慢查询日志
MySQL慢查询日志总结
MySQL的慢查询日志是MySQL提供的一种日志记录,它用来记录在MySQL中响应时间超过阀值的语句,具体指运行时间超过longquerytime值的SQL,则会被记录到慢查询日志中。longquerytime的默认值为10,意思是运行10S以上的语句。
lyb-geek
2018/12/28
2.1K0
MySQL慢查询日志总结
Mysql慢查询_mysql并发查询慢
MySQL的慢查询日志是MySQL提供的一种日志记录,它用来记录在MySQL中响应时间超过阀值的语句,具体指运行时间超过long_query_time值的SQL,则会被记录到慢查询日志中。long_query_time的默认值为10,意思是运行10S以上的语句。默认情况下,Mysql数据库并不启动慢查询日志,需要我们手动来设置这个参数,当然,如果不是调优需要的话,一般不建议启动该参数,因为开启慢查询日志会或多或少带来一定的性能影响。慢查询日志支持将日志记录写入文件,也支持将日志记录写入数据库表。
全栈程序员站长
2022/11/07
20K0
Mysql慢查询_mysql并发查询慢
MySQL-获取有性能问题SQL的方法_慢查询 & 实时获取
http://www.searchdoc.cn/rdbms/mysql/dev.mysql.com/doc/refman/5.7/en/index.com.coder114.cn.html
小小工匠
2021/08/17
7590
【MySQL高级】MySQL的优化
在应用的的开发过程中,由于初期数据量小,开发人员写 SQL 语句时更重视功能上的实现,但是
陶然同学
2023/02/24
1.2K0
【MySQL高级】MySQL的优化
mysql 慢查询,你操作的对吗?
MySQL 慢查询,全称 慢查询日志 ,它是 MySQL 提供的一种日志记录,用了记录在 MySQL 中响应时间超过阈值的语句。
八点半的Bruce、D
2020/06/09
8110
MySQL优化--查询分析工具以及各种锁
该语法可以理解为:将主查询的数据,放到子查询中做条件验证,根据验证结果(TRUE或FALSE)来决定主查询的数据结果是否得以保留。
shimeath
2020/08/11
7390
【黄啊码】MySQL入门—15、技术老大:写的SQL性能这么差,还好意思说自己五年开发经验?
大家好!我是黄啊码,MySQL的入门篇已经讲到第14个课程了,今天我们继续讲讲大白篇系列——数据库服务器优化流程
黄啊码
2022/11/01
5190
【黄啊码】MySQL入门—15、技术老大:写的SQL性能这么差,还好意思说自己五年开发经验?
推荐阅读
相关推荐
mysql中的慢查询日志
更多 >
交个朋友
加入腾讯云官网粉丝站
蹲全网底价单品 享第一手活动信息
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验