Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spark Shell笔记

Spark Shell笔记

作者头像
CBeann
发布于 2023-12-25 09:13:42
发布于 2023-12-25 09:13:42
37500
代码可运行
举报
文章被收录于专栏:CBeann的博客CBeann的博客
运行总次数:0
代码可运行

学习感悟

(1)学习一定要敲,感觉很简单,但是也要敲一敲,不要眼高手低

(2)一定要懂函数式编程,一定,一定

(3)shell中的方法在scala写的项目中也会有对应的方法

(4)sc和spark是程序的入口,直接用

SparkShell

启动SparkShell

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 ./bin/spark-shell

WordCount案例

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sc.textFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/README.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/out")

RDD创建(Shell)

从集合中创建RDD

parallelize和makeRDD

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd1246 = sc.parallelize(List("a","b","c"))
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
rdd1246.collect
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd1617=sc.makeRDD(List(1,List(("a","b","c")),(2,List("d","e","f"))))
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 rdd1617.collect
从外部存储创建RDD

由外部存储系统的数据集创建,包括本地文件系统,还有Hadoop支持的数据集,如HDFSHBase

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sc.textFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/README.txt")
从其他RDD转换

常用的Transformation和Action(Shell)

map(func):返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
scala> var rdd1638  = sc.parallelize(1 to 10)

scala> rdd1638.collect

scala> rdd1638.map(_*2).collect

filter(func):返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
scala> var rdd1643 =sc.parallelize(1 to 10)

scala> rdd1643.filter(_>5).collect

flatMap(func):类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

注意:func 必须是将一个数据映射为0或多个输出元素

通俗点说:一个数据通过func函数产生的集合压平

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd3=sc.makeRDD(List("hello1_hello2_hello3","hello4_hello5"))

scala> rdd3.flatMap(_.split("_")).collect

sample(withReplacement, fraction, seed):以指定的随机种子随机抽样出数量为 fraction 的数据,withReplacement 表示是抽 出的数据是否放回,true 为有放回的抽样, false 为无放回的抽样,seed 用于指定随机 数生成器种子。例子从 RDD 中随机且有放 回的抽出 50%的数据,随机种子值为 3(即 可能以 1 2 3 的其中一个起始值)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
scala> val rdd5 = sc.makeRDD(List(1,2,3,4,5,6,7))

scala> rdd5.sample(false,0.2,3).collect

takeSample:和 Sample 的区别是:takeSample 返回的是最终的结果集合。

union(otherDataset):对源 RDD 和参数 RDD 求并集后返回一个 新的 RDD

intersection(otherDataset):对源 RDD 和参数 RDD 求交集后返回一个 新的 RDD

intersection(otherDataset):对源 RDD 和参数 RDD 求交集后返回一个 新的 RDD

distinct([numTasks])):对源 RDD 进行去重后返回一个新的 RDD. 默认情况下,只有 8 个并行任务来操作, 但是可以传入一个可选的 numTasks 参数 改变它。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
rdd3 = sc.makeRDD(List(1,1,2,3,4,4,5))

rdd3.distinct(2).collect

reduceByKey(func, [numTasks]):在一个(K,V)的 RDD 上调用,返回一个 (K,V)的 RDD,使用指定的 reduce 函数, 将相同 key 的值聚合到一起,reduce 任务 的个数可以通过第二个可选的参数来设置

groupByKey:groupByKey 也是对每个 key 进行操作,但只生成 一个 sequence。

sortByKey([ascending], [numTasks]):在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序 的(K,V)的 RDD

sortBy(func,[ascending], [numTasks]):与 sortByKey 类似,但是更灵活,可以用 func 先对数据进行处理,按照处理后的数 据比较结果排序。

