For a plain Java program, reading a local file could not be more routine. For a MapReduce program, however, it is a common trap: the file is clearly there on the local machine, you run the jar from that machine, so why can't MapReduce read it? Because a MapReduce job does not actually execute locally; its tasks run distributed across the machines of the cluster, so of course they cannot see the file, and the so-called "local file" is no longer local to them. There is exactly one exception: your Hadoop cluster is a pseudo-distributed (single-node) cluster.
Take the following example:
package test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileTest
{
    public static void main(String args[])
    {
        int mr = 0;
        try
        {
            mr = ToolRunner.run(new Configuration(), new FileTestDriver(), args);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        System.exit(mr);
    }
}

class FileTestDriver extends Configured implements Tool
{
    @Override
    public int run(String[] arg0) throws Exception
    {
        Configuration config = getConf();
        JobConf conf = new JobConf(config, FileTestDriver.class);
        String[] otherArgs = new GenericOptionsParser(config, arg0).getRemainingArgs();
        String input = otherArgs[0];
        String output = otherArgs[1];
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.set("mapred.task.timeout", "6000000");
        conf.setMapperClass(FileTestMapper.class);
        conf.setReducerClass(FileTestReducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));
        JobClient.runJob(conf);
        return 0;
    }
}

class FileTestMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text>
{
    private String filepath = "";

    public void configure(JobConf job)
    {
        // picks up the value passed on the command line as -D files=...
        filepath = job.get("files");
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
        String url = "qq.com";
        String host = getTop100DomainTest(url, filepath);
        output.collect(new Text(url + "\t" + host), new Text(""));
    }

    public String getTop100DomainTest(String url, String filepath)
    {
        BufferedReader reader = null;
        try
        {
            reader = new BufferedReader(new FileReader(new File(filepath)));
            String line = "";
            while ((line = reader.readLine()) != null)
            {
                // splitLine[0] is the host; the fields that follow are its domains
                line = line.replaceAll("( )+", " ");
                String[] splitLine = line.split(" ");
                for (int i = 1; i < splitLine.length; i++)
                {
                    String host = splitLine[i];
                    if (url.equals(host))
                    {
                        return splitLine[0];
                    }
                }
            }
            return "";
        }
        catch (FileNotFoundException e)
        {
            return "";
        }
        catch (IOException e)
        {
            return "";
        }
        finally
        {
            // close the reader so file handles are not leaked across map() calls
            if (reader != null)
            {
                try
                {
                    reader.close();
                }
                catch (IOException ignored)
                {
                }
            }
        }
    }
}

class FileTestReducer extends MapReduceBase implements
        Reducer<Text, Text, Text, Text>
{
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
        output.collect(key, new Text(""));
    }
}
The method public String getTop100DomainTest(String url, String filepath) reads the file and returns the domain for the given url.
After packaging the program above as test.jar, run the following command:
hadoop jar test.jar test.FileTest -D files="/opt/top100.txt" /test/test /test/test1
If you are on a pseudo-distributed cluster, congratulations, the program runs fine. If you are on a truly distributed cluster, it will most likely fail.
Now that we understand why, how do we make this code succeed on a distributed cluster as well? One option: copy top100.txt into /opt on every machine in the cluster!
The program runs now, doesn't it? But this approach is crude. With many nodes you have to copy the file to each one, which is tedious, and the file must sit in the same directory on every node. If you can live with that, go ahead.
In fact, MapReduce provides a caching facility for exactly this: DistributedCache.
All you need to do is add the following during job configuration:
DistributedCache.addCacheFile(new URI("/test/top100.txt"), conf);
That is all, except that "/test/top100.txt" here is an HDFS path.
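As a rough sketch, the call goes into FileTestDriver.run() before the job is submitted; the HDFS file is assumed to exist already (for example, uploaded beforehand with hadoop fs -put /opt/top100.txt /test/top100.txt):

// Inside FileTestDriver.run(), before submitting the job:
// register the HDFS file so the framework copies it to every task node
DistributedCache.addCacheFile(new URI("/test/top100.txt"), conf);
FileInputFormat.setInputPaths(conf, new Path(input));
FileOutputFormat.setOutputPath(conf, new Path(output));
JobClient.runJob(conf);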
Then, in the mapper's public void configure(JobConf job) method, add:
private Path[] localFiles;

public void configure(JobConf job)
{
    try
    {
        // local paths of the cached files on the node running this task
        localFiles = DistributedCache.getLocalCacheFiles(job);
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
}
and you are done.
In map(), you can then access the file by calling path.toUri().getPath() on one of the cached paths.
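A minimal sketch of that map-side lookup, assuming top100.txt is the only file added to the cache (so it is localFiles[0]):

// Inside FileTestMapper.map(): resolve the cached file to a local path and reuse the lookup helper
String localPath = localFiles[0].toUri().getPath();
String host = getTop100DomainTest(url, localPath);
output.collect(new Text(url + "\t" + host), new Text(""));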