Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >PySpark基础

PySpark基础

原创
作者头像
Heaven645
修改于 2024-08-12 16:25:40
修改于 2024-08-12 16:25:40
32500
代码可运行
举报
文章被收录于专栏:Python学习Python学习
运行总次数:0
代码可运行

前言

PySpark,作为 Apache Spark 的 Python API,使得处理和分析大数据变得更加高效且易于访问。本章详细讲解了PySpark 的基本概念和架构以及据的输入与输出操作。

一、PySpark入门

①定义

Apache Spark 是一个用于大规模数据处理的统一分析引擎。简单来说,Spark 是一款分布式计算框架,能够调度成百上千的服务器集群,以处理 TB、PB 乃至 EB 级别的海量数据。

作为全球顶级的分布式计算框架,Spark 支持多种编程语言进行开发,其中 Python 语言是 Spark 特别支持的重点方向。

Spark 对 Python 的支持主要体现在第三方库 PySpark 上。PySpark 是由Spark 官方开发的一款 Python 库,允许开发者使用 Python 代码完成 Spark 任务。

PySpark 不仅可以作为独立的 Python 库使用,还能将程序提交到 Spark 集群进行大规模的数据处理。

Python 的应用场景和就业方向相当广泛,其中大数据开发和人工智能是最为突出的方向。

②安装PySpark库

电脑输入Win+R打开运行窗口→在运行窗口输入“cmd”→点击“确定”→输入pip install pyspark

③编程模型

PySpark 的编程流程主要分为以下三个步骤:

准备数据到RDD → RDD迭代计算 → RDD导出为列表、元组、字典、文本文件或数据库等。

  • 数据输入:通过 SparkContext 对象读取数据
  • 数据计算:将读取的数据转换为 RDD 对象,并调用 RDD 的成员方法进行迭代计算
  • 数据输出:通过 RDD 对象的相关方法将结果输出到列表、元组、字典、文本文件或数据库等

④构建PySpark执行环境入口对象

SparkContext是PySpark的入口点,负责与 Spark 集群的连接,并提供了创建 RDD(弹性分布式数据集)的接口。

要使用 PySpark 库完成数据处理,首先需要构建一个执行环境的入口对象,该对象是 SparkContext 类的实例。创建 SparkContext 对象后,便可开始进行数据处理和分析。

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# 导包
# SparkConf:用于配置Spark应用的参数
# SparkContext:用于连接到Spark集群的入口点,负责协调整个Spark应用的运行
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象,用于设置 Spark 程序的配置
# local[*]表示在本地运行Spark
# [*]表示使用系统中的所有可用核心。这适合于开发和测试。
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

SparkConf 类的常用方法:

方法

描述

setMaster(master)

设置 Spark 的运行模式

setAppName(name)

设置 Spark 应用程序的名称,在 Spark UI 中显示

set(key, value)

设置任意的配置参数,通过键-值对的方式设置配置项

setAll(pairs)

批量设置多个配置项,接收包含键-值对的列表或元组

setExecutorEnv(key, value)

设置 executor 的环境变量

get(key, defaultValue=None)

获取指定键的配置值,若不存在,则返回默认值

contains(key)

检查配置中是否包含某个键

clear()

清空所有设置的配置项

getAll()

获取所有的配置项,以键-值对的形式返回

set("spark.some.config.option", "value")

可设置任何有效的 Spark 配置选项

二、数据输入

①RDD对象

如下图所示,PySpark 支持多种格式的数据输入,并在输入完成后生成一个 RDD 对象。

RDD 的全称是弹性分布式数据集(Resilient Distributed Datasets),它是 PySpark 中数据计算的载体,具备以下功能:

RDD 具有迭代计算特性,RDD的数据计算方法,返回值依旧是RDD对象。

②Python数据容器转RDD对象

在 PySpark 中,可以通过 SparkContext 对象的 parallelize 方法将 list、tuple、set、dict 和 str 转换为 RDD 对象。

parallelize() :用于将本地集合(即 Python 的原生数据结构)转换为 RDD 对象。

方法签名:

SparkContext.parallelize(collection, numSlices=None)

  • 参数collection: 可以是任何可迭代的数据结构(例如list、tuple、set、dict 或 str 的列表)
  • 参数numSlices: 可选参数,用于指定将数据划分为多少个分片
代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize((1,2,3,4,5))
rdd3=sc.parallelize("abcdefg")
rdd4=sc.parallelize({1,2,3,4,5})
rdd5=sc.parallelize({"key1":"value1","key2":"value=2"})

# 使用collect()方法查看RDD里面有什么内容
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出结果: 1, 2, 3, 4, 5 1, 2, 3, 4, 5 'a', 'b', 'c', 'd', 'e', 'f', 'g' 1, 2, 3, 4, 5 'key1', 'key2'

【注意】

  • 对于字符串,parallelize 方法会将其拆分为单个字符并存入 RDD。
  • 对于字典,只有键会被存入 RDD 对象,值会被忽略。

③读取文件转RDD对象

在 PySpark 中,可通过 SparkContext 的 textFile 成员方法读取文本文件并生成RDD对象。

