前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >大数据开发!Pandas转spark无痛指南!⛵

大数据开发!Pandas转spark无痛指南!⛵

原创
作者头像
ShowMeAI
发布于 2022-11-24 05:32:59
发布于 2022-11-24 05:32:59
8.7K00
代码可运行
举报
文章被收录于专栏:ShowMeAI研究中心ShowMeAI研究中心
运行总次数:0
代码可运行

Pandas灵活强大,是数据分析必备工具库!但处理大型数据集时,需过渡到PySpark才可以发挥并行计算的优势。本文总结了Pandas与PySpark的核心功能代码段,掌握即可丝滑切换。


💡 作者:韩信子@ShowMeAI 📘 大数据技术◉技能提升系列:https://www.showmeai.tech/tutorials/84 📘 数据分析实战系列:https://www.showmeai.tech/tutorials/40 📘 本文地址:https://www.showmeai.tech/article-detail/338 📢 声明:版权所有,转载请联系平台与作者并注明出处 📢 收藏ShowMeAI查看更多精彩内容

Pandas 是每位数据科学家和 Python 数据分析师都熟悉的工具库,它灵活且强大具备丰富的功能,但在处理大型数据集时,它是非常受限的。

这种情况下,我们会过渡到 PySpark,结合 Spark 生态强大的大数据处理能力,充分利用多机器并行的计算能力,可以加速计算。不过 PySpark 的语法和 Pandas 差异也比较大,很多开发人员会感觉这很让人头大。

在本篇内容中, ShowMeAI 将对最核心的数据处理和分析功能,梳理 PySpark 和 Pandas 相对应的代码片段,以便大家可以无痛地完成 Pandas 到大数据 PySpark 的转换😉

大数据处理分析及机器学习建模相关知识,ShowMeAI制作了详细的教程与工具速查手册,大家可以通过如下内容展开学习或者回顾相关知识。📘图解数据分析:从入门到精通系列教程📘图解大数据技术:从入门到精通系列教程📘图解机器学习算法:从入门到精通系列教程📘数据科学工具库速查表 | Spark RDD 速查表📘数据科学工具库速查表 | Spark SQL 速查表

💡 导入工具库

在使用具体功能之前,我们需要先导入所需的库:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# pandas vs pyspark,工具库导入
import pandas as pd
import pyspark.sql.functions as F

PySpark 所有功能的入口点是 SparkSession 类。通过 SparkSession 实例,您可以创建spark dataframe、应用各种转换、读取和写入文件等,下面是定义 SparkSession的代码模板:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName('SparkByExamples.com')\
.getOrCreate()

💡 创建 dataframe

在 Pandas 和 PySpark 中,我们最方便的数据承载数据结构都是 dataframe,它们的定义有一些不同,我们来对比一下看看:

💦 Pandas

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
columns = ["employee","department","state","salary","age"]
data = [("Alain","Sales","Paris",60000,34),
        ("Ahmed","Sales","Lyon",80000,45),
        ("Ines","Sales","Nice",55000,30),
        ("Fatima","Finance","Paris",90000,28),
        ("Marie","Finance","Nantes",100000,40)]

创建DataFrame的 Pandas 语法如下:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df = pd.DataFrame(data=data, columns=columns)
# 查看头2行
df.head(2)

💦 PySpark

创建DataFrame的 PySpark 语法如下:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df = spark.createDataFrame(data).toDF(*columns)
# 查看头2行
df.limit(2).show()

💡 指定列类型

💦 Pandas

Pandas 指定字段数据类型的方法如下:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
types_dict = {
    "employee": pd.Series([r[0] for r in data], dtype='str'),
    "department": pd.Series([r[1] for r in data], dtype='str'),
    "state": pd.Series([r[2] for r in data], dtype='str'),
    "salary": pd.Series([r[3] for r in data], dtype='int'),
    "age": pd.Series([r[4] for r in data], dtype='int')
}

df = pd.DataFrame(types_dict)

