前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core入门1【Spark集群安装、高可用、任务执行流程、使用Scala/Java/Lambda编写Spark WordCount】

Spark Core入门1【Spark集群安装、高可用、任务执行流程、使用Scala/Java/Lambda编写Spark WordCount】

作者头像
Java架构师必看
发布2021-05-14 17:31:20
1.5K0
发布2021-05-14 17:31:20
举报
文章被收录于专栏:Java架构师必看

一、Spark介绍

Spark是一种快速、通用、可扩展的大数据分析引擎,包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目。

Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。

Spark的优点:

1、快:与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG(有向无环图)执行引擎,可以通过基于内存来高效处理数据流。

2、易用:Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。

3、通用:Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

4、Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。

二、Spark集群安装

2.1   下载spark

(1)从spark官方下载spark安装包

(2)上传spark安装包到Linux上

(3)解压安装包到指定位置

代码语言:javascript
复制
tar -zxvf spark-2.3.3-bin-hadoop2.7.tgz -C /root/apps/spark

2.2    配置spark

spark配置文件都在spark/conf下

进入到spark安装目录

代码语言:javascript
复制
cd /root/apps/spark

进入conf目录并重命名并修改spark-env.sh.template文件

代码语言:javascript
复制
cd conf/
mv spark-env.sh.template spark-env.sh
vi spark-env.sh

在该配置文件中添加如下配置

代码语言:javascript
复制
export JAVA_HOME=/usr/local/jdk1.8
export SPARK_MASTER_IP=hdp-01
export SPARK_MASTER_PORT=7077

保存退出

重命名并修改slaves.template文件

代码语言:javascript
复制
mv slaves.template slaves
vi slaves

在该文件中添加子节点所在的位置(Worker节点,指定哪些机器需要作为从节点启动)

代码语言:javascript
复制
hdp-02
hdp-03

保存退出 将配置好的Spark拷贝到其他节点上 将spark拷贝到其他机器上hdp-02 、hdp-03的/root/apps目录下

代码语言:javascript
复制
for i in {2,3}; do scp -r /root/apps/spark/ hdp-0$i:/root/apps; done

Spark集群配置完毕,目前是1个Master,2个Worker,在hdp-01启动spark集群

代码语言:javascript
复制
/root/apps/spark/sbin/start-all.sh

启动后执行jps命令,主节点(hdp-01)上有Master进程,其他子节点(hdp-02、hdp-03)上有Worker进行。

登录Spark管理界面查看集群状态(主节点):http://hdp-01:8080/   【检验】 注意7077是rpc通信端口,内部是Netty

到此为止,Spark集群安装完毕,但是有一个很大的问题,那就是Master节点存在单点故障,要解决此问题,就要借助zookeeper,并且启动至少两个Master节点来实现高可靠,配置方式比较简单:

Spark集群规划:hdp-01,hdp-04是Master;hdp-02、hdp-03是Worker

安装配置zk集群,并启动zk集群

停止spark所有服务,修改配置文件spark-env.sh,在该配置文件中删掉SPARK_MASTER_IP

并添加如下配置

代码语言:javascript
复制
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hdp-01:2181,hdp-02:2181,hdp-03:2181 -Dspark.deploy.zookeeper.dir=/spark"

#除此之外还可以修改Worker的核数(线程数,不能超过实际物理机器的线程)、内存大小等配置
#配置文件上有示例,直接抄即可
export SPARK_WORKER_CORES=8
export SPARK_WORKER_MEMORY=2g

1.在hdp-01节点上修改slaves配置文件内容指定worker节点 2.在hdp-01上执行sbin/start-all.sh脚本,后在hdp-04上执行sbin/start-master.sh启动第二个Master【意味着只在hdp-04启动一个Master】如果连接http://hdo-04:8080,会发现Status显示STANDBY状态,且没有Workers信息。

在Spark集群启动的时候,所有的Master和Worker都连接到Zookeeper集群中。zk的作用如下:

1、zk集群会选举出一个Master作为活跃(alive)的Master,另外一个Master处于Stand By状态。

2、zk集群还会保存活跃的Master信息

3、zk集群还会保存所有Worker的资源信息和资源使用情况,如图中hdp-01作为活跃的Master,它能获取所有的Worker(hdp-02、hdp-03)的使用情况,如果hdp-01挂掉,那么会切换为hdp-04作为活跃的Master,它也应该能获取获取所有的Worker信息,那么Worker的资源信息和资源使用情况就应该保存在zk中。【为了故障切换】

2.3    总结:

1、先启动zk集群

2、启动spark集群,但只会启动一个Master,另外一台Master机器需要手动启动

3、如果模拟hdp-01故障,那么hdp-04会由STANDBY状态切换为MASTER状态。当hdp-01修复后,hdp-01为STANDBY状态,hdp-04仍为MASTER状态。在故障切换的过程中,会短暂性终止spark服务。