textFile():用于读取文本文件并将其内容作为 RDD(弹性分布式数据集)加载。

方法签名:textFile(path, minPartitions=None)

  • 参数path:要读取的文件的路径
  • 参数minPartitions:可选参数,用于指定数据划分的最小分片数

例如:电脑D盘中有一个test.txt文本文件,内容如下:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 使用textFile方法,读取文件数据加载到Spark内,使其成为RDD对象
rdd=sc.textFile("D:/test.txt")
print(rdd.collect())
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出结果: 'Hello python!', '你好 Python!!!', '123456'

三、数据输出

①collect算子

功能:

将分布在集群上的所有 RDD 元素收集到驱动程序(Driver)节点,从而形成一个普通的 Python 列表

用法:

rdd.collect()

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,6])
# collect 算子,输出RDD为List对象
# print(rdd)  输出的是类名,输出结果:ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
rdd_list=rdd.collect()
print(rdd_list)
print(type(rdd_list))
sc.stop()

输出结果:

1, 2, 3, 4, 5, 6

<class 'list'>

②reduce算子

功能:

将 RDD 中的元素两两应用指定的聚合函数,最终合并为一个值,适用于需要归约操作的场景。

用法:

rdd.reduce(lambda a, b: a + b)

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,])

# reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda a,b:a+b)
print(num)
sc.stop()

输出结果:

15

【分析】

③take算子

功能:

从 RDD 中获取指定数量的元素,以列表形式返回,同时不会将所有数据传回驱动。如果指定的元素数量超出 RDD 元素数量,则返回所有元素。

用法:

rdd.take(n)

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,])
# take算子,取出RDD前N个元素并组成list返回
take_list=rdd.take(3)
print(take_list)
sc.stop()

输出结果:

1, 2, 3

④count算子

功能:

返回 RDD 中元素的总个数。

用法:

rdd.count()

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,])
# count算子,统计rdd内有多少条数据,返回值为数字
num_count=rdd.count()
print(f"rdd内有{num_count}个元素")
sc.stop()

输出结果:

rdd内有5个元素

⑤saveAsTextFile算子

功能:

将 RDD 中的数据写入文本文件中。

用法:

rdd.saveAsTextFile(path)

调用保存文件的算子,需配置Hadoop依赖,配置方法如下:

  1. 下载Hadoop安装包: 下载网址http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz
  2. 将Hadoop安装包解压到电脑任意位置
  3. 在Python代码中配置os模块: os.environ‘HADOOP_HOME’ = ‘HADOOP解压文件夹路径’
  4. 下载winutils.exe: 下载网址https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe
  5. 将winutils.exe放入Hadoop解压文件夹的bin目录内
  6. 下载hadoop.dll: 下载网址https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll
  7. 将hadoop.dll放入:C:/Windows/System32 文件夹内
代码语言:python
代码运行次数:0
运行
AI代码解释
复制
from pyspark import SparkConf, SparkContext
# os用于操作系统级功能,这里用来设置环境变量
import os
# 指定 PySpark 使用的 Python 解释器路径
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
# 指定 Hadoop 的安装目录
os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备RDD1,传入numSlices参数为1,数据集划分为一个切片
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)

# 准备RDD2,传入numSlices参数为1,数据集划分为一个切片
rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1)

# 准备RDD3,传入numSlices参数为1,数据集划分为一个切片
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)

# 输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

