MapReduce 是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发地运行在 Hadoop 集群上。
海量数据在单机上处理受到硬件资源限制,而一旦将单机程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度。为了提高开发效率,MapReduce 将分布式程序中的公共功能封装成框架。引入 MapReduce 框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中复杂的工作交由框架来处理。
MapReduce 框架通常由三个阶段组成:
下图把 MapReduce 的过程分为两个部分,而实际上从两边的 Map 和 Reduce 到中间的那一大块都属于 Shuffle 过程,也就是说,Shuffle 过程有一部分是在 Map 端,有一部分是在 Reduce 端。
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,将“活的”对象发送到远程计算机。序列化和反序列化在分布式数据处理领域经常出现,主要有两个作用:进程通信和永久存储。
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。 反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘持久化的数据,转换成内存中的对象。
Java 的序列化使用了一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(校验信息,header,继承关系等),不便于在网络中高效传输。所以,Hadoop 自己开发了一套更加精简和高效序列化机制(Writable)。
Hadoop 中各个节点的通信是通过远程调用(RPC)实现的,RPC 序列化要求具有以下特点:
Java类型 | Hadoop Writable类型 |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
Null | NullWritable |
MapperReduce 中主要有 3 类进程:
dfs.block.size
(HDFS 块大小)默认为128m。也就是说如果输入文件为 128m 时,会被划分为 1 个 Split;当输入文件为 150m 时,会被划分为 2 个 Split。dfs.block.size
的大小。job.setNumReduceTasks(int n)
设置即可。MapTask 数量过多的的话,会产生大量的小文件,过多 MapTask 的创建和初始化都会消耗大量的硬件资源 。MapTask 数量太小的话,会导致并发度过小,使得 Job 执行时间过长,无法充分利用分布式硬件资源。可以通过以下方法来控制 MapTask 的数量:
FileInputFormat.setMaxInputSplitSize(job,long bytes)
方法设置最大数据分片大小为一个小于默认 dfs.block.size
的值,越小 MapTask 数量越多。FileInputFormat.setMinInputSplitSize(job,long bytes)
方法设置最小数据分片大小为一个大于默认 dfs.block.size
的值,越大 MapTask 数量越少。从上面流程图整体可以看出,MapReduce 作业的执行可以分为 11 个步骤,涉及五个独立的实体。它们在 MapReduce 执行过程中的主要作用是:
在集群上运行一个 Job 主要分为 6个 大步骤,11 个小步骤,下面将具体内容。
在测试环境中使用少量数据集进行代码测试可以得到理想结果,而实际情况是,用户代码抛出异常、进程崩溃、机器故障等,主要有以下四种情况失败:
在 MapReduce 组件里,官方给我们提供了一些样例程序,其中非常有名的就是 wordcount 和 pi 程序。这些 MapReduce 程序的代码都在 hadoop-mapreduce-examples-2.7.4.jar 包里,这个 jar 包在Hadoop 安装目录下的 share/hadoop/mapreduce/ 目录里。
下面我们使用 Hadoop 命令来试跑例子程序,看看运行效果。
在 /root 目录下创建 wordcount.txt 文件,内容如下:
hello hbase
hello hadoop
hello hive
hello kubernetes
hello java
在 HDFS 中创建目录并上传文件:
#在 HDFS 中创建一个目录
hadoop fs -mkdir /wcinput
#将本机 /root/wordcount.txt 文件上传到 HDFS 的 /wcinput 目录中
hadoop fs -put /root/wordcount.txt /wcinput
使用以下命令运行 MapReduce 程序计算单词出现次数:
hadoop jar /software/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.4.jar wordcount /wcinput /wcoutput
查看输出结果,可以看到成功计算了每个单词出现的次数:
[root@hadoop1 ~]# hadoop fs -cat /wcoutput/part-r-00000
hadoop 1
hbase 1
hello 5
hive 1
java 1
kubernetes 1
下图说明了 wordcount 例子中 MapReduce 程序执行的流程:
用户编写的 MapReduce 程序分成三个部分:Mapper,Reducer,Driver:
<K,V>
对形式,<K,V>
类型可以根据实际情况自定义。MapTask 进程对每一个 <K,V>
调用一次。<K,V>
对形式,<K,V>
类型可以根据实际情况自定义。Reducetask 进程对每一组相同 K 的 <K,V>
组调用一次 reduce() 方法。<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.4</version>
</dependency>
编写一个 WordMapper 类继承 Mapper 类,并重写 map() 方法。Mapper 类是一个泛型类,4 个泛型类型分别代表(KeyIn,ValueIn,KeyOut,ValueOut )。泛型的类型可以根据自己实际的场景来指定。在 wordcount 这个例子中我们指定的类型如下:
Mapper 阶段依次读取每一行的数据,每行按照空格拆分出单词,得到 <单词,1> 的键值对,键是单词,值是 1,之后 Reduce 阶段累计单词出现的次数就累加 1 即可。
Mapper 阶段代码如下:
package com.chengzw.mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author chengzw
* @description Map 阶段,分别计算每行每个单词出现的次数,key 是单词,value 为 1(表示 1 个单词)。
* @since 2021/5/17 10:00 下午
*/
public class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、切分单词
String[] words = value.toString().split(" ");
//2、单词转换 单词 -> <单词,1>
for (String word : words) {
//3、写入到上下文
context.write(new Text(word),new LongWritable(1));
}
}
}
编写一个类 WordReducer 继承 Reducer 类,并重写 reduce() 方法。Reducer 类是也是一个泛型类,4 个泛型类型分别代表(KeyIn,ValueIn,KeyOut,ValueOut )。泛型的类型可以根据自己实际的场景来指定。在 wordcount 这个例子中我们指定的类型如下:
Reduce 阶段接收到数据键是单词,值是一个可迭代的对象,是相同单词对应的次数(每个都是 1),只需要把这些 1 累加起来,就可以得到单词出现的总数了。
<hello,[1,1,1,1,1]>
Reduce 阶段代码如下:
package com.chengzw.mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author chengzw
* @description Reduce 阶段,把 key 相同的数据进行累计,得到每个单词出现的次数
* @since 2021/5/17 10:00 下午
*/
public class WordReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
//1、定义一个变量
long count = 0;
//2、迭代
for (LongWritable value : values) {
count += value.get();
}
//3、写入上下文
context.write(key,new LongWritable(count));
}
}
创建提交给 YARN 集群运行的 Job 对象,其中封装了 MapReduce 程序运行所需要的相关参数,例如输入数据路径,输出数据路径,Mapper 参数,Reduce 参数等。
package com.chengzw.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
/**
* @description Driver驱动类
* @author chengzw
* @since 2021/5/20 8:39 下午
*/
public class JobMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//一、初始化Job
Configuration configuration = new Configuration();
//获取运行命令的参数,参数一:输入文件路径,参数二:输出文件路径
//如果输入路径是一个文件,那么只处理这个文件,如果指定的路径是目录,则处理这个目录下的所有文件
//输出路径只能是不存在的目录名
String [] otherArgs = new GenericOptionsParser(configuration,args).getRemainingArgs();
if(otherArgs.length < 2){
System.err.println("必须提供输入文件路径和输出文件路径");
System.exit(2);
}
Job job = Job.getInstance(configuration, "mr");
//二、设置Job的相关信息 8个小步骤
//1、设置输入路径
job.setInputFormatClass(TextInputFormat.class);
//本地运行
//TextInputFormat.addInputPath(job,new Path("/tmp/input/mr.txt"));
TextInputFormat.addInputPath(job,new Path(args[0]));
//2、设置Mapper类型,并设置输出键和输出值
job.setMapperClass(WordMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//shuffle阶段,使用默认的
//3、设置Reducer类型,并设置输出键和输出值
job.setReducerClass(WordReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//4、设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
//本地运行
//TextOutputFormat.setOutputPath(job,new Path("/tmp/output/mr"));
TextOutputFormat.setOutputPath(job,new Path(args[1]));
//三、等待完成
boolean b = job.waitForCompletion(true);
System.out.println(b==true?"MapReduce 任务执行成功!":"MapReduce 任务执行失败!");
System.exit(b ? 0 : 1);
}
}
编写完代码以后我们可以先在本地进行测试,我们可以在 IntelloiJ IDEA 上设置运行程序时传递的参数(main 方法的 args)。
第一个参数是输入的目录路径,该目录下只有一个 mr.txt文件,文件内容如下:
❯ cat /tmp/input/mr.txt
hello hbase
hello hadoop
hello hive
hello kubernetes
hello java
第二个参数是输出的目录路径,这个目录名是不存在的,在运行完 MapReduce 程序后会自动生成该目录(该目录前面的目录不存在也会递归创建)。
点击 run 运行程序:
查看输出结果,可以看到成功统计了每个单词出现的总数。
❯ cat /tmp/output/mr/part-r-00000
hadoop 1
hbase 1
hello 5
hive 1
java 1
kubernetes 1
本地验证程序可以正常运行后,将程序打包为 JAR 包,放到 Hadoop 集群上运行。我们的项目是一个 Maven 项目,点击 Maven -> package 就可以打包了。
将生成的 JAR 包拷贝到 Hadoop 机器上:
scp mapreduce-1.0-SNAPSHOT.jar root@hadoop1:/root
使用 hadoop jar 命令运行程序,第一个参数为 JAR 包的路径,第二个参数为 Driver 类全名,第三个参数为输入目录,第四个参数为输出目录。
[root@hadoop1 ~]# hadoop jar /root/mapreduce.jar com.chengzw.mr.JobMain /wcinput /my_wcoutput
运行完成后,查看输出结果:
[root@hadoop1 ~]# hadoop fs -cat /my_wcoutput/part-r-00000
hadoop 1
hbase 1
hello 5
hive 1
java 1
kubernetes 1
接下来实现一个稍微复杂点的例子,有 order_001 ~ order_006 总共 6 个订单编号,将这些订单分成 3 组,输出到 3 个文件中。然后在每个文件中再根据订单编号进行分组,保留这个订单编号中金额最大的前 3 条记录。
原始数据如下:
订单编号 商品编号 金额
order_001 goods_001 100
order_001 goods_002 200
order_002 goods_001 300
order_002 goods_002 400
order_004 goods_003 500
order_003 goods_001 800
order_002 goods_004 500
order_005 goods_002 320
order_001 goods_003 230
order_002 goods_005 730
order_003 goods_003 100
order_006 goods_001 100
order_004 goods_002 350
order_002 goods_001 300
order_006 goods_002 1100
order_006 goods_003 500
order_003 goods_001 800
order_004 goods_004 1200
order_003 goods_002 100
order_005 goods_003 200
order_002 goods_005 700
order_005 goods_003 1300
编写订单实体类,实现 WritableComparable 接口,我们需要实现 3 个方法:
package com.chengzw.order;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @author chengzw
* @description 订单实体类
* @since 2021/6/11
*/
public class OrderBean implements WritableComparable<OrderBean> {
private String orderId; //订单编号
private Double price; //订单中某个商品的价格
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return "OrderBean{" +
"orderId='" + orderId + '\'' +
", price=" + price +
'}';
}
/**
* @param o 实体参数
* @return 指定排序的规则
*/
@Override
public int compareTo(OrderBean o) {
//1、先比较订单的id,如果id一样,则将订单的商品按金额排序(降序)
int i = this.orderId.compareTo(o.orderId);
if (i == 0) {
//因为是降序,所以有-1
i = this.price.compareTo(o.price) * - 1;
}
return i;
}
/**
* 实现对象的序列化
*
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeDouble(price);
}
/**
* 实现对象的反序列化
*
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.price = in.readDouble();
}
}
Mapper 输入的键是每行文字的起始位置,输入的值是一行文本。Mapper 输出的键是 OrderBean,输出的值是一行文本。
package com.chengzw.order;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author chengzw
* @description Mapper
* @since 2021/6/11
*/
public class OrderMapper extends Mapper<LongWritable, Text,OrderBean,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、拆分行文本数据,得到订单的id和订单的金额
String[] split = value.toString().split(" ");
//2、封装OrderBean实体类
OrderBean orderBean = new OrderBean();
orderBean.setOrderId(split[0]);
orderBean.setPrice(Double.parseDouble(split[2]));
//3、写入上下文
context.write(orderBean,value);
}
}
Partition 和 Group 都是属于 Shuffle 阶段,在 Partition 阶段根据订单编号对数据进行分区,把结果发送给对应的 Reduce 节点并行执行。
package com.chengzw.order;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author chengzw
* @description 根据订单号分区,输出到不同的文件
* 分区的目的是根据Key值决定Mapper的输出记录被送到哪一个Reducer上去处理。
* @since 2021/6/11
*/
public class OrderPartition extends Partitioner<OrderBean, Text> {
/**
*
* @param orderBean k2
* @param text v2
* @param numPartitions ReduceTask的个数
* @return 返回的是分区的编号:比如说:ReduceTask的个数3个,返回的编号是 0 1 2
* 比如说:ReduceTask的个数2个,返回的编号是 0 1
* 比如说:ReduceTask的个数1个,返回的编号是 0
*/
@Override
public int getPartition(OrderBean orderBean, Text text, int numPartitions) {
//参考源码 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
//按照key的hash值进行分区,解决int类型数据hashcode值取模出现负数而影响分区
//key.hashCode() & Integer.MAX_VALUE 是要保证任何map输出的key在numReduceTasks取模后决定的分区为正整数。
return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
将同一个分区内的数据根据订单编号再进行分组。
package com.chengzw.order;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* @author chengzw
* @description 根据订单编号orderId分组,一个分区内可能有多个订单号
* 实现分组有固定的步骤:
* 1.继承WritableComparator
* 2.调用父类的构造器
* 3.指定分组的规则,重写一个方法
* @since 2021/6/11
*/
public class OrderGroup extends WritableComparator {
//1、继承WritableComparator类
//2、调用父类的构造器
public OrderGroup(){
//第一个参数就是分组使用的javabean,第二个参数就是布尔类型,表示是否可以创建这个类的实例
super(OrderBean.class,true);
}
// 3、指定分组的规则,需要重写一个方法
/**
* @param a WritableComparable是接口,Orderbean实现了这个接口
* @param b WritableComparable是接口,Orderbean实现了这个接口
* @return
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
//1、对形参a b 做强制类型转换
OrderBean first = (OrderBean) a;
OrderBean second = (OrderBean) b;
//2、指定分组的规则
return first.getOrderId().compareTo(second.getOrderId());
}
}
Reduce 输入的键的是相同订单编号的 OrderBean 对象,输入的值可迭代对象,是这些 OrderBean 对象对应的一行文本。
由于我们之前在 OrderBean 实体类中定义了排序规则(compareTo()方法),因此这里接收到的 OrderBean 对象是按照金额从大到小的顺序排序的,我们要取相同订单编号 TOP3 的商品,只需要循环遍历输入的值,取前 3 个即可。
输出的键这里直接使用输入的一行文本,输出的值就不需要了,就简单地指定为 NullWritable 类型。
package com.chengzw.order;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author chengzw
* @description Reduce
* @since 2021/6/11
*/
public class OrderReducer extends Reducer<OrderBean, Text,Text, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int i = 0;
//OrderBean中实现了compareTo方法,这里的values是已经排好序的了,先比较订单的id,如果id一样,则将订单的商品按金额排序(降序)
//获取top N ,下面的代码就是取出来top3。
for (Text value : values) {
context.write(value,NullWritable.get());
i++;
if (i >= 3){
break;
}
}
}
}
Driver 类这次除了指定了 Mapper 和 Reduce 参数,还指定了 Shuffle 参数。
package com.chengzw.order;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
* @author chengzw
* @description 求订单最大值的主类
* @since 2021/6/11
*/
public class JobMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//一、初始化一个job
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "mygroup");
//二、配置Job信息
//1.设置输入信息
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("/tmp/input/orders.txt"));
//2.设置mapper 并设置输出的键和输出的值
job.setMapperClass(OrderMapper.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(Text.class);
//3.分区设置,根据订单编号orderId分区,输出到不同的文件
job.setPartitionerClass(OrderPartition.class);
//4.分组设置,根据订单编号orderId分区,因为同一个文件中可能有多个订单号
job.setGroupingComparatorClass(OrderGroup.class);
//5.设置Reducer,输出的键和输出的值,计算出每个订单号中金额最大的3个商品
job.setReducerClass(OrderReducer.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
//设置NumReduceTask(分区)的个数 默认是1
job.setNumReduceTasks(3);
//6.设置输出
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("/tmp/output/order"));
//三、等待完成 实际上就是提交任务
boolean b = job.waitForCompletion(true);
System.out.println(b==true?"MapReduce 任务执行成功!":"MapReduce 任务执行失败!");
System.exit(b ? 0 : 1);
}
}
本地运行程序:
输出有 3 个文件(分区),分别查看 3 个 文件,可以看到根据订单编号分成了 3 个文件,并且每个文件内保留了相同订单编号中金额最大的记录。
❯ cat /tmp/output/order/part-r-00000
order_002 goods_005 730
order_002 goods_005 700
order_002 goods_004 500
order_005 goods_003 1300
order_005 goods_002 320
order_005 goods_003 200
❯ cat /tmp/output/order/part-r-00001
order_003 goods_001 800
order_003 goods_001 800
order_003 goods_003 100
order_006 goods_002 1100
order_006 goods_003 500
order_006 goods_001 100
❯ cat /tmp/output/order/part-r-00002
order_001 goods_003 230
order_001 goods_002 200
order_001 goods_001 100
order_004 goods_004 1200
order_004 goods_003 500
order_004 goods_002 350