join(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返 回一个相同 key 对应的所有元素对在一起 的(K,(V,W))的 RDD

cogroup(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返 回一个(K,(Iterable,Iterable))类型 的 RDD

cartesian(otherDataset):笛卡尔积

coalesce(numPartitions):缩减分区数,用于大数据集过滤后,提高 小数据集的执行效率。

repartition(numPartitions):根据分区数,从新通过网络随机洗牌所有 数据。

glom:将每一个分区形成一个数组,形成新的 RDD 类型时 RDD[Array[T]]

subtract:计算差的一种函数去除两个 RDD 中相同的 元素,不同的 RDD 将保留下来

mapValues:针对于(K,V)形式的类型只对 V 进行操作

reduce(func):通过 func 函数聚集 RDD 中的所有元素, 这个功能必须是可交换且可并联的

collect():在驱动程序中,以数组的形式返回数据 集的所有元素

count():返回 RDD 的元素个数

first():返回 RDD 的第一个元素(类似于 take(1))

take(n);返回一个由数据集的前 n 个元素组成的 数组

takeOrdered(n):返回前几个的排序

saveAsTextFile(path):将数据集的元素以 textfile 的形式保存 到 HDFS 文件系统或者其他支持的文件 系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文 本

saveAsSequenceFile(path):将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录 下,可以使 HDFS 或者其他 Hadoop 支 持的文件系统。

saveAsObjectFile(path):用于将 RDD 中的元素序列化成对象, 存储到文件中。

countByKey();针对(K,V)类型的 RDD,返回一个 (K,Int)的 map,表示每一个 key 对应的 元素个数。

数据读取与保存主要方式(Shell)

文本文件输入输出
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd1 =sc.textFile("hdfs://Master:9000/cbeann/README.txt")
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 rdd.saveAsTextFile("hdfs://Master:9000/cbeann/README2.txt")
JSON 、CSV文件输入输出(Shell)

先通过文本文件读入,然后通过fastjson等第三方库解析字符串为自定义的类型

先将自定义的类型通过第三方库转换为字符串,在同文本文件的形式保存到RDD中

SequenceFile 文件输入输出(Shell)

SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的 一种平面文件(Flat File)。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
data.saveAsSequenceFile("hdfs://Master:9000/cbeann/seq")
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val sdata = sc.sequenceFile[Int,String]("hdfs://Master:9000/cbeann/seq/p*")
对象文件输入输出(Shell)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
data.saveAsObjectFile("hdfs://master01:9000/objfile")
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val objrdd:RDD[(Int,String)] = sc.objectFile[(Int,String)]("hdfs://master01:9000/objfile/p*")

Spark SQL(Shell)

启动SparkShell

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
./bin/spark-shell

读取数据,创建DataFrame

我的hdfs上/cbeann/person.json

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
{  "name": "王小二",   "age": 15}
{  "name": "王小三",   "age": 25}
{  "name": "王小四",   "age": 35}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 val df = spark.read.json("hdfs://Master:9000/cbeann/person.json")
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
df.show

将数据注册一张表,表名为 people

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
df.createOrReplaceTempView("people")

发送SQL

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql("select * from people where age > 16").show

或者

RDD、DataFrame、DataSet之间的转化(Shell)

RDD-》DataFrame
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd = sc.makeRDD(List(("zhangsan",11),("lisi",13)))
rdd.toDF("name","age").show

或者 

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
case class Person(name:String, age:Int)
 val df =  rdd.map(x=>Person(x._1,x._2.toInt)).toDF
DataFrame-》RDD
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd1 = df.rdd
RDD-》DataSet
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val ds = rdd.toDS

或者

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
case class Person(name:String, age:Int)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 rdd.map(x=>Person(x._1,x._2.toInt)).toDS
DataSet-》RDD
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ds.rdd
DataFrame》DataSet
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
scala> val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))

scala> val df = rdd.toDF("name","age")

scala> case class Person(name:String, age:Int)

scala> val ds = df.as[Person]

scala> ds.collect
DataSet-》DataFrame
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ds.toDF

SparkSQl输入输出(Shell)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val personDF= spark.read.format("json").load("hdfs://Master:9000/cbeann/person.json")

等价于 

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 val personDF1= spark.read.json("hdfs://Master:9000/cbeann/person.json")