Pandas 可以通过如下代码来检查数据类型:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df.dtypes

💦 PySpark

PySpark 指定字段数据类型的方法如下:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

schema = StructType([ \
    StructField("employee",StringType(),True), \
    StructField("department",StringType(),True), \
    StructField("state",StringType(),True), \
    StructField("salary", IntegerType(), True), \
    StructField("age", IntegerType(), True) \
  ])

df = spark.createDataFrame(data=data,schema=schema)

PySpark 可以通过如下代码来检查数据类型:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df.dtypes
# 查看数据类型 
df.printSchema() 

💡 读写文件

Pandas 和 PySpark 中的读写文件方式非常相似。 具体语法对比如下:

💦 Pandas

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df = pd.read_csv(path, sep=';', header=True)
df.to_csv(path, ';', index=False)

💦 PySpark

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df = spark.read.csv(path, sep=';')
df.coalesce(n).write.mode('overwrite').csv(path, sep=';')

注意 ①

PySpark 中可以指定要分区的列:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df.partitionBy("department","state").write.mode('overwrite').csv(path, sep=';')

注意 ②

可以通过上面所有代码行中的 parquet 更改 CSV 来读取和写入不同的格式,例如 parquet 格式

💡 数据选择 - 列

💦 Pandas

在 Pandas 中选择某些列是这样完成的:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
columns_subset = ['employee', 'salary']

df[columns_subset].head()

df.loc[:, columns_subset].head()

💦 PySpark

在 PySpark 中,我们需要使用带有列名列表的 select 方法来进行字段选择:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
columns_subset = ['employee', 'salary']

df.select(columns_subset).show(5)

💡 数据选择 - 行

💦 Pandas

Pandas可以使用 iloc对行进行筛选:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# 头2行
df.iloc[:2].head()

💦 PySpark

在 Spark 中,可以像这样选择前 n 行:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df.take(2).head()
# 或者
df.limit(2).head()

注意:使用 spark 时,数据可能分布在不同的计算节点上,因此“第一行”可能会随着运行而变化。

💡 条件选择

💦 Pandas

Pandas 中根据特定条件过滤数据/选择数据的语法如下:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# First method
flt = (df['salary'] >= 90_000) & (df['state'] == 'Paris')
filtered_df = df[flt]

# Second Method: Using query which is generally faster
filtered_df = df.query('(salary >= 90_000) and (state == "Paris")')
# Or
target_state = "Paris"
filtered_df = df.query('(salary >= 90_000) and (state == @target_state)')

💦 PySpark

在 Spark 中,使用 filter方法或执行 SQL 进行数据选择。 语法如下:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# 方法1:基于filter进行数据选择
filtered_df = df.filter((F.col('salary') >= 90_000) & (F.col('state') == 'Paris'))

# 或者
filtered_df = df.filter(F.expr('(salary >= 90000) and (state == "Paris")'))

# 方法2:基于SQL进行数据选择
df.createOrReplaceTempView("people")

filtered_df = spark.sql("""
SELECT * FROM people
WHERE (salary >= 90000) and (state == "Paris")
""") 

💡 添加字段

💦 Pandas

在 Pandas 中,有几种添加列的方法:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
seniority = [3, 5, 2, 4, 10]
# 方法1
df['seniority'] = seniority

# 方法2
df.insert(2, "seniority", seniority, True)

💦 PySpark

在 PySpark 中有一个特定的方法withColumn可用于添加列:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
seniority = [3, 5, 2, 4, 10]
df = df.withColumn('seniority', seniority)

💡 dataframe拼接

💦 2个dataframe - pandas

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# pandas拼接2个dataframe
df_to_add = pd.DataFrame(data=[("Robert","Advertisement","Paris",55000,27)], columns=columns)
df = pd.concat([df, df_to_add], ignore_index = True)

💦 2个dataframe - PySpark

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# PySpark拼接2个dataframe
df_to_add = spark.createDataFrame([("Robert","Advertisement","Paris",55000,27)]).toDF(*columns)
df = df.union(df_to_add)

