云计算分布式框架 Hadoop

107课时
2.2K学过
8分

课程评价 (0)

请对课程作出评价:
0/300

学员评价

暂无精选评价
15分钟

任务2:计数实例

在深入细节之前,先看一个MapReduce的应用示例,以便对MapReduce的工作方式有一个初步的认识。Hadoop 自带的示例程序 WordCount用于统计一批文本文件中单词出现的频率,完整的代码可在Hadoop 安装包中得到(在 src/examples 目录中)。这个应用适用于单机模式、伪分布式模式或完全分布式模式。

以WordCount字频统计工具为实例,可以更清晰地看到它在Hadoop中是如何进行工作的。首先看wordcount.java的执行过程。

WordCount实例代码如下。

//--WordCount.java程序源码--//
package org.myorg; 
import java.io.IOException;
import java.util.*;
mport org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount {
    /*
    这个类实现 Mapper 接口中的 map方法,
    输入参数中的 value 是文本文件中的一行,
    利用 StringTokenizer将这个字符串拆成单词,
    然后将输出结果<单词,1> 
    写入到 org.apache.hadoop.mapred.OutputCollector 中.
    */
    public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, 
    Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    /*代码中 LongWritable, IntWritable, Text 
    均是 Hadoop 中实现的用于封装 Java 数据类型的类,
        这些类都能够被串行化从而便于在分布式环境中进行数据交换,
        可以将它们分别视为 long, int, String 的替代品
        */
        
        public void map(LongWritable key, Text value, 
            OutputCollector<Text, IntWritable> output, 
            Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) 
                {
                word.set(itr.nextToken());
                output.collect(word, one);
                }
            }
            }  
            //这个类实现 Reducer 接口中的 reduce 方法, 输入参数中的 key, values 是由 Map 				
            //任务输出的中间结果,values 是一个 Iterator
            public static class Reduce extends MapReduceBase
                implements Reducer<Text, IntWritable, Text, IntWritable> {
                public void reduce(Text key, Iterator<IntWritable> values,
                    OutputCollector<Text, IntWritable> output, 
                    Reporter reporter) throws IOException { 
                    int sum = 0;

                    //遍历这个 Iterator, 就可以得到属于同一个 key 的所有 value. 此处,key 	                      
                    //是一个单词,value是词频。
                    while (values.hasNext()) {
                        sum += values.next().get();
                        }
                        output.collect(key, new IntWritable(sum));
                        }
                }
                //在 Hadoop 中一次计算任务称为一个job, 可以通过一个 JobConf 对象设置如何					    
                //运行这个job。此处定义了输出的 key 的类型是 Text, value 的类型是 IntWritable。
                public int run(String[] args) throws Exception {
                    JobConf conf = new JobConf(getConf(), WordCount.class);
                    conf.setJobName("wordcount");
                    conf.setOutputValueClass(IntWritable.class);
                    conf.setMapperClass(MapClass.class);        
                    conf.setCombinerClass(Reduce.class);
                    conf.setReducerClass(Reduce.class);
                    conf.setInputPath(new Path(args[0]));
                    conf.setOutputPath(new Path(args[1]));    
                    JobClient.runJob(conf);
                    return 0;
                }
                //主函数main
                public static void main(String[] args) throws Exception {
                    if(args.length != 2)
                    {
                        System.err.println("Usage: WordCount <input path> <output path>");
                        System.exit(-1);
                    }
                // ToolRunner的run方法开始,run方法需要三个参数,第一个是一个Configuration					    
                //类的实例。第二个是WorCount类的实例,args就是从控制台接收到的命令行数组。
                int res = ToolRunner.run(new Configuration(), new WordCount(), args);
                System.exit(res);
        }
    }

(1)编译WordCount.java来创建jar包。HADOOP_HOME环境变量对应安装时的根目录,HADOOP_VERSION对应Hadoop的当前安装版本,编译WordCount.java来创建jar包,代码如下。

$ mkdir  wordcount_classes 
$javac-classpath${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar-dwordcount_classesWord
 Count.java 
$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ 

(2)用实例文本文件作为输入,代码如下。

/usr/ccwan/wordcount/input - 是HDFS中的输入路径 
/usr/ccwan/wordcount/output - 是HDFS中的输出路径 
用示例文本文件做为输入。
$ bin/hadoop dfs -ls /usr/ccwan/wordcount/input/ 
/usr/ccwan/wordcount/input/file01
/usr/ccwan/wordcount/input/file02 
$ bin/hadoop dfs -cat /usr/ccwan/wordcount/input/file01  Hello World Bye World
$ bin/hadoop dfs -cat /usr/ccwan/wordcount/input/file02  Hello Hadoop Goodbye Hadoop 

应用程序能够使用-files选项来指定一个由逗号分隔的路径列表,这些路径是task的当前工作目录。使用选项-libjars可以向map和reduce的classpath中添加jar包。使用-archives选项程序可以传递档案文件作为参数,这些档案文件会被解压并且在task的当前工作目录下创建一个指向解压生成的目录的符号链接(以压缩包的名字命名)。

(3)运行应用程序,代码如下。

$bin/hadoop jar /usr/ccwan/wordcount.jar org.myorg.WordCount /usr/ccwan/wordcount/input
/usr/joe/wordcount/output 

(4)输出的结果如下。

$ bin/hadoop dfs -cat /usr/ccwan/wordcount/output/part-00000
Bye 1
Goodbye 1 
Hadoop 2 
Hello 2 
World 2

(5)使用-libjars和-files运行wordcount例子如下。

hadoop jar hadoop-examples.jar  wordcount -files  cachefile.txt -libjars mylib.jar input output