三、执行Spark程序

3.1    入门案例——蒙特卡罗算法求Pi

实际上是通过数学采样的方式计算Pi,采样的次数越多,计算的Pi值越准确。

代码语言:javascript
复制
/root/apps/spark-2.3.3-bin-hadoop2.7/bin/spark-submit \
--master spark://hdp-01:7077,spark://hdp-02:7077 \
--class org.apache.spark.examples.SparkPi \
--executor-memory 2048mb \
--total-executor-cores 24 \
/root/apps/spark-2.3.3-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.3.jar \
10000

/**
解析:
第一行是指通过spark提交任务的客户端spark-submit
第二行是指定master所在的机器 底层是rpc通信协议 spark://主机名或ip地址:7077  7077是rpc通信端口,而8080是http外部访问端口,需要区分开来。提交任务可以指定多个master地址,目的是为了提交任务高可用
第三行是指执行哪一个类 全路径类名,官方自带的蒙特卡罗求Pi样例(底层是通过反射执行)
第四、五行是指执行的内存大小,cpu核数(实际上这里的核数是执行的线程数)
第六行是指该样例所在的jar包位置   2.11为scala版本  2.3.3为spark版本
第六行是指采样的次数,采样次数越多,求Pi越精确
*/

最终求的:Pi is roughly 3.141852462837049   采样次数可以设置更高试试

此时登录http://hdp-01:8080中,即spark后台管理界面,查看到新增了一个已完成任务。

Completed Applications

Application ID

Name

Cores

Memory per Executor

Submitted Time

User

State

Duration

app-20190427200200-0004

Spark Pi

16

2.0 GB

2019/04/27 20:02:00

root

FINISHED

2.8 min

在执行过程中,有一些细节需要说一下:

假设我现在的集群架构是这样:

hdp-01为Master(alive)、hdp-02也为Master(stand by) 

hdp-03、hdp-04、hdp-05为Worker  , 假设我在机器hdp-05中提交了蒙特卡罗求Pi任务

在执行任务的过程中,给集群中的所有机器输入jps,查看后台java任务都有哪些?

(1)在hdp-05中,存在CoarseGrainedExecutorBackend(执行任务真正执行的地方)、SparkSubmit(提交任务到Spark集群,和Master通信、调度任务等功能)、Worker等

(2)在hdp-03、hdp-04【即Worker机器】中都多了CoarseGrainedExecutorBackend进程,但无SparkSubmit进程

(3)在任务执行完成后再jps,发现SparkSubmit和CoarseGrainedExecutorBackend都消失了,原因是被释放了,节约资源

总结:CoarseGrainedExecutorBackend(简称Executor)在Worker执行任务时候启动进程,SparkSubmit在提交任务的机器执行进程,在任务执行完毕后,Executor和SparkSubmit都被释放。

3.2    Spark shell

spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。

上面的方式没有指定master的地址,即用的是spark的local模式运行【模拟spark集群运行的过程】

代码语言:javascript
复制
/root/apps/spark-2.3.3-bin-hadoop2.7/bin/spark-shell

只有书写master地址,才能与master建立连接,才能向master申请资源,才能将任务提交到集群

代码语言:javascript
复制
/root/apps/spark-2.3.3-bin-hadoop2.7/bin/spark-shell  \
--master spark://hdp-01:7077 \
--executor-memory 2g \
--total-executor-cores 2

参数说明: --master spark://hdp-01:7077  指定Master的地址,如果需要指定多个Master地址,只需要使用逗号分割即可 --executor-memory 2g              指定每个worker可用内存为2G,如果不指定内存,默认运行内存1024mb --total-executor-cores 2            指定整个集群使用的cup核数为2个

在spark-shell运行后,执行jps命令,发现提交任务的机器存在CoarseGrainedExecutorBackend和SparkSubmit,而其他worker寄去存在CoarseGrainedExecutorBackend,Master机器的进程和执行spark-shell之前没有明显变化。说明spark-shell在执行后,即使任务未提交到spark集群中,进程也依旧在后台保持执行。【实际上就是创建SparkContext】

指定了Master地址,那么就会将任务提交到集群中,开始时sparksubmit(客户端)要连接Master,并向Master申请计算资源(内存和核数等),Master进行资源调度(就是让哪些Worker启动Executor)。在准备工作时,这些进程都准备好了【实际上该过程底层就是创建SparkContext的过程】

注意: 如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。

Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可

WordCount代码:【本地文件系统】

