15分钟
任务4:去重实例
倒排索引的程序代码如下。
//--InvertedIndex.java部分源码--//
public class InvertedIndex {
public static class Map extends Mapper<Object, Text, Text, Text> {
private Text keyInfo = new Text();
// 存储单词和URL组合
private Text valueInfo = new Text();
// 存储词频
private FileSplit split;
// 存储Split对象
// 实现map函数
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 获得<key,value>对所属的FileSplit对象
split = (FileSplit) context.getInputSplit();
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
// key值由单词和URL组成,如"MapReduce:file1.txt"
// 获取文件的完整路径
// keyInfo.set(itr.nextToken()+":"+split.getPath().toString());
// 这里为了好看,只获取文件的名称。
int splitIndex = split.getPath().toString().indexOf("file");
keyInfo.set(itr.nextToken() + ":"
+ split.getPath().toString().substring(splitIndex));
// 词频初始化为1
valueInfo.set("1");
context.write(keyInfo, valueInfo);
}
}
}
public static class Combine extends Reducer<Text, Text, Text, Text> {
private Text info = new Text();
// 实现reduce函数
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 统计词频
int sum = 0;
for (Text value : values)
{
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
// 重新设置value值,由URL和词频组成
info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
// 重新设置key值,为单词
key.set(key.toString().substring(0, splitIndex));
context.write(key, info);
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
// 实现reduce函数
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 生成文档列表
String fileList = new String();
for (Text value : values)
{
fileList += value.toString() + ";";
}
result.set(fileList);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 这句话很关键
conf.set("mapred.job.tracker", "192.168.1.2:9001");
String[] ioArgs = new String[] { "index_in", "index_out" };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Inverted Index <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Inverted Index");
job.setJarByClass(InvertedIndex.class);
// 设置Map、Combine和Reduce处理类
job.setMapperClass(Map.class);
job.setCombinerClass(Combine.class);
job.setReducerClass(Reduce.class);
// 设置Map输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置Reduce输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 设置输入和输出目录
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}代码分析如下。
(1)首先设置Map类,使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容。Map过程必须首先分析输入的<key,value>对,倒排索引中需要的三个信息:单词、文档URL和词频。实现代码如下。
//定义map类
public static class Map extends Mapper<Object, Text, Text, Text> {
private Text keyInfo = new Text();
// 存储单词和URL组合
private Text valueInfo = new Text();
// 存储词频
private FileSplit split;
// 存储Split对象
// 实现map函数
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 获得<key,value>对所属的FileSplit对象
split = (FileSplit) context.getInputSplit();
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
/*
key值由单词和URL组成,如"MapReduce:file1.txt",获取文件的完整路径
keyInfo.set(itr.nextToken()+":"+split.getPath().toString());
这里为了好看,只获取文件的名称。
*/
int splitIndex = split.getPath().toString().indexOf("file");
keyInfo.set(itr.nextToken() + ":"+ split.getPath().toString().substring(splitIndex));
// 词频初始化为1
valueInfo.set("1");
context.write(keyInfo, valueInfo);
}
}
}(2)其次是Combine过程。经过Map方法处理后,Combine过程将key值相同的value值累加,得到一个单词在文档中的词频。修改key值和value值。此处将单词作为key值,URL和词频组成value值(如"file1.txt:1"),以利用MapReduce框架默认的HashPartitioner类完成Shuffle过程,将相同单词的所有记录发送给同一个Reducer进行处理。实现代码如下:
public static class Combine extends Reducer<Text, Text, Text, Text> {
private Text info = new Text();
// 实现reduce函数
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 统计词频
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
// 重新设置,value值由URL和词频组成
info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
// 重新设置,key值为单词
key.set(key.toString().substring(0, splitIndex));
context.write(key, info);
}
}(3)然后是Reduce过程, Reduce过程只需将相同key值的value值组合成倒排索引文件所需的格式即可,剩下的任务由MapReduce框架处理。实现代码如下:
public static class Reduce extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
// 实现reduce函数
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 生成文档列表
String fileList = new String();
for (Text value : values) {
fileList += value.toString() + ";";
}
result.set(fileList);
context.write(key, result);
}
}
学员评价