Spark是一种快速、通用、可扩展的大数据分析引擎,包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目。
Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。
Spark的优点:
1、快:与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG(有向无环图)执行引擎,可以通过基于内存来高效处理数据流。
2、易用:Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。
3、通用:Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。
4、Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。
(1)从spark官方下载spark安装包
(2)上传spark安装包到Linux上
(3)解压安装包到指定位置
tar -zxvf spark-2.3.3-bin-hadoop2.7.tgz -C /root/apps/spark
spark配置文件都在spark/conf下
进入到spark安装目录
cd /root/apps/spark
进入conf目录并重命名并修改spark-env.sh.template文件
cd conf/
mv spark-env.sh.template spark-env.sh
vi spark-env.sh
在该配置文件中添加如下配置
export JAVA_HOME=/usr/local/jdk1.8
export SPARK_MASTER_IP=hdp-01
export SPARK_MASTER_PORT=7077
保存退出
重命名并修改slaves.template文件
mv slaves.template slaves
vi slaves
在该文件中添加子节点所在的位置(Worker节点,指定哪些机器需要作为从节点启动)
hdp-02
hdp-03
保存退出 将配置好的Spark拷贝到其他节点上 将spark拷贝到其他机器上hdp-02 、hdp-03的/root/apps目录下
for i in {2,3}; do scp -r /root/apps/spark/ hdp-0$i:/root/apps; done
Spark集群配置完毕,目前是1个Master,2个Worker,在hdp-01启动spark集群
/root/apps/spark/sbin/start-all.sh
启动后执行jps命令,主节点(hdp-01)上有Master进程,其他子节点(hdp-02、hdp-03)上有Worker进行。
登录Spark管理界面查看集群状态(主节点):http://hdp-01:8080/ 【检验】 注意7077是rpc通信端口,内部是Netty
到此为止,Spark集群安装完毕,但是有一个很大的问题,那就是Master节点存在单点故障,要解决此问题,就要借助zookeeper,并且启动至少两个Master节点来实现高可靠,配置方式比较简单:
Spark集群规划:hdp-01,hdp-04是Master;hdp-02、hdp-03是Worker
安装配置zk集群,并启动zk集群
停止spark所有服务,修改配置文件spark-env.sh,在该配置文件中删掉SPARK_MASTER_IP
并添加如下配置
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hdp-01:2181,hdp-02:2181,hdp-03:2181 -Dspark.deploy.zookeeper.dir=/spark"
#除此之外还可以修改Worker的核数(线程数,不能超过实际物理机器的线程)、内存大小等配置
#配置文件上有示例,直接抄即可
export SPARK_WORKER_CORES=8
export SPARK_WORKER_MEMORY=2g
1.在hdp-01节点上修改slaves配置文件内容指定worker节点 2.在hdp-01上执行sbin/start-all.sh脚本,后在hdp-04上执行sbin/start-master.sh启动第二个Master【意味着只在hdp-04启动一个Master】如果连接http://hdo-04:8080,会发现Status显示STANDBY状态,且没有Workers信息。
在Spark集群启动的时候,所有的Master和Worker都连接到Zookeeper集群中。zk的作用如下:
1、zk集群会选举出一个Master作为活跃(alive)的Master,另外一个Master处于Stand By状态。
2、zk集群还会保存活跃的Master信息
3、zk集群还会保存所有Worker的资源信息和资源使用情况,如图中hdp-01作为活跃的Master,它能获取所有的Worker(hdp-02、hdp-03)的使用情况,如果hdp-01挂掉,那么会切换为hdp-04作为活跃的Master,它也应该能获取获取所有的Worker信息,那么Worker的资源信息和资源使用情况就应该保存在zk中。【为了故障切换】
1、先启动zk集群
2、启动spark集群,但只会启动一个Master,另外一台Master机器需要手动启动
3、如果模拟hdp-01故障,那么hdp-04会由STANDBY状态切换为MASTER状态。当hdp-01修复后,hdp-01为STANDBY状态,hdp-04仍为MASTER状态。在故障切换的过程中,会短暂性终止spark服务。
实际上是通过数学采样的方式计算Pi,采样的次数越多,计算的Pi值越准确。
/root/apps/spark-2.3.3-bin-hadoop2.7/bin/spark-submit \
--master spark://hdp-01:7077,spark://hdp-02:7077 \
--class org.apache.spark.examples.SparkPi \
--executor-memory 2048mb \
--total-executor-cores 24 \
/root/apps/spark-2.3.3-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.3.jar \
10000
/**
解析:
第一行是指通过spark提交任务的客户端spark-submit
第二行是指定master所在的机器 底层是rpc通信协议 spark://主机名或ip地址:7077 7077是rpc通信端口,而8080是http外部访问端口,需要区分开来。提交任务可以指定多个master地址,目的是为了提交任务高可用
第三行是指执行哪一个类 全路径类名,官方自带的蒙特卡罗求Pi样例(底层是通过反射执行)
第四、五行是指执行的内存大小,cpu核数(实际上这里的核数是执行的线程数)
第六行是指该样例所在的jar包位置 2.11为scala版本 2.3.3为spark版本
第六行是指采样的次数,采样次数越多,求Pi越精确
*/
最终求的:Pi is roughly 3.141852462837049 采样次数可以设置更高试试
此时登录http://hdp-01:8080中,即spark后台管理界面,查看到新增了一个已完成任务。
Completed Applications
Application ID | Name | Cores | Memory per Executor | Submitted Time | User | State | Duration |
---|---|---|---|---|---|---|---|
app-20190427200200-0004 | Spark Pi | 16 | 2.0 GB | 2019/04/27 20:02:00 | root | FINISHED | 2.8 min |
在执行过程中,有一些细节需要说一下:
假设我现在的集群架构是这样:
hdp-01为Master(alive)、hdp-02也为Master(stand by)
hdp-03、hdp-04、hdp-05为Worker , 假设我在机器hdp-05中提交了蒙特卡罗求Pi任务
在执行任务的过程中,给集群中的所有机器输入jps,查看后台java任务都有哪些?
(1)在hdp-05中,存在CoarseGrainedExecutorBackend(执行任务真正执行的地方)、SparkSubmit(提交任务到Spark集群,和Master通信、调度任务等功能)、Worker等。
(2)在hdp-03、hdp-04【即Worker机器】中都多了CoarseGrainedExecutorBackend进程,但无SparkSubmit进程。
(3)在任务执行完成后再jps,发现SparkSubmit和CoarseGrainedExecutorBackend都消失了,原因是被释放了,节约资源。
总结:CoarseGrainedExecutorBackend(简称Executor)在Worker执行任务时候启动进程,SparkSubmit在提交任务的机器执行进程,在任务执行完毕后,Executor和SparkSubmit都被释放。
spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。
上面的方式没有指定master的地址,即用的是spark的local模式运行【模拟spark集群运行的过程】
/root/apps/spark-2.3.3-bin-hadoop2.7/bin/spark-shell
只有书写master地址,才能与master建立连接,才能向master申请资源,才能将任务提交到集群
/root/apps/spark-2.3.3-bin-hadoop2.7/bin/spark-shell \
--master spark://hdp-01:7077 \
--executor-memory 2g \
--total-executor-cores 2
参数说明: --master spark://hdp-01:7077 指定Master的地址,如果需要指定多个Master地址,只需要使用逗号分割即可 --executor-memory 2g 指定每个worker可用内存为2G,如果不指定内存,默认运行内存1024mb --total-executor-cores 2 指定整个集群使用的cup核数为2个
在spark-shell运行后,执行jps命令,发现提交任务的机器存在CoarseGrainedExecutorBackend和SparkSubmit,而其他worker寄去存在CoarseGrainedExecutorBackend,Master机器的进程和执行spark-shell之前没有明显变化。说明spark-shell在执行后,即使任务未提交到spark集群中,进程也依旧在后台保持执行。【实际上就是创建SparkContext】
指定了Master地址,那么就会将任务提交到集群中,开始时sparksubmit(客户端)要连接Master,并向Master申请计算资源(内存和核数等),Master进行资源调度(就是让哪些Worker启动Executor)。在准备工作时,这些进程都准备好了【实际上该过程底层就是创建SparkContext的过程】
注意: 如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。
Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可
WordCount代码:【本地文件系统】
scala> sc.textFile("/root/w.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
注意:需要具备的条件是:所有Worker机器上都需要有/root/w.txt,否则将会报错。真正执行计算的不是Master,也不是Worker,而是进程CoarseGrainedExecutorBackend。上述的方式是从本地文件系统读取数据的WordCount计算,真实环境应该是基于HDFS分布式文件系统读取文件。Spark先与namenode通信,找到数据存在哪些datanode中,最后从具体的datanode中读取数据。如果当前的机器或者集群的其他机器,其本地文件系统没有数据文件也没关系,基于HDFS分布式文件系统,集群上的每个节点都可以通过网络从HDFS中读取数据进行计算。
WordCount代码:【HDFS分布式文件系统】
scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res1: Array[(String, Int)] = Array((scala,1), (hello,3), (java,1), (spark,2), (hi,2), (dianxin,2))
排序:
scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2).collect
res2: Array[(String, Int)] = Array((scala,1), (java,1), (spark,2), (hi,2), (dianxin,2), (hello,3))
scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res3: Array[(String, Int)] = Array((hello,3), (spark,2), (hi,2), (dianxin,2), (scala,1), (java,1))
1、导入pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcats</groupId>
<artifactId>spark-wordcount</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
<hadoop.version>2.6.5</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<!-- 导入scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 导入spark的依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- 指定hadoop-client API的版本 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<!-- 编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- 编译java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2、WordCount Scala代码
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object ScalaWordCount {
def main(args: Array[String]): Unit = {
//1.创建SparkConfig配置对象,配置Spark应用程序的名字
val conf: SparkConf = new SparkConf()
conf.setAppName("scalaWordCount")
//2.创建Spark执行入口————SparkContext Driver和Master通信就是通过SparkContext进行通信
val sc = new SparkContext(conf)
//3.指定以后从哪读取数据创建RDD(弹性分布式数据集)
val lines: RDD[String] = sc.textFile(args(0)) //返回的结果是读取的一行行文件数据集
//4.切分压平
val words: RDD[String] = lines.flatMap(_.split(" "))
//5.将单词和1组合在一起变成元组
val wordWithOne: RDD[(String, Int)] = words.map((_,1))
//6.按照key进行聚合
val reduced: RDD[(String, Int)] = wordWithOne.reduceByKey(_ + _)
//7.排序
val sortReduced = reduced.sortBy(_._2, false) //_为元组(key,出现次数),按出现次数降序排列
//8.将结果存入HDFS中
sortReduced.saveAsTextFile(args(1))
//9.释放资源sc
sc.stop()
}
}
3、使用Maven命令打包
4、上传至服务器且确保HDFS处于运行状态,执行命令
[root@hdp-01 bin]# ./spark-submit --master spark://hdp-01:7077 --class cn.itcats.spark.ScalaWordCount /root/spark-wordcount-1.0-SNAPSHOT.jar hdfs://hdp-01:9000/wordcount hdfs://hdp-01:9000/wordcount_res
需要注意的是:args(1),即结果存入HDFS中的文件路径不应该为HDFS中已存在的路径,否则将会抛出异常
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hdp-01:9000/wordcount already exists
5、查看执行结果
[root@hdp-01 bin]# hadoop fs -ls /wordcount_res
Found 3 items
-rw-r--r-- 3 root supergroup 0 2019-04-28 21:42 /wordcount_res/_SUCCESS
-rw-r--r-- 3 root supergroup 10 2019-04-28 21:42 /wordcount_res/part-00000
-rw-r--r-- 3 root supergroup 48 2019-04-28 21:42 /wordcount_res/part-00001
实际上Spark读写HDFS中的数据是基于Hadoop中的HDFSClient,即基于HDFS的API读取数据。
6、数据结果实际上被写入多个文件中,全局有序
[root@hdp-01 bin]# hadoop fs -cat /wordcount_res/part-00000
(hello,3)
[root@hdp-01 bin]# hadoop fs -cat /wordcount_res/part-00001
(spark,2)
(hi,2)
(dianxin,2)
(scala,1)
(java,1)
在MapRecue中,有多少个ReduceTask决定了有多少个结果文件,可以通过指定ReduceTask数量来决定最后结果文件的数量。在我们上文在写Spark程序的时候我并没有指定以后生成多少个结果文件?那么为什么最终是三个结果文件呢?
1、导入pom.xml依赖,可以直接使用4.1中的pom依赖文件
2、WordCount Java代码
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class JavaWordCount {
public static void main(String[] args) {
//1.创建SparkContext对象
SparkConf sparkConf = new SparkConf().setAppName("javaWordCount");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//2.指定以后从哪读数据
JavaRDD<String> lines = sc.textFile(args[0]);
//3.读取的数据为一行行的RDD数据集 切分压平 输入为String类型 输出也为String类型
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String lines) throws Exception {
//lines.split(" ")返回类型为数组类型,需要返回Iterator类型
return Arrays.asList(lines.split(" ")).iterator();
}
});
//4.将words组装为元组类型 传入String的words 返回元组 需要调用mapToPair
JavaPairRDD<String, Integer> wordWithOne = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
//5.将key相同的元素聚合在一起
JavaPairRDD<String, Integer> reduced = wordWithOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
//6.对结果进行排序 发现只有sortByKey 所以应该将Tuple中的键值对换位置,调用mapToPair方法
JavaPairRDD<Integer, String> swaped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
//return new Tuple2<Integer, String>(tuple._2, tuple._1);
return tuple.swap();
}
});
//排序后的结果 (次数, Key)
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//再换回 (Key, 次数的顺序)
JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception {
return tuple2.swap();
}
});
//7.将结果保存到HDFS中
res.saveAsTextFile(args[1]);
//8.关闭sparkContext资源
sc.stop();
}
}
3、使用Maven命令打包
4、上传至服务器且确保HDFS处于运行状态,执行命令,同4.1中的操作。需要注意的是修改主函数全包名引用
[root@hdp-01 bin]# ./spark-submit --master spark://hdp-01:7077 --class cn.itcats.spark.JavaWordCount /root/spark-wordcount-1.0-SNAPSHOT.jar hdfs://hdp-01:9000/wordcount hdfs://hdp-01:9000/wordcount_res
5、查看执行结果
[root@hdp-01 bin]# hadoop fs -ls /wordcount_res
Found 3 items
-rw-r--r-- 3 root supergroup 0 2019-04-28 21:42 /wordcount_res/_SUCCESS
-rw-r--r-- 3 root supergroup 10 2019-04-28 21:42 /wordcount_res/part-00000
-rw-r--r-- 3 root supergroup 48 2019-04-28 21:42 /wordcount_res/part-00001
实际上Spark读写HDFS中的数据是基于Hadoop中的HDFSClient,即基于HDFS的API读取数据。
6、数据结果实际上被写入多个文件中,全局有序
[root@hdp-01 bin]# hadoop fs -cat /wordcount_res/part-00000
(hello,3)
[root@hdp-01 bin]# hadoop fs -cat /wordcount_res/part-00001
(spark,2)
(hi,2)
(dianxin,2)
(scala,1)
(java,1)
编写Lambda表达式代码
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class LambdaWordCount {
public static void main(String[] args) {
//1.创建SparkContext对象
SparkConf sparkConf = new SparkConf().setAppName("lambdaWordCount");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//2.指定以后从哪读数据
JavaRDD<String> lines = sc.textFile(args[0]);
//3.读取的数据为一行行的RDD数据集 切分压平
JavaRDD<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
//4.将words组装为元组类型 w为words中的每个单词
JavaPairRDD<String, Integer> wordWithOne = words.mapToPair(x -> new Tuple2<String, Integer>(x, 1));
//5.根据Key进行聚合
JavaPairRDD<String, Integer> reduced = wordWithOne.reduceByKey((m, n) -> m + n);
//6.调整顺序 (次数,Key)
JavaPairRDD<Integer, String> swaped1 = reduced.mapToPair(x -> x.swap());
//7.排序
JavaPairRDD<Integer, String> sorted = swaped1.sortByKey(false);
//7.调整顺序 (Key,次数)
JavaPairRDD<Integer, String> swaped2 = reduced.mapToPair(x -> x.swap());
//8.将结果保存到HDFS中
swaped2.saveAsTextFile(args[1]);
//9.关闭资源
sc.stop();
}
}
代码上只有一行改动:
//1.创建SparkConfig配置对象,配置Spark应用程序的名字
//2.local为本地单线程执行 local[4]为本地4线程执行 local[*]本地多少线程就多少线程执行
val conf: SparkConf = new SparkConf().setAppName("sparkWordCount").setMaster("local[4]")