💦 多个dataframe - pandas

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# pandas拼接多个dataframe
dfs = [df, df1, df2,...,dfn]
df = pd.concat(dfs, ignore_index = True)

💦 多个dataframe - PySpark

PySpark 中 unionAll 方法只能用来连接两个 dataframe。我们使用 reduce 方法配合unionAll来完成多个 dataframe 拼接:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
# pyspark拼接多个dataframe
from functools import reduce
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

dfs = [df, df1, df2,...,dfn]
df = unionAll(*dfs)

💡 简单统计

Pandas 和 PySpark 都提供了为 dataframe 中的每一列进行统计计算的方法,可以轻松对下列统计值进行统计计算:

  • 列元素的计数
  • 列元素的平均值
  • 最大值
  • 最小值
  • 标准差
  • 三个分位数:25%、50% 和 75%

Pandas 和 PySpark 计算这些统计值的方法很类似,如下:

💦 Pandas & PySpark

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df.summary()
#或者
df.describe()

💡 数据分组聚合统计

Pandas 和 PySpark 分组聚合的操作也是非常类似的:

💦 Pandas

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})

💦 PySpark

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df.groupBy('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})

但是,最终显示的结果需要一些调整才能一致。

在 Pandas 中,要分组的列会自动成为索引,如下所示:

要将其作为列恢复,我们需要应用 reset_index方法:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'}).reset_index()

在 PySpark 中,列名会在结果dataframe中被重命名,如下所示:

要恢复列名,可以像下面这样使用别名方法:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df.groupBy('department').agg(F.count('employee').alias('employee'), F.max('salary').alias('salary'), F.mean('age').alias('age'))

💡 数据转换

在数据处理中,我们经常要进行数据变换,最常见的是要对「字段/列」应用特定转换,在Pandas中我们可以轻松基于apply函数完成,但在PySpark 中我们可以使用udf(用户定义的函数)封装我们需要完成的变换的Python函数。

例如,我们对salary字段进行处理,如果工资低于 60000,我们需要增加工资 15%,如果超过 60000,我们需要增加 5%。

💦 Pandas

Pandas 中的语法如下:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
df['new_salary'] = df['salary'].apply(lambda x: x*1.15 if x<= 60000 else x*1.05)

💦 Pyspark

PySpark 中的等价操作下:

代码语言:python
代码运行次数:0
运行
AI代码解释
复制
from pyspark.sql.types import FloatType

df.withColumn('new_salary', F.udf(lambda x: x*1.15 if x<= 60000 else x*1.05, FloatType())('salary'))

⚠️ 请注意, udf方法需要明确指定数据类型(在我们的例子中为 FloatType)

💡 总结

本篇内容中, ShowMeAI 给大家总结了Pandas和PySpark对应的功能操作细节,我们可以看到Pandas和PySpark的语法有很多相似之处,但是要注意一些细节差异。

另外,大家还是要基于场景进行合适的工具选择:

  • 在处理大型数据集时,使用 PySpark 可以为您提供很大的优势,因为它允许并行计算。
  • 如果您正在使用的数据集很小,那么使用Pandas会很快和灵活。

参考资料

推荐阅读

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Pandas数据显示不全?快来了解这些设置技巧! ⛵
我们在应用 Python 进行数据分析挖掘和机器学习时,最常用的工具库就是 Pandas,它可以帮助我们快捷地进行数据处理和分析。
ShowMeAI
2022/07/12
3.3K0
Pandas数据显示不全?快来了解这些设置技巧! ⛵
Pandas中你一定要掌握的时间序列相关高级功能 ⛵
Pandas 是大家都非常熟悉的数据分析与处理工具库,对于结构化的业务数据,它能很方便地进行各种数据分析和数据操作。但我们的数据中,经常会存在对应时间的字段,很多业务数据也是时间序组织,很多时候我们不可避免地需要和时间序列数据打交道。其实 Pandas 中有非常好的时间序列处理方法,但是因为使用并不特别多,很多基础教程也会略过这一部分。
ShowMeAI
2022/11/28
1.9K0
Pandas中你一定要掌握的时间序列相关高级功能 ⛵
Python数据分析 | Pandas数据分组与操作
教程地址:http://www.showmeai.tech/tutorials/33
ShowMeAI
2022/02/25
3K0
Python数据分析 | Pandas数据分组与操作
Python数据分析 | 基于Pandas的数据可视化
教程地址:http://www.showmeai.tech/tutorials/33
ShowMeAI
2022/02/25
9980
Python数据分析 | 基于Pandas的数据可视化
数据专家最常使用的 10 大类 Pandas 函数 ⛵
Python具有极其活跃的社区和覆盖全领域的第三方库工具库,近年来一直位居编程语言热度头部位置,而数据科学领域最受欢迎的python工具库之一是 Pandas。随着这么多年来的社区高速发展和海量的开源贡献者,使得 pandas 几乎可以胜任任何数据处理工作。
ShowMeAI
2022/08/09
3.8K0
数据专家最常使用的 10 大类 Pandas 函数 ⛵
Python中内置数据库!SQLite使用指南! ⛵
Python 是一个广泛使用的编程语言,在各个领域都能发挥很大的作用,而且安装 Python 环境的同时,我们也安装了很多其他出色的工具,其中当然少不了数据库。
ShowMeAI
2022/12/01
3K1
Python中内置数据库!SQLite使用指南! ⛵
客户流失?来看看大厂如何基于spark+机器学习构建千万数据规模上的用户留存模型 ⛵
Sparkify 是一个音乐流媒体平台,用户可以获取部分免费音乐资源,也有不少用户开启了会员订阅计划(参考QQ音乐),在Sparkify中享受优质音乐内容。
ShowMeAI
2022/08/09
1.7K0
客户流失?来看看大厂如何基于spark+机器学习构建千万数据规模上的用户留存模型 ⛵
数据分析大作战,SQL V.S. Python,来看看这些考题你都会吗 ⛵
SQL与Pandas都可以完成大部分数据分析需求。本文用SQL与Pands逐一实现10类核心数据分析需求,轻松进行对比学习:数据选择、限制、统计计数、排序、新字段生成、数据选择、数据分组、统计均值、方差、极差/范围。
ShowMeAI
2022/08/26
3550
数据分析大作战,SQL V.S. Python,来看看这些考题你都会吗 ⛵
Python数据分析 | 数据分析工具库Pandas介绍
教程地址:http://www.showmeai.tech/tutorials/33
ShowMeAI
2022/02/25
1.7K0
Python数据分析 | 数据分析工具库Pandas介绍
Python数据分析 | Pandas数据变换高级函数
教程地址:http://www.showmeai.tech/tutorials/33
ShowMeAI
2022/02/25
1.5K0
Python数据分析 | Pandas数据变换高级函数
精准营销!用机器学习完成客户分群!⛵
我们总会听到很多公司的技术人员在做用户画像的工作,细分客户/客户分群是一个很有意义的工作,可以确保企业构建更个性化的消费者针对策略,同时优化产品和服务。
ShowMeAI
2022/11/10
1.5K0
精准营销!用机器学习完成客户分群!⛵
高手系列!数据科学家私藏pandas高阶用法大全 ⛵
如果你是数据科学家、数据分析师、机器学习工程师,或者任何 python 数据从业者,你一定会高频使用 pandas 这个工具库——它操作简单功能强大,可以很方便完成数据处理、数据分析、数据变换等过程,优雅且便捷。
ShowMeAI
2022/12/04
6.3K0
高手系列!数据科学家私藏pandas高阶用法大全 ⛵
数据科学手把手:碳中和下的二氧化碳排放分析 ⛵
气候是全球性的话题,本文基于owid co2数据集,分析了世界各地的二氧化碳排放量,并将二氧化碳排放的主要国家以及二氧化碳排放来源进行了可视化。
ShowMeAI
2022/08/26
6620
数据科学手把手:碳中和下的二氧化碳排放分析 ⛵
二手车价格预测 | 构建AI模型并部署Web应用 ⛵
一份来自『RESEARCH AND MARKETS』的二手车报告预计,从 2022 年到 2030 年,全球二手车市场将以 6.1% 的复合年增长率增长,到 2030 年达到 2.67 万亿美元。人工智能技术的广泛使用增加了车主和买家之间的透明度,提升了购买体验,极大地推动了二手车市场的增长。
ShowMeAI
2022/08/09
2.7K0
二手车价格预测 | 构建AI模型并部署Web应用 ⛵
员工离职困扰?来看AI如何解决,基于人力资源分析的 ML 模型构建全方案 ⛵
本文通过数据科学和AI的方法,分析挖掘人力资源流失问题,构建基于机器学习的解决方案,并通过对AI模型的反向解释,深入理解导致人员流失的主要因素。
ShowMeAI
2022/08/26
6070
员工离职困扰?来看AI如何解决,基于人力资源分析的 ML 模型构建全方案 ⛵
精准用户画像!商城用户分群2.0!⛵
很多公司的技术人员在做用户画像的工作,细分客户/客户分群是一个很有意义的工作,可以确保企业构建更个性化的消费者针对策略,同时优化产品和服务。
ShowMeAI
2022/11/20
7040
精准用户画像!商城用户分群2.0!⛵
数据科学家赚多少?基于pandasql和plotly的薪资分析与可视化 ⛵
图片 本文揭秘全球数据科学岗位的薪资分布情况!以及分析岗位、国家、工作经验、雇佣形式、公司规模对薪资的影响,并贴心提供了求职建议和跳槽Tips! 💡 作者:韩信子@ShowMeAI 📘 数据分析实战系列:https://www.showmeai.tech/tutorials/40 📘 AI 岗位&攻略系列:https://www.showmeai.tech/tutorials/47 📘 本文地址:https://www.showmeai.tech/article-detail/402 📢 声明:版权所有,
ShowMeAI
2022/12/09
1.2K0
数据科学家赚多少?基于pandasql和plotly的薪资分析与可视化 ⛵
数据驱动!精细化运营!用机器学习做客户生命周期与价值预估!⛵
现在的互联网平台都有着海量的客户,但客户和客户之间有很大的差异,了解客户的行为方式对于充分理解用户与优化服务增强业务至关重要。而借助机器学习,我们可以实现更精细化地运营,具体来说,我们可以预测客户价值,即在特定时间段内将为公司带来多少价值。
ShowMeAI
2022/11/16
6060
数据驱动!精细化运营!用机器学习做客户生命周期与价值预估!⛵
一键自动化数据分析!快来看看 2022 年最受欢迎的 Python 宝藏工具库! ⛵
实际工作中,我们往往依托于业务数据分析制定业务策略。这个过程需要频繁地进行数据分析和挖掘,发现模式规律。对于算法工程师而言,一个有效的 AI 算法系统落地,不仅仅是模型这么简单——数据才是最底层的驱动。
ShowMeAI
2022/07/12
2K0
一键自动化数据分析!快来看看 2022 年最受欢迎的 Python 宝藏工具库! ⛵
求职指南!给数据开发的SQL面试准备路径!⛵
大量的数据科学职位需要精通 SQL,它也是数据分析师、数据科学家、数据建模岗最常考核的面试技能。在本篇内容中 ShowMeAI 将梳理汇总所有面试 SQL 问题,按照不同的主题构建练习专项块,要求职的同学们可以按照对应板块内容进行专项击破与复习。
ShowMeAI
2022/11/30
4.2K1
求职指南!给数据开发的SQL面试准备路径!⛵
推荐阅读
相关推荐
Pandas数据显示不全?快来了解这些设置技巧! ⛵
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验