作为Hadoop里重要的分布式计算组件MapReduce到底存在什么样的问题,大家纷纷都转投其他技术栈?我们来一起探个究竟。本文会先详细解析一下整个MapReduce的过程,编程方式,然后再去分析一下存在的问题和其中可以借鉴的点。
① : 每个数据的Split对应一个Map任务作为Map的输入,一般来说是HDFS的一个Block。
② : Map产生的数据会先写入到一个环形的内存的Buffer空间里。
③ : 当Buffer满了以后, 会Spill溢出数据到磁盘里。在溢出之前会先按照Partition函数对数据进行分区(默认是取key的hash值然后根据Reducer的个数进行取模),然后按照Key进行排序(快速排序)。如果设置了Combiner会在写入磁盘前,对数据进行Combine操作,通过减少key的数据量来减轻Reducer拉取数据的网络传输。
④ : 最后将所有的溢出文件合并为一个文件,合并的过程中按照分区按照key进行排序(归并排序), 如果溢出文件超过一定的数量(可配置), 会在合并的前还会执行Combine操作(如果设置了Combiner)。
⑤ : 当Map端有任务完成后,Reducer端就会启动对应的fetch & copy线程去从Map端复制数据。
⑥ : 当Copy过来的数据内存中放不下后,会往磁盘写,写之前会先进行merge和sort操作(归并排序),combiner操作,最终会合并得到一份Reduce的输入数据。
⑦ : 当输入数据准备好后,进行Reduce操作。
⑧ : 输出数据到指定的位置。
Map 输入是<Key, Value>, 输出是一个或者多个<Key, Value>Reduce的输入是<Key, Iteratorable<Value>> 输出是<Key, Value>。总结来说:
input<k1, v1>-->Map--><k2,v2>-->combine<k2,v2>-->Reduce--><k3, v3>(output)
以Wordcount为例:
实现Map接口类
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final statck IntWritable one = new IntWritable(1);
private Text word = new Text();
public void Map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
实现Reducer接口类
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWriterable> {
private IntWritable result = new IntWritable();
public void Reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val: values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
处理任务的Job
public static void main(String[] args)throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.addOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
用Spark来实现Wordcount
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]"))
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val count = spark.read.textFile("input.txt")
.flatMap(_.split(" "))
.Map(s => (s, 1))
.rdd
.ReduceByKey((a, b) => a + b)
.collect()
}
}
Join
操作,需要开发人员自己写Join
的逻辑实现:总结:个人觉得Map Reduce主要的问题在于函数太过于底层,对用户的使用和操作上来说不够灵活,另外强制约束了需要按key排序和输出到磁盘使得其有性能上损失。但是也并不是全部一无是处,其中
PartitionId
去排序,最终会按照PartiontionId的顺序将一个Map产生的所有文件合成一个文件,来减少碎文件。[2]: https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
领取专属 10元无门槛券
私享最新 技术干货