Spark中,数据集被抽象为分布式弹性数据集(Resilient Distributed Datasets, RDDs)。
Master/Slave架构:一个Spark Driver负责协调和管理整个Spark应用程序,而Worker节点(也称Executor)负责执行特定的RDD转换操作或计算任务。
Spark应用程序通常是由多个RDD转换操作和Action操作组成的DAG图形。
在创建并操作RDD时,Spark会将其转换为一系列可重复计算的操作,最后生成DAG图形。
当触发Action操作时,Spark将根据DAG图形计算出结果(Lazy Evaluation),并将结果返回驱动程序Driver。
其中DAG图可以优化(例如选择合适的操作顺序或进行数据分区和Shuffle操作等),从而提高计算效率。
可以分为两类,Transformation操作是指创建新的RDD的操作,Action操作是触发计算结果并返回值的操作。
Transformation操作是指不会立即执行的一系列操作,只有当遇到Action操作时才会触发Spark进行数据的计算和处理。例如,Spark中对RDD进行的map、filter、flatMap、 union、distinct、groupByKey、reduceByKey、sortByKey等操作均属于Transformations操作,这些操作可以将RDD通过复合多个RDD构建出新的RDD,但实际上并不会真正计算数据。因此,Transformations操作通常支持链式调用,可以同时应用多个不同的操作,并在计算的开销下最小化批量处理和数据分片的访问。
Action操作是指Spark中所执行的计算任务必须返回结果的操作,即需要立即进行计算和处理,触发Spark来处理数据并将结果返回给驱动程序。例如,Spark中对RDD进行的count、collect、reduce、foreach等操作都属于Action操作,这些操作可以返回具体的结果或将RDD转换为其他格式(如序列、文件等)。在执行Action操作期间,Spark会在所有Worker节点上同时运行相关计算任务,并考虑数据的分区、缓存等性能因素进行调度。
注意:
共享变量是指在不同的操作之间(如map、filter等)可以共享的可读写变量。根据共享模式的不同,Spark支持两种类型的共享变量:
依赖关系是说明一个RDD生成方式的抽象概念。它定义了父RDD和子RDD之间的关系,标示出RDD之间的血缘关系。因此,依赖关系是构建DAG执行计划所必需的部分。
Spark SQL的四个库:
首先创建一个DataFrame对象。可以通过读取文件、从RDD转换等方式来创建一个DataFrame。
在DataFrame上执行WHERE查询以进行筛选和过滤。
分组、聚合:groupBy()和agg()。
连接、联合:join()和union()。
优化查询:使用explain()
除非必须要使用SQL查询,否则建议尽可能使用DataFrame API来进行转换操作。
限制:Spark SQL不支持跨表联接、不支持子查询嵌套等。
注意:
DataFrame是不可变的,每次对DataFrame进行操作实际上都会返回一个新的DataFrame。
Spark SQL采用了类似于SQL查询的API,其中操作更接近查询而不是在内存中操作RDD。
缓存和持久化:为加速数据处理而缓存DataFrame对象。尤其是对于频繁查询和对小结果集做聚合操作的场景非常有用。此外,可以选择持久化到磁盘,这将有助于更长时间的维护这个数据集。
分区数:适当设置分区数有助于提高性能,并避免将大数据集拆分为过多的小分区而产生管理上的负担。
行列宽度:对于大型数据集来说,选择正确的存储格式和压缩方法(如Parquet和Orc等),有助于减少行和列占用的字节,减少I/O、内存和CPU开销,提高性能。
波士顿房价数据分析流程:
数据读取:可以使用Spark将数据从本地文件系统或远程文件系统中读入,并存储为一个DataFrame对象。
数据可视化:为了更好地理解数据,我们可以使用一些数据可视化工具,如matplotlib, seaborn 等。在Spark中,可以使用pyspark.ml.api 来方便地完成数据可视化操作。
特征提取与转换:波士顿房价数据集中包含了多个特征(如房屋面积、犯罪率、公共设施情况等),Spark中可以使用VectorAssembler特征转换器将这些特征合并为一个向量,供下一步机器学习算法使用。
模型训练和调优:Spark提供了常见的回归模型训练算法,如线性回归、决策树回归等。在训练模型之前,需要划分训练集和测试集,在训练过程中可以尝试不同的参数组合(如maxDepth、numTrees等),使用交叉验证来评估模型性能,并选择合适的模型进行预测。
模型效果评估:在训练完模型后,需要对模型进行效果评估。可以使用Spark中的RegressionEvaluator来计算预测结果和真实值之间的差异(如均方根误差、平均绝对误差等)。
数据读取
|
数据清洗
|
可视化与探索性数据分析
|
特征提取与转换
|
模型训练和调优
|
模型效果评估
|
模型预测
|
结果展示与可视化
注意:
数据清洗:波士顿房价数据集相对比较干净,但在实际应用中可能会出现缺失值、异常值等问题,需要进行数据清洗或处理。
特征选择:在选择特征时需要尽量选择和目标相关性高、且不同特征之间相互独立的特征,避免特征冗余导致模型过于复杂。
模型调优:在模型调优时需要注意过拟合和欠拟合问题,另外通过并行化训练、优化内存使用等手段提高Spark训练模型的效率。
import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
val boston = sc. textFile("file:/home/hadoop/boston. csv") //创建RDD
val bostonRDD = boston.map {
line => val splits = line.split(",");
(splits(0).toInt, splits(1).toDouble, splits(2).toDouble, splits(3).toDouble, splits(4).toInt, splits(5).toDouble, splits(6).toDouble, splits(7).toDouble, splits(8). toDouble, splits(9).toDouble, splits(10).toDouble, splits(11).toDouble, splits(12).toDouble, splits(13).toDouble, splits(14).toDouble)
}
val bostomDF = bostonRDD.toDF("ID","CRIM","ZN","INDUS","CHAS","NOX","RM","AGE","DIS","RAD","TAX","PTRATIO","B","LSTAT","MEDV")
//查看数据格式
bostonDF.printSchema()
//查看前5条记录
bostonDF.show(5,false)
//分组统计
bostonDF.groupBy("CHAS").count().show()
参考:
https://www.edureka.co/blog/spark-tutorial/#Real_Time_Analytics
https://spark.apache.org/docs/latest/rdd-programming-guide.html
https://techvidvan.com/tutorials/spark-shared-variable/
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。