代码语言:javascript
复制
scala> sc.textFile("/root/w.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

注意:需要具备的条件是:所有Worker机器上都需要有/root/w.txt,否则将会报错。真正执行计算的不是Master,也不是Worker,而是进程CoarseGrainedExecutorBackend。上述的方式是从本地文件系统读取数据的WordCount计算,真实环境应该是基于HDFS分布式文件系统读取文件。Spark先与namenode通信,找到数据存在哪些datanode中,最后从具体的datanode中读取数据。如果当前的机器或者集群的其他机器,其本地文件系统没有数据文件也没关系,基于HDFS分布式文件系统,集群上的每个节点都可以通过网络从HDFS中读取数据进行计算。

WordCount代码:【HDFS分布式文件系统】

代码语言:javascript
复制
scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res1: Array[(String, Int)] = Array((scala,1), (hello,3), (java,1), (spark,2), (hi,2), (dianxin,2))

排序:

代码语言:javascript
复制
scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2).collect
res2: Array[(String, Int)] = Array((scala,1), (java,1), (spark,2), (hi,2), (dianxin,2), (hello,3))

scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect 
res3: Array[(String, Int)] = Array((hello,3), (spark,2), (hi,2), (dianxin,2), (scala,1), (java,1))

四、Scala和Java执行WordCount对比

4.1    Scala执行WordCount

1、导入pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcats</groupId>
    <artifactId>spark-wordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.6.5</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <!-- 导入scala的依赖 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- 导入spark的依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- 指定hadoop-client API的版本 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- 编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- 编译java的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>


            <!-- 打jar插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2、WordCount Scala代码

代码语言:javascript
复制
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConfig配置对象,配置Spark应用程序的名字
    val conf: SparkConf = new SparkConf()
    conf.setAppName("scalaWordCount")
    //2.创建Spark执行入口————SparkContext Driver和Master通信就是通过SparkContext进行通信
    val sc = new SparkContext(conf)
    //3.指定以后从哪读取数据创建RDD(弹性分布式数据集)
    val lines: RDD[String] = sc.textFile(args(0)) //返回的结果是读取的一行行文件数据集
    //4.切分压平
    val words: RDD[String] = lines.flatMap(_.split(" "))
    //5.将单词和1组合在一起变成元组
    val wordWithOne: RDD[(String, Int)] = words.map((_,1))
    //6.按照key进行聚合
    val reduced: RDD[(String, Int)] = wordWithOne.reduceByKey(_ + _)
    //7.排序
    val sortReduced = reduced.sortBy(_._2, false)   //_为元组(key,出现次数),按出现次数降序排列
    //8.将结果存入HDFS中
    sortReduced.saveAsTextFile(args(1))
    //9.释放资源sc
    sc.stop()
  }
}

3、使用Maven命令打包

4、上传至服务器且确保HDFS处于运行状态,执行命令

代码语言:javascript
复制
[root@hdp-01 bin]# ./spark-submit --master spark://hdp-01:7077 --class cn.itcats.spark.ScalaWordCount /root/spark-wordcount-1.0-SNAPSHOT.jar hdfs://hdp-01:9000/wordcount hdfs://hdp-01:9000/wordcount_res

需要注意的是:args(1),即结果存入HDFS中的文件路径不应该为HDFS中已存在的路径,否则将会抛出异常

代码语言:javascript
复制
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hdp-01:9000/wordcount already exists

5、查看执行结果

代码语言:javascript
复制
[root@hdp-01 bin]# hadoop fs -ls /wordcount_res
Found 3 items
-rw-r--r--   3 root supergroup          0 2019-04-28 21:42 /wordcount_res/_SUCCESS
-rw-r--r--   3 root supergroup         10 2019-04-28 21:42 /wordcount_res/part-00000
-rw-r--r--   3 root supergroup         48 2019-04-28 21:42 /wordcount_res/part-00001

实际上Spark读写HDFS中的数据是基于Hadoop中的HDFSClient,即基于HDFS的API读取数据。

6、数据结果实际上被写入多个文件中,全局有序

代码语言:javascript
复制
[root@hdp-01 bin]# hadoop fs -cat /wordcount_res/part-00000
(hello,3)
[root@hdp-01 bin]# hadoop fs -cat /wordcount_res/part-00001
(spark,2)
(hi,2)
(dianxin,2)
(scala,1)
(java,1)

在MapRecue中,有多少个ReduceTask决定了有多少个结果文件,可以通过指定ReduceTask数量来决定最后结果文件的数量。在我们上文在写Spark程序的时候我并没有指定以后生成多少个结果文件?那么为什么最终是三个结果文件呢?

4.2    Java执行WordCount

1、导入pom.xml依赖,可以直接使用4.1中的pom依赖文件

2、WordCount Java代码

