Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >数据读取与保存

数据读取与保存

作者头像
用户1483438
发布于 2022-05-09 07:41:00
发布于 2022-05-09 07:41:00
1.1K00
代码可运行
举报
文章被收录于专栏:大数据共享大数据共享
运行总次数:0
代码可运行

摘要

Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统

文件格式分为:

  • Text文件
  • Json文件
  • Csv文件
  • Sequence文件
  • Object文件;

文件系统分为:

文件类数据读取与保存

Text文件 基本语法:

  • 数据读取:textFile(String)
  • 数据保存:saveAsTextFile(String)

案例演示:经典的worldCount程序,并将程序计算结果写入到本地文件中

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  @Test
  def textTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)

    // 数据扁平化,
    val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))

    // 映射
    val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))

    // 计算单词个数
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 + v2)


    //将数据写入目录中,该目录不能存在
    rdd4.saveAsTextFile("file:///C:/Users/123456/Desktop/worldCount_0001")

    // 关闭资源;养成良好编码习惯
    sc.stop()

  }

worldCount_0001 是一个目录,并且不能存在

程序结果
程序结果

就像跑了一个MR,将数据按照分区存入不同的目录中。

Sequence文件 SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。

案例演示: 保存Sequence文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @Test
  def sequenceWriteTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)

    // 数据扁平化,
    val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))

    // 映射
    val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))

    // 计算单词个数
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 + v2)


    //将数据写入目录中,该目录不能存在
    rdd4.saveAsSequenceFile("file:///C:/Users/123456/Desktop/worldCount_0003")

    // 关闭资源;养成良好编码习惯
    sc.stop()

  }

读取Sequence文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  @Test
  def sequenceReadTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[(String, Int)] =sc.sequenceFile[String,Int](path = "file:///C:/Users/123456/Desktop/worldCount_0003",minPartitions = 4)
    //打印
    rdd1.foreach(e=>{
      println(e)
    })

    // 关闭资源;养成良好编码习惯
    sc.stop()
  }

打印结果

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
(python,1)
(shell,4)
(wahaha,1)
(java,5)
(hello,2)

注意: sc.sequenceFile[String,Int] 需要指定返回参数类型 。

Object对象文件 对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

案例演示 将数据保存成Object文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  @Test
  def ObjectWriteTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)

    // 数据扁平化,
    val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))

    // 映射
    val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))

    // 计算单词个数
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 + v2)


    //将数据写入目录中,该目录不能存在
    rdd4.saveAsObjectFile("file:///C:/Users/123456/Desktop/worldCount_0002")

    // 关闭资源;养成良好编码习惯
    sc.stop()

  }

读取 Object 文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  @Test
  def ObjectReadTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取对象文件
    // sc.objectFile[(String,Int)] 需要指定数据类型,写入进去的是一个元组,读取的时候应该也元组的形式返回
    val rdd1=sc.objectFile[(String,Int)](path = "file:///C:/Users/123456/Desktop/worldCount_0002",minPartitions = 4)

    //打印
    rdd1.foreach(e=>{
      println(e)
    })

    // 关闭资源;养成良好编码习惯
    sc.stop()
  }

结果

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
(python,1)
(wahaha,1)
(shell,4)
(hello,2)
(java,5)

注意: sc.objectFile[(String,Int)] 必须指定数据类型

文件系统类数据读取与保存 Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口。如TextInputFormat,新旧两个版本所引用分别是org.apache.hadoop.mapred.InputFormat、org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Action行动算子
在spark中,有两种算子,Transformation转换算子和 Action行动算子。Transformation转换算子在整个job任务中,都是一个懒加载,只有真正执行了 Action行动算子的时候,整个job任务才会得到正在的运行。 可以把Transformation转换算子理解成工厂中的流水线, Action行动算子相当于总闸,只有拉下总闸,整条流水线便开始了运行。
用户1483438
2022/05/09
9310
SparkCore 编程
2.创建一个数组,根据数据创建一个Bean对象,继承Order,实现序列化(Serializable).从而对数组进行排序。
曼路
2018/10/18
2800
Spark Core快速入门系列(11) | 文件中数据的读取和保存
  从文件中读取数据是创建 RDD 的一种方式.   把数据保存的文件中的操作是一种 Action.   Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。   文件格式分为:Text文件、Json文件、csv文件、Sequence文件以及Object文件;   文件系统分为:本地文件系统、HDFS、Hbase 以及 数据库。   平时用的比较多的就是: 从 HDFS 读取和保存 Text 文件.
