继上篇了解了使用MapReduce计算平均数以及去重后,我们再来一探MapReduce在排序以及单表关联上的处理方法。 在MapReduce系列的第一篇就有说过,MapReduce不仅是一种分布式的计算方法,更是一种解决问题的新思维、新思路。将原先看似可以一条龙似的处理一刀切成两端,一端是Map、一端是Reduce,Map负责分,Reduce负责合。
1.MapReduce排序 问题模型: 给出多个数据文件输入如:
sortfile1.txt
11
13
15
17
19
21
23
25
27
29
sortfile2.txt
10
12
14
16
18
20
22
24
26
28
30
sortfile3.txt
1
2
3
4
5
6
7
8
9
10
最终要完成排序形成结果文件格式如下:
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 10
……
要解决的问题有了,那么如何排序,如何实现,我们尚且存在哪些问题: 1.我们知道MapReduce本身就有自带的排序功能,能够直接用; 2.如果用MapReduce默认排序功能,如何使用,针对key为int类型以及String类型又有何不同; 3.如何保证三个输入文件乃至更多个输入文件的输入,使得在排序结果中全局有序
实际需求有了,问题也来了,那么需要一一解决。MapReduce确实有自己的排序机制,我们不会排开不用,但是不能完全靠内部机制实现。要知道MapReduce是根据key进行排序的,如果key为int类型,则按照key的数值大小排序;如果key为String类型,则按照字典先后顺序进行排序。为了保证这里的全局有序,需要定义一个自己的Partition类,起到一个全局筛选的作用,是的map过后的分配到reduce端的都是有序的。具体做法就是用输入数据的最大值除以系统partition数量的商作为分割数据的边界增量,也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍,这样就能保证执行partition后的数据是整体有序的。之后,在Reduce端得到的<key, value-list>,根据value-list中的元素个数将输入的key作为value的输出次数,输出的key是一个全局变量,用于统计当前的位次。 具体代码如下:
public class Sort {
//map将输入中的value转化成IntWritable类型,作为输出的key
public static class Map extends Mapper<Object, Text, IntWritable, IntWritable>{
private static IntWritable data = new IntWritable();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
System.out.println("line:" + line);
try{
data.set(Integer.parseInt(line));
}catch(Exception e){
data.set(1000);
}
System.out.println("Map key:" + data .toString() );
context.write(data, new IntWritable(1));
}
}
//reduce将输入的key复制到输出的value,然后根据输入的
//value-list中元素的个数决定key的输出次数
//用全局linenum来代表key的位次
public static class Reduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
private static IntWritable linenum = new IntWritable(1);
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
for (IntWritable val : values) {
context.write(linenum , key);
System.out.println("Reduce key:" + linenum + "\tReduce value:" + key );
linenum = new IntWritable(linenum.get() + 1);
}
}
}
//自定义Partition函数,此函数根据输入数据的最大值和MapReduce框架中
//Partition的数量获取将输入数据按照大小分块的边界,然后根据输入数值和
//边界的关系返回对应的Partition ID
public static class Partition extends Partitioner <IntWritable,IntWritable> {
@Override
public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
int Maxnumber = 65223;
int bound = Maxnumber/numPartitions + 1;
int keynumber = key.get();
for(int i = 0; i < numPartitions; i++){
System.out.println("numPartitions:" + numPartitions);
if(keynumber < bound*i && keynumber >= bound*(i-1))
return i-1;
}
return 0;
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Sort");
job.setJarByClass(Sort.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setPartitionerClass(Partition.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
注意: 1.在自己新建测试数据的时候,需要小心处理,比如在sortfile1.txt中一共是10行数据,如果将换行符停留在第11行,则在map阶段会抛出格式转换异常,所以添加代码中try catch处理。 2.为了更清晰的看出MapReduce以及Partition的执行过程,通过打印信息来了解每一个执行过程。 3.Reduce中应该是“return 0”,圣经《hadoop 实战2》中写成了return -1,实践证明是有错误的
程序执行,打印信息如下:
15/01/28 21:19:28 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/01/28 21:19:28 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/01/28 21:19:28 INFO input.FileInputFormat: Total input paths to process : 3
15/01/28 21:19:29 INFO mapred.JobClient: Running job: job_local_0001
15/01/28 21:19:29 INFO input.FileInputFormat: Total input paths to process : 3
15/01/28 21:19:29 INFO mapred.MapTask: io.sort.mb = 100
15/01/28 21:19:29 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/28 21:19:29 INFO mapred.MapTask: record buffer = 262144/327680
line:11
15/01/28 21:19:29 INFO mapred.MapTask: Starting flush of map output
Map key:11
numPartitions:1
line:13
Map key:13
numPartitions:1
line:15
Map key:15
numPartitions:1
line:17
Map key:17
numPartitions:1
line:19
Map key:19
numPartitions:1
line:21
Map key:21
numPartitions:1
line:23
Map key:23
numPartitions:1
line:25
Map key:25
numPartitions:1
line:27
Map key:27
numPartitions:1
line:29
Map key:29
numPartitions:1
15/01/28 21:19:29 INFO mapred.MapTask: Finished spill 0
15/01/28 21:19:29 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/01/28 21:19:29 INFO mapred.LocalJobRunner:
15/01/28 21:19:29 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
15/01/28 21:19:29 INFO mapred.MapTask: io.sort.mb = 100
15/01/28 21:19:29 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/28 21:19:29 INFO mapred.MapTask: record buffer = 262144/327680
line:10
Map key:10
numPartitions:1
line:12
Map key:12
numPartitions:1
line:14
Map key:14
numPartitions:1
line:16
Map key:16
numPartitions:1
line:18
Map key:18
numPartitions:1
line:20
Map key:20
numPartitions:1
line:22
Map key:22
numPartitions:1
line:24
Map key:24
numPartitions:1
line:26
Map key:26
numPartitions:1
line:28
Map key:28
numPartitions:1
line:30
Map key:30
numPartitions:1
15/01/28 21:19:29 INFO mapred.MapTask: Starting flush of map output
15/01/28 21:19:29 INFO mapred.MapTask: Finished spill 0
15/01/28 21:19:29 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
15/01/28 21:19:29 INFO mapred.LocalJobRunner:
15/01/28 21:19:29 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
15/01/28 21:19:29 INFO mapred.MapTask: io.sort.mb = 100
15/01/28 21:19:30 INFO mapred.JobClient: map 100% reduce 0%
15/01/28 21:19:30 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/28 21:19:30 INFO mapred.MapTask: record buffer = 262144/327680
line:1
Map key:1
numPartitions:1
line:2
Map key:2
numPartitions:1
line:3
Map key:3
numPartitions:1
line:4
Map key:4
numPartitions:1
line:5
Map key:5
numPartitions:1
line:6
Map key:6
numPartitions:1
line:7
Map key:7
numPartitions:1
line:8
Map key:8
numPartitions:1
line:9
Map key:9
numPartitions:1
line:10
Map key:10
numPartitions:1
15/01/28 21:19:30 INFO mapred.MapTask: Starting flush of map output
15/01/28 21:19:30 INFO mapred.MapTask: Finished spill 0
15/01/28 21:19:30 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000002_0 is done. And is in the process of commiting
15/01/28 21:19:30 INFO mapred.LocalJobRunner:
15/01/28 21:19:30 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000002_0' done.
15/01/28 21:19:30 INFO mapred.LocalJobRunner:
15/01/28 21:19:30 INFO mapred.Merger: Merging 3 sorted segments
15/01/28 21:19:30 INFO mapred.Merger: Down to the last merge-pass, with 3 segments left of total size: 316 bytes
15/01/28 21:19:30 INFO mapred.LocalJobRunner:
Reduce key:1 Reduce value:1
Reduce key:2 Reduce value:2
Reduce key:3 Reduce value:3
Reduce key:4 Reduce value:4
Reduce key:5 Reduce value:5
Reduce key:6 Reduce value:6
Reduce key:7 Reduce value:7
Reduce key:8 Reduce value:8
Reduce key:9 Reduce value:9
Reduce key:10 Reduce value:10
Reduce key:11 Reduce value:10
Reduce key:12 Reduce value:11
Reduce key:13 Reduce value:12
Reduce key:14 Reduce value:13
Reduce key:15 Reduce value:14
Reduce key:16 Reduce value:15
Reduce key:17 Reduce value:16
Reduce key:18 Reduce value:17
Reduce key:19 Reduce value:18
Reduce key:20 Reduce value:19
Reduce key:21 Reduce value:20
Reduce key:22 Reduce value:21
Reduce key:23 Reduce value:22
Reduce key:24 Reduce value:23
Reduce key:25 Reduce value:24
Reduce key:26 Reduce value:25
Reduce key:27 Reduce value:26
Reduce key:28 Reduce value:27
Reduce key:29 Reduce value:28
Reduce key:30 Reduce value:29
Reduce key:31 Reduce value:30
15/01/28 21:19:30 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/01/28 21:19:30 INFO mapred.LocalJobRunner:
15/01/28 21:19:30 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
15/01/28 21:19:30 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://hadoop:9000/usr/hadoop/output3
15/01/28 21:19:30 INFO mapred.LocalJobRunner: reduce > reduce
15/01/28 21:19:30 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
15/01/28 21:19:31 INFO mapred.JobClient: map 100% reduce 100%
15/01/28 21:19:31 INFO mapred.JobClient: Job complete: job_local_0001
15/01/28 21:19:31 INFO mapred.JobClient: Counters: 14
15/01/28 21:19:31 INFO mapred.JobClient: FileSystemCounters
15/01/28 21:19:31 INFO mapred.JobClient: FILE_BYTES_READ=67220
15/01/28 21:19:31 INFO mapred.JobClient: HDFS_BYTES_READ=261
15/01/28 21:19:31 INFO mapred.JobClient: FILE_BYTES_WRITTEN=138115
15/01/28 21:19:31 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=168
15/01/28 21:19:31 INFO mapred.JobClient: Map-Reduce Framework
15/01/28 21:19:31 INFO mapred.JobClient: Reduce input groups=30
15/01/28 21:19:31 INFO mapred.JobClient: Combine output records=0
15/01/28 21:19:31 INFO mapred.JobClient: Map input records=31
15/01/28 21:19:31 INFO mapred.JobClient: Reduce shuffle bytes=0
15/01/28 21:19:31 INFO mapred.JobClient: Reduce output records=31
15/01/28 21:19:31 INFO mapred.JobClient: Spilled Records=62
15/01/28 21:19:31 INFO mapred.JobClient: Map output bytes=248
15/01/28 21:19:31 INFO mapred.JobClient: Combine input records=0
15/01/28 21:19:31 INFO mapred.JobClient: Map output records=31
15/01/28 21:19:31 INFO mapred.JobClient: Reduce input records=31
通过打印信息我们知道: Map output records=31 Reduce input records=31 首先执行了Map,进行数据逐行输入,然后执行Partition过程,给每个元素打上唯一标记,确保进入Reduce阶段时整齐有序,最后执行Reduce阶段,完成全局排序过程。
最终的输出文件信息:
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 10
12 11
13 12
14 13
15 14
16 15
17 16
18 17
19 18
20 19
21 20
22 21
23 22
24 23
25 24
26 25
27 26
28 27
29 28
30 29
31 30
其实MapReduce的排序就是这么easy,先是让所有的人都进来,按照map的指定格式写入context,再经过partition全局指挥官的考量,打上排序的标记,最后在reduce中完成最终排序结果的输出。
2.MapReduce单表关联 问题模型,给出多个输入文件如下:
table1.txt
大儿子 爸爸
小儿子 爸爸
大女儿 爸爸
小女儿 爸爸
爸爸 爷爷
爸爸 二大爷
爸爸 三大爷
table2.txt
二女儿 妈妈
二儿子 妈妈
妈妈 爷爷
妈妈 二大爷
妈妈 三大爷
最终要得到的数据形式为:
grandchild grandparent
二女儿 爷爷
二女儿 二大爷
二女儿 三大爷
二儿子 爷爷
二儿子 二大爷
……
MapReduce下的表与表或者表与自身的连接不会像传统SQL语句那样直接一个left join、right join就能出一个最终表,鉴于本场景的需求,需要进行表连接,一个左表、一个右表,都是同一张表,连接的条件是左表的parent列以及右表的child列,整个过程就是一个自连接过程。 我们的解决思路如下: 1.Map端将输入数据分割为parent和child列,将parent设置为key,child设置为value输出,记为左表;再将同意对child和parent中的child设为key,parent设为value输出,记为右表 2.为了区分左右表,需要在输出的value中添加有关左右表的标示信息 3.在Reduce接收到的经过shuffle过程的结果中,每个key的value-list就包含了grandchild和grandparent关系,取出每个key的value-list进行解析,将左表的child放入一个数组,右表中的parent放入一个数组,然后对这两个数据求笛卡尔积就是最终结果
代码如下:
public class STjoin {
public static int time = 0;
//map将输入分割成child和parent,然后正序输出一次作为右表,反//序输出一次作为左表,需要注意的是在输出的value中必须加上左右表//区别标志
public static class Map extends Mapper<Object, Text, Text, Text>{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String childname = new String();
String parentname = new String();
String relationtype = new String();
String line = value.toString();
int i = 0;
while(line.charAt(i)!=' '){
i++;
}
String[] values = {line.substring(0,i),line.substring(i+1)};
if(values[0].compareTo("child") != 0)
{
childname = values[0];
parentname = values[1];
relationtype = "1"; //左右表区分标志
context.write(new Text(values[1]), new Text(relationtype + "+" + childname + "+" + parentname));
System.out.println("左表 Map key:" + new Text(values[1]) + "\tvalue:" + (relationtype + "+" + childname + "+" + parentname) );
//左表
relationtype = "2";
context.write(new Text(values[0]), new Text(relationtype + "+" + childname + "+" + parentname));
System.out.println("右表 Map key:" + new Text(values[0]) + "\tvalue:" + (relationtype + "+" + childname + "+" + parentname) );
//右表
}
}
}
public static class Reduce extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
if(time == 0){ //输出表头
context.write(new Text("grandchild"),new Text("grandparent"));
time++;
}
int grandchildnum = 0;
String grandchild[] = new String[10];
int grandparentnum = 0;
String grandparent[] = new String[10];
Iterator ite = values.iterator();
while(ite.hasNext())
{
String record = ite.next().toString();
int len = record.length();
int i = 2;
if(len == 0) continue;
char relationtype = record.charAt(0);
String childname = new String();
String parentname = new String();
//获取value-list中value的child
while(record.charAt(i) != '+')
{
childname = childname + record.charAt(i);
i++;
}
i = i+1;
//获取value-list中value的parent
while(i < len)
{
parentname = parentname + record.charAt(i);
i++;
}
//左表,取出child放入grandchild
if(relationtype == '1'){
grandchild[grandchildnum] = childname;
grandchildnum++;
}
else{//右表,取出parent放入grandparent
grandparent[grandparentnum] = parentname;
grandparentnum++;
}
}
//grandchild和grandparent数组求笛卡儿积
if(grandparentnum != 0 && grandchildnum != 0){
for(int m = 0; m < grandchildnum; m++){
for(int n = 0; n < grandparentnum; n++){
context.write(new Text(grandchild[m]),new Text(grandparent[n])); //输出结果
System.out.println("Reduce 孙子:" + grandchild[m] + "\t 爷爷:" + grandparent[n]);
}
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "single table join");
job.setJarByClass(STjoin.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
代码写的很明白,为了弄清楚MapReduce每一步还是加入了打印信息,程序执行的过程信息如下:
15/01/28 22:06:28 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/01/28 22:06:28 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/01/28 22:06:28 INFO input.FileInputFormat: Total input paths to process : 2
15/01/28 22:06:28 INFO mapred.JobClient: Running job: job_local_0001
15/01/28 22:06:28 INFO input.FileInputFormat: Total input paths to process : 2
15/01/28 22:06:28 INFO mapred.MapTask: io.sort.mb = 100
15/01/28 22:06:28 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/28 22:06:28 INFO mapred.MapTask: record buffer = 262144/327680
左表 Map key:爸爸 value:1+大儿子+爸爸
右表 Map key:大儿子 value:2+大儿子+爸爸
左表 Map key:爸爸 value:1+小儿子+爸爸
右表 Map key:小儿子 value:2+小儿子+爸爸
左表 Map key:爸爸 value:1+大女儿+爸爸
右表 Map key:大女儿 value:2+大女儿+爸爸
左表 Map key:爸爸 value:1+小女儿+爸爸
右表 Map key:小女儿 value:2+小女儿+爸爸
左表 Map key:爷爷 value:1+爸爸+爷爷
右表 Map key:爸爸 value:2+爸爸+爷爷
左表 Map key:二大爷 value:1+爸爸+二大爷
右表 Map key:爸爸 value:2+爸爸+二大爷
左表 Map key:三大爷 value:1+爸爸+三大爷
右表 Map key:爸爸 value:2+爸爸+三大爷
15/01/28 22:06:28 INFO mapred.MapTask: Starting flush of map output
15/01/28 22:06:28 INFO mapred.MapTask: Finished spill 0
15/01/28 22:06:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/01/28 22:06:28 INFO mapred.LocalJobRunner:
15/01/28 22:06:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
15/01/28 22:06:28 INFO mapred.MapTask: io.sort.mb = 100
15/01/28 22:06:28 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/28 22:06:28 INFO mapred.MapTask: record buffer = 262144/327680
左表 Map key:妈妈 value:1+二女儿+妈妈
右表 Map key:二女儿 value:2+二女儿+妈妈
左表 Map key:妈妈 value:1+二儿子+妈妈
右表 Map key:二儿子 value:2+二儿子+妈妈
左表 Map key:爷爷 value:1+妈妈+爷爷
右表 Map key:妈妈 value:2+妈妈+爷爷
左表 Map key:二大爷 value:1+妈妈+二大爷
右表 Map key:妈妈 value:2+妈妈+二大爷
左表 Map key:三大爷 value:1+妈妈+三大爷
右表 Map key:妈妈 value:2+妈妈+三大爷
15/01/28 22:06:28 INFO mapred.MapTask: Starting flush of map output
15/01/28 22:06:28 INFO mapred.MapTask: Finished spill 0
15/01/28 22:06:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
15/01/28 22:06:28 INFO mapred.LocalJobRunner:
15/01/28 22:06:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
15/01/28 22:06:28 INFO mapred.LocalJobRunner:
15/01/28 22:06:28 INFO mapred.Merger: Merging 2 sorted segments
15/01/28 22:06:28 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 697 bytes
15/01/28 22:06:28 INFO mapred.LocalJobRunner:
Reduce 孙子:二女儿 爷爷:爷爷
Reduce 孙子:二女儿 爷爷:二大爷
Reduce 孙子:二女儿 爷爷:三大爷
Reduce 孙子:二儿子 爷爷:爷爷
Reduce 孙子:二儿子 爷爷:二大爷
Reduce 孙子:二儿子 爷爷:三大爷
Reduce 孙子:大儿子 爷爷:爷爷
Reduce 孙子:大儿子 爷爷:二大爷
Reduce 孙子:大儿子 爷爷:三大爷
Reduce 孙子:小儿子 爷爷:爷爷
Reduce 孙子:小儿子 爷爷:二大爷
Reduce 孙子:小儿子 爷爷:三大爷
Reduce 孙子:大女儿 爷爷:爷爷
Reduce 孙子:大女儿 爷爷:二大爷
Reduce 孙子:大女儿 爷爷:三大爷
Reduce 孙子:小女儿 爷爷:爷爷
Reduce 孙子:小女儿 爷爷:二大爷
Reduce 孙子:小女儿 爷爷:三大爷
15/01/28 22:06:28 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/01/28 22:06:28 INFO mapred.LocalJobRunner:
15/01/28 22:06:28 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
15/01/28 22:06:28 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://hadoop:9000/usr/hadoop/output4
15/01/28 22:06:28 INFO mapred.LocalJobRunner: reduce > reduce
15/01/28 22:06:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
15/01/28 22:06:29 INFO mapred.JobClient: map 100% reduce 100%
15/01/28 22:06:29 INFO mapred.JobClient: Job complete: job_local_0001
15/01/28 22:06:29 INFO mapred.JobClient: Counters: 14
15/01/28 22:06:29 INFO mapred.JobClient: FileSystemCounters
15/01/28 22:06:29 INFO mapred.JobClient: FILE_BYTES_READ=50580
15/01/28 22:06:29 INFO mapred.JobClient: HDFS_BYTES_READ=515
15/01/28 22:06:29 INFO mapred.JobClient: FILE_BYTES_WRITTEN=103312
15/01/28 22:06:29 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=369
15/01/28 22:06:29 INFO mapred.JobClient: Map-Reduce Framework
15/01/28 22:06:29 INFO mapred.JobClient: Reduce input groups=12
15/01/28 22:06:29 INFO mapred.JobClient: Combine output records=0
15/01/28 22:06:29 INFO mapred.JobClient: Map input records=12
15/01/28 22:06:29 INFO mapred.JobClient: Reduce shuffle bytes=0
15/01/28 22:06:29 INFO mapred.JobClient: Reduce output records=19
15/01/28 22:06:29 INFO mapred.JobClient: Spilled Records=48
15/01/28 22:06:29 INFO mapred.JobClient: Map output bytes=645
15/01/28 22:06:29 INFO mapred.JobClient: Combine input records=0
15/01/28 22:06:29 INFO mapred.JobClient: Map output records=24
15/01/28 22:06:29 INFO mapred.JobClient: Reduce input records=24
最终得到的文件就是打印信息中的输出信息:
grandchild grandparent
二女儿 爷爷
二女儿 二大爷
二女儿 三大爷
二儿子 爷爷
二儿子 二大爷
二儿子 三大爷
大儿子 爷爷
大儿子 二大爷
大儿子 三大爷
小儿子 爷爷
小儿子 二大爷
小儿子 三大爷
大女儿 爷爷
大女儿 二大爷
大女儿 三大爷
小女儿 爷爷
小女儿 二大爷
小女儿 三大爷
如果觉得有用,记得点赞哦,也欢迎加入大数据群413471695进行技术讨论^_^