在之前的文章《Hadoop-本地模式搭建》小编提到了wordcount对单词的统计,当时使用Hadoop自带的hadoop-mapreduce-examples-2.7.3.jar的实现了统计,
命令如下:hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount /root/temp/data.txt /root/temp/dataout看了效果,现在小编通过自己编写示例来完成这个功能。
操作的环境:
伪分布式环境:如环境未搭建,请返回文章《伪分布式模式搭建》查看。
需要的jar包
/root/training/hadoop-2.7.3/share/hadoop/common
/root/training/hadoop-2.7.3/share/hadoop/common/lib
/root/training/hadoop-2.7.3/share/hadoop/mapreduce
/root/training/hadoop-2.7.3/share/hadoop/mapreduce/lib
(/root/training/hadoop-2.7.3 此处是小编在搭建环境中约定的文件路径)
实现思路
演示程序中的data.txt需要自己创建,其中的内容为:
I love Beijing
I love China
Beijing is the capital of China
MapReduce的wordcount功能主要分两个功能:一个是分词,一个是统计,分别对应这个Mapper阶段和Reduce阶段
1.在Mapper阶段,将data.txt文件读入,并对一行行的数据进行读取,分词
对应着下面的实现类:WordCountMapper
2.在Reduce阶段,将Mapper的输出作为输入,进行单词的统计,并排序。
对应着项目的实现类:WordCountReducer
3.在两个阶段需要一个主类,实现job
Mapper类的实现
package demo.wc;
importjava.io.IOException;
importorg.apache.hadoop.mapreduce.Mapper;
//public classWordCountMapper extends Mapper {
public classWordCountMapper extends Mapper {
/*
* map方法是提供给map task进程来调用的,map task进程是每读取一行文本来调用一次我们自定义的map方法
* map task在调用map方法时,传递的参数:
* 一行的起始偏移量LongWritable作为key
* 一行的文本内容Text作为value1
*/
@Override
protectedvoid map(LongWritable key1, Text value1, Context context)
throwsIOException, InterruptedException {
/**
* context 代表Mapper的上下文
* 上文 HDFS
* 下文 Reducer
*/
//取出数据 I Love Beijing
Stringdata = value1.toString();
//分词
String[]words = data.split(" ");
for(Stringword: words) {
//k2为单词, v2:为计数
context.write(newText(word), new IntWritable(1));
}
}
}
Reduce类的实现
package demo.wc;
importjava.io.IOException;
//public classWordCountReducer extends Reducer {
public classWordCountReducer extends Reducer {
/*
* reduce方法提供给reduce task进程来调用
*
* reduce task会将shuffle阶段分发过来的大量kv数据对进行聚合,聚合的机制是相同key的kv对聚合为一组
* 然后reduce task对每一组聚合kv调用一次我们自定义的reduce方法
* 调用时传递的参数:
*k3:一组kv中的key
*v3:一组kv中所有value的迭代器
*/
@Override
protectedvoid reduce(Text k3, Iterable v3,
Contextcontext) throws IOException, InterruptedException {
/**
* context 待办reduce的上下文
* 上文: Mapper
* 下文: HDFS
*/
//对V3进行求和
inttotal = 0;
for(IntWritablev: v3) {
total+= v.get();
}
//输出 K4单词 V4频率
context.write(k3,new IntWritable(total));
}
}
job主类的实现
package demo.wc;
public classWordCountMain {
publicstatic void main(String[] args) throws Exception {
//创建Job
Jobjob = Job.getInstance(new Configuration());
//指定任务的入口
job.setJarByClass(WordCountMain.class);
//指任务的map和输出的数据类型 k2 v2
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定任务的reduce和输出的数据类型 k4 v4
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定输入的路径(map) 输出的路径(reduce)
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//执行任务
job.waitForCompletion(true);
}
}
打包
打包内容其实很简单,只是罗列下注意的地方点就好,在这个界面Main class要选择入口类。
运行
要将打包的文件上传到自己定义的目录下(小编放在/root/tmp目录下)
[root@bigdata11temp]# hadoop jar wc.jar /input/data.txt /output/day1215/wc
查看效果:
此篇文章主要是通过编写代码的形式来完成WordCount功能,其中将如何实现做了简单的介绍,如果大家有疑问,可以在文章后面留言一起讨论学习。
在公众号回复 3:获取最新的大数据学习路线,当前获取的大数据学习路线,还在完善中。
领取专属 10元无门槛券
私享最新 技术干货