相同的用法还有parquet,csv,text,jdbc

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
personDF1.write.format("json").save("hdfs://Master:9000/cbeann/person")

等价于与

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
personDF1.write.json("hdfs://Master:9000/cbeann/person1")

 相同的用法还有parquet,csv,text,jdbc

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-03-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
BigData--大数据分析引擎Spark
(1)zeroValue:给每一个分区中的每一个key一个初始值; (2)seqOp:函数用于在每一个分区中用初始值逐步迭代value; (3)combOp:函数用于合并每个分区中的结果。
MiChong
2020/09/24
1.1K0
BigData--大数据分析引擎Spark
Spark RDD 操作详解——Transformations
Spark RDD 支持2种类型的操作: transformations 和 actions。transformations: 从已经存在的数据集中创建一个新的数据集,如 map。actions: 数据集上进行计算之后返回一个值,如 reduce。
李振
2021/11/26
8240
大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Cor
  我们需要一个效率非常快,且能够支持迭代计算和有效数据共享的模型,Spark 应运而生。RDD 是基于工作集的工作模式,更多的是面向工作流。   但是无论是 MR 还是 RDD 都应该具有类似位置感知、容错和负载均衡等特性。
黑泽君
2019/05/10
2.6K0
大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Cor
原 荐 Spark框架核心概念
Spark框架核心概念     首先介绍Spark中的核心名词概念,然后再逐一详细说明。 RDD:弹性分布式数据集,是Spark最核心的数据结构。有分区机制,所以可以分布式进行处理。有容错机制,通过RDD之间的依赖关系来恢复数据。 依赖关系:RDD的依赖关系是通过各种Transformation(变换)来得到的。父RDD和子RDD之间的依赖关系分两种:①窄依赖②宽依赖。     ①窄依赖:父RDD的分区和子RDD的分区关系是:一对一。     窄依赖不会发生Shuffle,执行效率高,spark框架底层
云飞扬
2018/05/17
1.5K0
Spark常用的算子以及Scala函数总结
上海站 | 高性能计算之GPU CUDA培训 4月13-15日 三天密集式学习 快速带你晋级 阅读全文 > 正文共11264个字,7张图,预计阅读时间28分钟。 Spark与Scala 首先,介绍一
用户1332428
2018/04/17
2K0
Spark常用的算子以及Scala函数总结
Spark的常用算子大总结
作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 2. 需求:创建一个1-10数组的RDD,将所有元素2形成新的RDD (1)创建 scala> var source = sc.parallelize(1 to 10) source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at :24 (2)打印 scala> source.collect() res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) (3)将所有元素2 scala> val mapadd = source.map(_ * 2) mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at :26 (4)打印最终结果 scala> mapadd.collect() res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
Maynor
2021/12/07
5250
Spark算子总结
由于计算过程是在内存进行,然后spill出来,每到达一个checkpoint就会将内存中的数据写入到磁盘,这个功能就是手动使其到达checkpoint
vincentbbli
2021/08/18
9510
SparkCore快速入门系列(5)
铁铁们,博主前段时间在做一些项目加上找工作所以到现在才更新,(__) 嘻嘻…… 博主现在已经工作啦,后期会给你们更新一些关于数据库以及报表开发的文章哦! 接下来言归正传!!!!!!
刘浩的BigDataPath
2021/04/13
4220
SparkCore快速入门系列(5)
惊了!10万字的Spark全文!
Hello,大家好,这里是857技术社区,我是社区创始人之一,以后会持续给大家更新大数据各组件的合集内容,路过给个关注吧!!!
刘浩的BigDataPath
2022/11/30
1.6K0
Spark Core快速入门系列(3) | <Transformation>转换算子
  从一个已知的 RDD 中创建出来一个新的 RDD 例如: map就是一个transformation.