不温卜火
2020/10/28
2.1K0
Spark Core快速入门系列(11) |  文件中数据的读取和保存
Spark之【数据读取与保存】详细说明
Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。文件格式分为:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;文件系统分为:本地文件系统、HDFS、HBASE以及数据库。
大数据梦想家
2021/01/27
1.7K0
Spark之【数据读取与保存】详细说明
RDD持久化
所谓的持久化,就是将数据进行保存,避免数据丢失。RDD持久化并非将数据落盘保存,而是用作缓存。 了解RDD持久化前需要先了解什么是RDD?
用户1483438
2022/04/28
6900
Spark2.0学习(一)--------Spark简介
Apache Spark™ is a unified analytics engine for large-scale data processing
大数据流动
2019/08/08
7540
SparkCore快速入门系列(5)
铁铁们,博主前段时间在做一些项目加上找工作所以到现在才更新,(__) 嘻嘻…… 博主现在已经工作啦,后期会给你们更新一些关于数据库以及报表开发的文章哦! 接下来言归正传!!!!!!
刘浩的BigDataPath
2021/04/13
3870
SparkCore快速入门系列(5)
PySpark基础
PySpark,作为 Apache Spark 的 Python API,使得处理和分析大数据变得更加高效且易于访问。本章详细讲解了PySpark 的基本概念和架构以及据的输入与输出操作。
Heaven645
2024/08/12
3151
PySpark基础
SparkCore之RDD
https://blog.csdn.net/zym1117/article/details/79532458
用户1483438
2022/04/26
6830
记一次使用Spark算子之用top()求Top N遇到的问题!
需求:使用spark统计词频,并求出现次数最多的10个词以及出现次数 问题:用Spark算子top(),求top N的时候结果不准确 我们用一首被初中生唱收费的《That girl》来做测试:
Spark学习技巧
2018/08/01
1.7K0
记一次使用Spark算子之用top()求Top N遇到的问题!
Spark2.x学习笔记:9、 Spark编程实例
程裕强
2018/01/02
1.1K0
Spark2.x学习笔记:9、 Spark编程实例
Spark-2.WordCount的4种写法
搭好的Spark当然要先写一个最简单的WordCount练练手。 那么,需求是: 1、统计Spark下README.md文件的词频; 2、输出较多,筛选出现次数超过10次的,词频逆序显示
悠扬前奏
2019/05/30
1.5K0
Python大数据之PySpark(六)RDD的操作
from pyspark import SparkConf, SparkContext import re
Maynor
2023/10/08
3850
Python大数据之PySpark(六)RDD的操作
2021年大数据Spark(十三):Spark Core的RDD创建
官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds
Lansonli
2021/10/09
5510
【Spark篇】---Spark中transformations算子二
coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。
LhWorld哥陪你聊算法
2018/09/13
1K0
【Spark篇】---Spark中transformations算子二
2021年大数据Spark(十七):Spark Core的RDD持久化
在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。
Lansonli
2021/10/09
4050
Spark累加器(Accumulator)
累加器:分布式共享只写变量。(Executor和Executor之间不能读数据) 累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
用户1483438
2022/07/26
1.8K0
spark作业-源码分析
b.默认分区器,对于第一个join会返回一个以电脑core总数为分区数量的HashPartitioner.第二个join会返回我们设定的HashPartitioner(分区数目3)
用户2337871
2021/12/28
2750
spark作业-源码分析
Spark案例库V1.0版
基于Spark框架使用Scala语言编程实现词频统计WordCount程序,将符号数据过滤,并统计出现的次数
Maynor
2021/12/07
1.2K0
【Spark篇】---Spark中Action算子
Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序(就是我们编写的一个应用程序)中有几个Action类算子执行,就有几个job运行。
LhWorld哥陪你聊算法
2018/09/13
1K0
【Spark篇】---Spark中Action算子
相关推荐
Action行动算子
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档