任务2:计数实例
在深入细节之前,先看一个MapReduce的应用示例,以便对MapReduce的工作方式有一个初步的认识。Hadoop 自带的示例程序 WordCount用于统计一批文本文件中单词出现的频率,完整的代码可在Hadoop 安装包中得到(在 src/examples 目录中)。这个应用适用于单机模式、伪分布式模式或完全分布式模式。
以WordCount字频统计工具为实例,可以更清晰地看到它在Hadoop中是如何进行工作的。首先看wordcount.java的执行过程。
WordCount实例代码如下。
//--WordCount.java程序源码--//
package org.myorg;
import java.io.IOException;
import java.util.*;
mport org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount {
/*
这个类实现 Mapper 接口中的 map方法,
输入参数中的 value 是文本文件中的一行,
利用 StringTokenizer将这个字符串拆成单词,
然后将输出结果<单词,1>
写入到 org.apache.hadoop.mapred.OutputCollector 中.
*/
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text,
Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/*代码中 LongWritable, IntWritable, Text
均是 Hadoop 中实现的用于封装 Java 数据类型的类,
这些类都能够被串行化从而便于在分布式环境中进行数据交换,
可以将它们分别视为 long, int, String 的替代品
*/
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
output.collect(word, one);
}
}
}
//这个类实现 Reducer 接口中的 reduce 方法, 输入参数中的 key, values 是由 Map
//任务输出的中间结果,values 是一个 Iterator
public static class Reduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
int sum = 0;
//遍历这个 Iterator, 就可以得到属于同一个 key 的所有 value. 此处,key
//是一个单词,value是词频。
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
//在 Hadoop 中一次计算任务称为一个job, 可以通过一个 JobConf 对象设置如何
//运行这个job。此处定义了输出的 key 的类型是 Text, value 的类型是 IntWritable。
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), WordCount.class);
conf.setJobName("wordcount");
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputPath(new Path(args[0]));
conf.setOutputPath(new Path(args[1]));
JobClient.runJob(conf);
return 0;
}
//主函数main
public static void main(String[] args) throws Exception {
if(args.length != 2)
{
System.err.println("Usage: WordCount <input path> <output path>");
System.exit(-1);
}
// ToolRunner的run方法开始,run方法需要三个参数,第一个是一个Configuration
//类的实例。第二个是WorCount类的实例,args就是从控制台接收到的命令行数组。
int res = ToolRunner.run(new Configuration(), new WordCount(), args);
System.exit(res);
}
}
(1)编译WordCount.java来创建jar包。HADOOP_HOME环境变量对应安装时的根目录,HADOOP_VERSION对应Hadoop的当前安装版本,编译WordCount.java来创建jar包,代码如下。
$ mkdir wordcount_classes
$javac-classpath${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar-dwordcount_classesWord
Count.java
$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/
(2)用实例文本文件作为输入,代码如下。
/usr/ccwan/wordcount/input - 是HDFS中的输入路径
/usr/ccwan/wordcount/output - 是HDFS中的输出路径
用示例文本文件做为输入。
$ bin/hadoop dfs -ls /usr/ccwan/wordcount/input/
/usr/ccwan/wordcount/input/file01
/usr/ccwan/wordcount/input/file02
$ bin/hadoop dfs -cat /usr/ccwan/wordcount/input/file01 Hello World Bye World
$ bin/hadoop dfs -cat /usr/ccwan/wordcount/input/file02 Hello Hadoop Goodbye Hadoop
应用程序能够使用-files选项来指定一个由逗号分隔的路径列表,这些路径是task的当前工作目录。使用选项-libjars可以向map和reduce的classpath中添加jar包。使用-archives选项程序可以传递档案文件作为参数,这些档案文件会被解压并且在task的当前工作目录下创建一个指向解压生成的目录的符号链接(以压缩包的名字命名)。
(3)运行应用程序,代码如下。
$bin/hadoop jar /usr/ccwan/wordcount.jar org.myorg.WordCount /usr/ccwan/wordcount/input
/usr/joe/wordcount/output
(4)输出的结果如下。
$ bin/hadoop dfs -cat /usr/ccwan/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
(5)使用-libjars和-files运行wordcount例子如下。
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar input output
学员评价