打开output2文本文件,输出结果如下:

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
需求:[(‘Spark’, 2), (‘Flink’, 1), (‘hello’, 3), (‘you’, 1), (‘me’, 1), (‘she’, 1)]
Maynor
2023/09/29
7340
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
PySpark数据计算
在大数据处理的时代,Apache Spark以其高效的数据处理能力和灵活的编程模型,成为了数据科学家和工程师的热门选择。PySpark作为Spark的Python接口,使得数据处理和分析更加直观和便捷。本文详细讲解了PySpark中的常用RDD算子,包括map、flatMap、reduceByKey、filter、distinct和sortBy。
Heaven645
2024/08/13
2640
PySpark数据计算
2021年大数据Spark(十三):Spark Core的RDD创建
官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds
Lansonli
2021/10/09
5570
pyspark 内容介绍(一)
pyspark 包介绍 子包 pyspark.sql module pyspark.streaming module pyspark.ml package pyspark.mllib package 内容 PySpark是针对Spark的Python API。根据网上提供的资料,现在汇总一下这些类的基本用法,并举例说明如何具体使用。也是总结一下经常用到的这些公有类的使用方式。方便初学者查询及使用。 Public 类们: SparkContext: Spark 功能的主入口。 RDD: 弹性分布式数
用户1217611
2018/01/30
2.7K0
【Python】PySpark 数据计算 ④ ( RDD#filter 方法 - 过滤 RDD 中的元素 | RDD#distinct 方法 - 对 RDD 中的元素去重 )
RDD#filter 方法 可以 根据 指定的条件 过滤 RDD 对象中的元素 , 并返回一个新的 RDD 对象 ;
韩曙亮
2023/10/11
6220
【Python】PySpark 数据计算 ④ ( RDD#filter 方法 - 过滤 RDD 中的元素 | RDD#distinct 方法 - 对 RDD 中的元素去重 )
Python大数据之PySpark(五)RDD详解
rdd2=sc.textFile(“hdfs://node1:9820/pydata”)
Maynor
2023/10/04
7690
Python大数据之PySpark(五)RDD详解
【Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 )
RDD 英文全称为 " Resilient Distributed Datasets " , 对应中文名称 是 " 弹性分布式数据集 " ;
韩曙亮
2023/10/11
7490
【Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 )
Python大数据之PySpark(八)SparkCore加强
print(“释放缓存之后,直接从rdd的依赖链重新读取”) print(join_result_rdd.count())
Maynor
2023/10/10
2320
Python大数据之PySpark(八)SparkCore加强
RDD编程
Spark采用textFile()方法来从文件系统中加载数据创建RDD 该方法把文件的URI作为参数,这个URI可以是: 本地文件系统的地址、或者是分布式文件系统HDFS的地址或者是Amazon S3的地址等等。
Francek Chen
2025/01/22
1090
RDD编程
Python大数据之PySpark(六)RDD的操作
from pyspark import SparkConf, SparkContext import re
Maynor
2023/10/08
3920
Python大数据之PySpark(六)RDD的操作
【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )
在 PySpark 中 RDD 对象 提供了一种 数据计算方法 RDD#map 方法 ;
韩曙亮
2023/10/11
8400
【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )
PySpark在windows下的安装及使用
官网下载http://spark.apache.org/downloads.html,遇到加载不了选项框的情况可以尝试用手机打开网址获取下载链接后下载
Mirza Zhao
2023/05/08
1.5K0
PySpark在windows下的安装及使用
【Spark篇】---Spark中Action算子
Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序(就是我们编写的一个应用程序)中有几个Action类算子执行,就有几个job运行。
LhWorld哥陪你聊算法
2018/09/13
1K0
【Spark篇】---Spark中Action算子
PySpark入门级学习教程,框架思维(上)
为什么要学习Spark?作为数据从业者多年,个人觉得Spark已经越来越走进我们的日常工作了,无论是使用哪种编程语言,Python、Scala还是Java,都会或多或少接触到Spark,它可以让我们能够用到集群的力量,可以对BigData进行高效操作,实现很多之前由于计算资源而无法轻易实现的东西。网上有很多关于Spark的好处,这里就不做过多的赘述,我们直接进入这篇文章的正文!
Sam Gor
2021/04/26
1.7K0
PySpark入门级学习教程,框架思维(上)
spark入门框架+python
不可否认,spark是一种大数据框架,它的出现往往会有Hadoop的身影,其实Hadoop更多的可以看做是大数据的基础设施,它本身提供了HDFS文件系统用于大数据的存储,当然还提供了MR用于大数据处理,但是MR有很多自身的缺点,针对这些缺点也已经有很多其他的方法,类如针对MR编写的复杂性有了Hive,针对MR的实时性差有了流处理Strom等等,spark设计也是针对MR功能的,它并没有大数据的存储功能,只是改进了大数据的处理部分,它的最大优势就是快,因为它是基于内存的,不像MR每一个job都要和磁盘打交道,所以大大节省了时间,它的核心是RDD,里面体现了一个弹性概念意思就是说,在内存存储不下数据的时候,spark会自动的将部分数据转存到磁盘,而这个过程是对用户透明的。
py3study
2020/01/06
1.5K0
从零到一spark进阶之路(二)
两种方式:①读取外部数据集② 在驱动器程序中对一个集合进行并行化 RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。
学到老
2019/01/25
5700
从零到一spark进阶之路(二)
Spark入门_1_RddTransAction
本文介绍了Spark编程模型的基本概念,包括RDD、DataFrame和DataSet等,并讲解了Spark编程模型在不同编程语言下的使用方式。同时,本文还提供了Spark编程模型的代码示例,以帮助读者更好地理解Spark编程模型的原理和实现方式。
用户1147754
2018/01/02
9090
Spark入门_1_RddTransAction
Spark之【数据读取与保存】详细说明
Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。文件格式分为:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;文件系统分为:本地文件系统、HDFS、HBASE以及数据库。
大数据梦想家
2021/01/27
1.7K0
Spark之【数据读取与保存】详细说明
2021年大数据Spark(十五):Spark Core的RDD常用算子
RDD中包含很多函数,主要可以分为两类:Transformation转换函数和Action函数。
Lansonli
2021/10/09
8990
Spark2.x学习笔记:7、Spark应用程序设计
本文介绍了Spark编程的一些基础概念和常用操作,包括RDD、DataFrame、DataSet、Transformations、Actions、Spark Streaming、GraphX和Machine Learning。同时,文章还探讨了Spark在不同领域的应用,包括互联网广告、推荐系统、数据挖掘和自然语言处理等。文章还介绍了Spark的生态系统,包括Spark SQL、MLlib、GraphX和Structured Streaming。
程裕强
2018/01/02
1.1K0
Spark2.x学习笔记:7、Spark应用程序设计
推荐阅读
相关推荐
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验