不温卜火
2020/10/28
2K0
Spark Core快速入门系列(3) | <Transformation>转换算子
Spark之【RDD编程】详细讲解(No3)——《Action行动算子》
本篇博客是Spark之【RDD编程】系列第三篇,为大家带来的是Action的内容。
大数据梦想家
2021/01/27
3750
Spark之【RDD编程】详细讲解(No3)——《Action行动算子》
Spark Core快速入门系列(4) | <Action> 行动算子转换算子
  返回一个由RDD的前n个元素组成的数组   take 的数据也会拉到 driver 端, 应该只对小数据集使用
不温卜火
2020/10/28
5550
Spark之【RDD编程】详细讲解(No2)——《Transformation转换算子》
本篇博客是Spark之【RDD编程】系列第二篇,为大家带来的是RDD的转换的内容。
大数据梦想家
2021/01/27
2K0
Spark之【RDD编程】详细讲解(No2)——《Transformation转换算子》
[大数据之Spark]——Transformations转换入门经典实例
Spark相比于Mapreduce的一大优势就是提供了很多的方法,可以直接使用;另一个优势就是执行速度快,这要得益于DAG的调度,想要理解这个调度规则,还要理解函数之间的依赖关系。 本篇就着重描述
用户1154259
2018/01/17
1.2K0
[大数据之Spark]——Transformations转换入门经典实例
Spark Core 学习笔记
1:Spark Core:内核,也是Spark中最重要的部分,相当于Mapreduce                 SparkCore 和 Mapreduce都是进行离线数据分析                 SparkCore的核心:RDD(弹性分布式数据集),由分区组成 2:Spark Sql:相当于Hive                 支持Sql和DSL语句 -》Spark任务(RDD)-》运行
曼路
2018/10/18
2.2K0
Spark2.3.0 RDD操作
例如,map 是一个转换操作,传递给每个数据集元素一个函数并返回一个新 RDD 表示返回结果。另一方面,reduce 是一个动作操作,使用一些函数聚合 RDD 的所有元素并将最终结果返回给驱动程序(尽管还有一个并行的 reduceByKey 返回一个分布式数据集)。
smartsi
2019/08/07
2.5K0
上万字详解Spark Core(好文建议收藏)
Spark 产生之前,已经有MapReduce这类非常成熟的计算系统存在了,并提供了高层次的API(map/reduce),把计算运行在集群中并提供容错能力,从而实现分布式计算。
五分钟学大数据
2021/04/02
7980
Spark核心RDD、什么是RDD、RDD的属性、创建RDD、RDD的依赖以及缓存、
1:什么是Spark的RDD??? RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。 2:RDD的属性: a、一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,
别先生
2018/04/02
1.3K0
Spark核心RDD、什么是RDD、RDD的属性、创建RDD、RDD的依赖以及缓存、
Spark开发指南
总的来说,每一个Spark的应用,都是由一个驱动程序(driver program)构成,它运行用户的main函数,在一个集群上执行各种各样的并行操作。Spark提出的最主要抽象概念是弹性分布式数据集 (resilient distributed dataset,RDD),它是元素的集合,划分到集群的各个节点上,可以被并行操作。RDDs的创建可以从HDFS(或者任意其他支持Hadoop文件系统) 上的一个文件开始,或者通过转换驱动程序(driver program)中已存在的Scala集合而来。用户也可以让Spark保留一个RDD在内存中,使其能在并行操作中被有效的重复使用。最后,RDD能自动从节点故障中恢复。
幽鸿
2020/04/02
2.1K0
Apache Spark 2.2.0 中文文档 - Spark 编程指南 | ApacheCN
本文介绍了 Apache Spark 的 RDD 程序设计指南,从 RDD 的基本概念、创建与操作、缓存与存储、性能优化等方面进行了详细阐述,并提供了丰富的实例和代码以帮助读者更好地理解和掌握 RDD 的使用方法。
片刻
2018/01/05
1.8K0
Apache Spark 2.2.0 中文文档 - Spark 编程指南 | ApacheCN
相关推荐
BigData--大数据分析引擎Spark
更多 >
LV.1
这个人很懒,什么都没有留下~
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验