首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >PySpark|RDD编程基础

PySpark|RDD编程基础

作者头像
数据山谷
发布2020-11-06 16:57:26
发布2020-11-06 16:57:26
88000
代码可运行
举报
文章被收录于专栏:数据山谷数据山谷
运行总次数:0
代码可运行

01 RDD(弹性分布式数据集)

RDD是Spark中最基本的数据抽象,其实就是分布式的元素集合。RDD有三个基本的特性:分区、不可变、并行操作

分区:每一个 RDD 包含的数据被存储在系统的不同节点上。逻辑上我们可以将 RDD 理解成一个大的数组,数组中的每个元素就代表一个分区 (Partition) 。

不可变:不可变性是指每个 RDD 都是只读的,它所包含的分区信息是不可变的。由于已有的 RDD 是不可变的,所以我们只有对现有的 RDD 进行转化 (Transformation) 操作,才能得到新的 RDD ,一步一步的计算出我们想要的结果。

并行操作:因为 RDD 的分区特性,所以其天然支持并行处理的特性。即不同节点上的数据可以分别被处理,然后生成一个新的 RDD。

02 RDD创建

在Pyspark中我们可以通过两种方式来进行RDD的创建,RDD是一种无schema的数据结构,所以我们几乎可以混合使用任何类型的数据结构:tuple、dict、list都可以使用。

  • parallelize()

直接使用数据容器创建RDD。

代码语言:javascript
代码运行次数:0
运行
复制
data = sc.parallelize([('Amber', 22), ('Alfred', 23), ('Skye', 4),
                       ('Albert', 12), ('Amber', 9)])
  • textFile()

引用位于本地或者外部的某个文件(或者多个文件)。

代码语言:javascript
代码运行次数:0
运行
复制
data_from_file = sc.\
    textFile(
        'xxxxx',
        4)

03 RDD转换

我们可以通过转换操作来进行数据集的调整,包括映射、筛选、连接、转换数据集中的值等操作。

  • map()

和python中的map映射相同,经常配合lambda使用。

代码语言:javascript
代码运行次数:0
运行
复制
data_2020 = data_from_file_conv.map(lambda row: int(row[16]))
  • filter()

从数据集中选择元素,该元素符合特定的标准。

代码语言:javascript
代码运行次数:0
运行
复制
data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
  • flatMap()

和map()相似,但是返回一个扁平的列表(可以过滤一些格式不正确的记录)。

代码语言:javascript
代码运行次数:0
运行
复制
data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
  • distinct()

返回指定列中不同值的列表。

代码语言:javascript
代码运行次数:0
运行
复制
distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
  • sample()

返回数据集的随机样本:

参数1:指定采样是否应该替换;

参数2:定义返回数据的分数(百分之多少);

参数3:随机种子。

代码语言:javascript
代码运行次数:0
运行
复制
fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 666)
  • leftOuterJoin()

左链接。

代码语言:javascript
代码运行次数:0
运行
复制
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])

rdd3 = rdd1.leftOuterJoin(rdd2)

只留下能够关联的内容。

代码语言:javascript
代码运行次数:0
运行
复制
rdd4 = rdd1.join(rdd2)
  • intersection()

返回两个RDD中相等的记录

代码语言:javascript
代码运行次数:0
运行
复制
rdd5 = rdd1.intersection(rdd2)
  • repartition()

重新对数据进行分区。

代码语言:javascript
代码运行次数:0
运行
复制
rdd1 = rdd1.repartition(4)

04 RDD操作

和上面的转换不同,操作执行数据集上的计划任务。

  • take()

返回单个数据分区的前n行。

代码语言:javascript
代码运行次数:0
运行
复制
data_first = data_from_file_conv.take(1)
data_first
  • collect()

将所有RDD的元素返回给驱动程序。

代码语言:javascript
代码运行次数:0
运行
复制
rdd5.collect()
  • reduce()

使用指定的方法减少RDD中的元素。

代码语言:javascript
代码运行次数:0
运行
复制
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)
  • count()

统计RDD中元素的个数。

代码语言:javascript
代码运行次数:0
运行
复制
data_reduce.count()
  • countByKey()

获取不同键的计数。

代码语言:javascript
代码运行次数:0
运行
复制
data_key.countByKey().items()
  • saveAsTextFile

让RDD保存为文本文件。

代码语言:javascript
代码运行次数:0
运行
复制
data_key.saveAsTextFile('xxx')
  • foreach()

对RDD中的每个元素,使用迭代的方式应用相同的函数。

代码语言:javascript
代码运行次数:0
运行
复制
def f(x): 
    print(x)

data_key.foreach(f)

05 总结

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-10-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据山谷 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 01 RDD(弹性分布式数据集)
  • 02 RDD创建
  • 03 RDD转换
  • 04 RDD操作
  • 05 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档