前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >2021年大数据Hadoop(二十):MapReduce的排序和序列化

2021年大数据Hadoop(二十):MapReduce的排序和序列化

作者头像
Lansonli
发布2021-10-11 15:47:50
6450
发布2021-10-11 15:47:50
举报
文章被收录于专栏:Lansonli技术博客

MapReduce的排序和序列化

概述

序列化(Serialization)是指把结构化对象转化为字节流。

反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。

当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。

Java的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系…),不便于在网络中高效传输;所以,hadoop自己开发了一套序列化机制(Writable),精简,高效。不用像java对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。

Writable是Hadoop的序列化格式,hadoop定义了这样一个Writable接口。

一个类要支持可序列化只需实现这个接口即可。

代码语言:javascript
复制
public interface  Writable {

 void write(DataOutput out) throws IOException;

 void readFields(DataInput in) throws IOException;

}

另外 Writable 有一个子接口是 WritableComparable, WritableComparable 是既可实现序列化, 也可以对key进行比较, 我们这里可以通过自定义 Key 实现 WritableComparable 来实现我们的排序功能.

代码语言:javascript
复制
// WritableComparable分别继承Writable和Comparable

public interface WritableComparable<T> extends Writable, Comparable<T> {

}

//Comparable

public interface Comparable<T> {

    int compareTo(T var1);

}

Comparable接口中的comparaTo方法用来定义排序规则,用于将当前对象与方法的参数进行比较。

例如:o1.compareTo(o2);

如果指定的数与参数相等返回0。

如果指定的数小于参数返回 -1。

如果指定的数大于参数返回 1。

返回正数的话,当前对象(调用compareTo方法的对象o1)要排在比较对象(compareTo传参对象o2)后面,返回负数的话,放在前面。

需求

数据格式如下

a   1 a   9 b   3 a   7 b   8 b   10 a   5

要求:

第一列按照字典顺序进行排列

第一列相同的时候, 第二列按照升序进行排列

​​​​​​​分析

实现自定义的bean来封装数据,并将bean作为map输出的key来传输

MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key。所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法。

如果自定义的JavaBean要参与MapReduce运算,则必须进行序列化,必须实现Writable接口,如果该JavaBean作为K2,则必须实现WritableComparable接口,让JavaBean具有排序的功能

实现

自定义类型和比较器

代码语言:javascript
复制
public class SortBean implements WritableComparable<SortBean>{



  private String word;

  private int  num;



  public String getWord() {

  return word;

  }



  public void setWord(String word) {

  this.word = word;

  }



  public int getNum() {

  return num;

  }



  public void setNum(int num) {

  this.num = num;

  }



  @Override

  public String toString() {

  return   word + "\t"+ num ;

  }



  //实现比较器,指定排序的规则

  /*

规则:

  第一列(word)按照字典顺序进行排列    //  aac   aad

  第一列相同的时候, 第二列(num)按照升序进行排列

   */

  /*

  a  1

  a  5

  b  3

  b  8

   */

  @Override

  public int compareTo(SortBean sortBean) {

  //先对第一列排序: Word排序

  int result = this.word.compareTo(sortBean.word);

  //如果第一列相同,则按照第二列进行排序

  if(result == 0){

  return  this.num - sortBean.num;

  }

  return result;

  }



  //实现序列化

  @Override

  public void write(DataOutput out) throws IOException {

  out.writeUTF(word);

  out.writeInt(num);

  }



  //实现反序列

  @Override

  public void readFields(DataInput in) throws IOException {

  this.word = in.readUTF();

  this.num = in.readInt();

  }

}

​​​​​​​编写Mapper代码

代码语言:javascript
复制
public class SortMapper extends Mapper<LongWritable,Text,SortBean,NullWritable> {

  /*

map方法将K1和V1转为K2和V2:



K1            V1

0            a  3

5            b  7

----------------------

K2                         V2

SortBean(a  3)         NullWritable

SortBean(b  7)         NullWritable

   */

  @Override

  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

  //1:将行文本数据(V1)拆分,并将数据封装到SortBean对象,就可以得到K2

  String[] split = value.toString().split("\t");



  SortBean sortBean = new SortBean();

  sortBean.setWord(split[0]);

  sortBean.setNum(Integer.parseInt(split[1]));



  //2:将K2和V2写入上下文中

  context.write(sortBean, NullWritable.get());

  }

 }

​​​​​​​编写Reducer代码

代码语言:javascript
复制
public class SortReducer extends Reducer<SortBean,NullWritable,SortBean,NullWritable> {



   //reduce方法将新的K2和V2转为K3和V3

   @Override

   protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

      context.write(key, NullWritable.get());

   }

}

​​​​​​​编写主类代码

代码语言:javascript
复制
public class SortRunner {

      public static void main(String[] args) throws Exception {

  

  Configuration conf = new Configuration();

          //1:创建job对象

          Job job = Job.getInstance(conf, "mapreduce_sort");

   

       

       //2:指定job所在的jar包

  job.setJarByClass(SortRunner.class);

    

  //3:指定源文件的读取方式类和源文件的读取路径

  job.setInputFormatClass(TextInputFormat.class);

  ///TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input/sort_input"));

  TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\sort_input"));



  //4:指定自定义的Mapper类和K2、V2类型

  job.setMapperClass(SortMapper.class);

  job.setMapOutputKeyClass(SortBean.class);

  job.setMapOutputValueClass(NullWritable.class);





  //5:指定自定义的Reducer类和K3、V3的数据类型

  job.setReducerClass(SortReducer.class);

  job.setOutputKeyClass(SortBean.class);

  job.setOutputValueClass(NullWritable.class);





  //6:指定输出方式类和结果输出路径

  job.setOutputFormatClass(TextOutputFormat.class);

  TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\sort_out"));



   

         //7:将job提交给yarn集群

         boolean bl = job.waitForCompletion(true);



         System.exit(bl?0:1);

      }

  }

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/05/30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MapReduce的排序和序列化
    • 概述
      • 需求
        • ​​​​​​​分析
          • 实现
            • ​​​​​​​编写Mapper代码
              • ​​​​​​​编写Reducer代码
                • ​​​​​​​编写主类代码
                相关产品与服务
                文件存储
                文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档