代码语言:javascript
复制
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {
    public static void main(String[] args) {
        //1.创建SparkContext对象
        SparkConf sparkConf = new SparkConf().setAppName("javaWordCount");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        //2.指定以后从哪读数据
        JavaRDD<String> lines = sc.textFile(args[0]);
        //3.读取的数据为一行行的RDD数据集  切分压平   输入为String类型  输出也为String类型
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String lines) throws Exception {
                //lines.split(" ")返回类型为数组类型,需要返回Iterator类型
                return Arrays.asList(lines.split(" ")).iterator();
            }
        });
        //4.将words组装为元组类型  传入String的words 返回元组 需要调用mapToPair
        JavaPairRDD<String, Integer> wordWithOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        //5.将key相同的元素聚合在一起
        JavaPairRDD<String, Integer> reduced = wordWithOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //6.对结果进行排序  发现只有sortByKey  所以应该将Tuple中的键值对换位置,调用mapToPair方法
        JavaPairRDD<Integer, String> swaped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                //return new Tuple2<Integer, String>(tuple._2, tuple._1);
                return tuple.swap();
            }
        });
        //排序后的结果  (次数, Key)
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
        //再换回 (Key, 次数的顺序)
        JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception {
                return tuple2.swap();
            }
        });
        //7.将结果保存到HDFS中
        res.saveAsTextFile(args[1]);
        //8.关闭sparkContext资源
        sc.stop();
    }
}

3、使用Maven命令打包

4、上传至服务器且确保HDFS处于运行状态,执行命令,同4.1中的操作。需要注意的是修改主函数全包名引用

代码语言:javascript
复制
[root@hdp-01 bin]# ./spark-submit --master spark://hdp-01:7077 --class cn.itcats.spark.JavaWordCount /root/spark-wordcount-1.0-SNAPSHOT.jar hdfs://hdp-01:9000/wordcount hdfs://hdp-01:9000/wordcount_res

5、查看执行结果

代码语言:javascript
复制
[root@hdp-01 bin]# hadoop fs -ls /wordcount_res
Found 3 items
-rw-r--r--   3 root supergroup          0 2019-04-28 21:42 /wordcount_res/_SUCCESS
-rw-r--r--   3 root supergroup         10 2019-04-28 21:42 /wordcount_res/part-00000
-rw-r--r--   3 root supergroup         48 2019-04-28 21:42 /wordcount_res/part-00001

实际上Spark读写HDFS中的数据是基于Hadoop中的HDFSClient,即基于HDFS的API读取数据。

6、数据结果实际上被写入多个文件中,全局有序

代码语言:javascript
复制
[root@hdp-01 bin]# hadoop fs -cat /wordcount_res/part-00000
(hello,3)
[root@hdp-01 bin]# hadoop fs -cat /wordcount_res/part-00001
(spark,2)
(hi,2)
(dianxin,2)
(scala,1)
(java,1)

4.2    Lambda表达式执行WordCount

编写Lambda表达式代码

代码语言:javascript
复制
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class LambdaWordCount {
    public static void main(String[] args) {
        //1.创建SparkContext对象
        SparkConf sparkConf = new SparkConf().setAppName("lambdaWordCount");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        //2.指定以后从哪读数据
        JavaRDD<String> lines = sc.textFile(args[0]);
        //3.读取的数据为一行行的RDD数据集  切分压平
        JavaRDD<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
        //4.将words组装为元组类型 w为words中的每个单词
        JavaPairRDD<String, Integer> wordWithOne = words.mapToPair(x -> new Tuple2<String, Integer>(x, 1));
        //5.根据Key进行聚合
        JavaPairRDD<String, Integer> reduced = wordWithOne.reduceByKey((m, n) -> m + n);
        //6.调整顺序  (次数,Key)
        JavaPairRDD<Integer, String> swaped1 = reduced.mapToPair(x -> x.swap());
        //7.排序
        JavaPairRDD<Integer, String> sorted = swaped1.sortByKey(false);
        //7.调整顺序 (Key,次数)
        JavaPairRDD<Integer, String> swaped2 = reduced.mapToPair(x -> x.swap());
        //8.将结果保存到HDFS中
        swaped2.saveAsTextFile(args[1]);
        //9.关闭资源
        sc.stop();
    }
}

4.3    本地调试代码

代码上只有一行改动:

代码语言:javascript
复制
//1.创建SparkConfig配置对象,配置Spark应用程序的名字
    //2.local为本地单线程执行  local[4]为本地4线程执行   local[*]本地多少线程就多少线程执行
    val conf: SparkConf = new SparkConf().setAppName("sparkWordCount").setMaster("local[4]")
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Spark介绍
  • 二、Spark集群安装
    • 2.1   下载spark
      • 2.2    配置spark
        • 2.3    总结:
    • 三、执行Spark程序
      • 3.1    入门案例——蒙特卡罗算法求Pi
        • 3.2    Spark shell
        • 四、Scala和Java执行WordCount对比
          • 4.1    Scala执行WordCount
            • 4.2    Java执行WordCount
              • 4.2    Lambda表达式执行WordCount
                • 4.3    本地调试代码
                相关产品与服务
                大数据
                全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档