前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >代码实现——MapReduce实现Hadoop序列化

代码实现——MapReduce实现Hadoop序列化

作者头像
栗筝i
发布2022-12-01 08:54:55
2280
发布2022-12-01 08:54:55
举报
文章被收录于专栏:迁移内容

简单介绍

1、什么是序列化

  • 序列化:把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
  • 反序列化:将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

2、 为什么要序列化

对象的序列化(Serialization)用于将对象编码成一个字节流,以及从字节流中重新构建对象。"将一个对象编码成一个字节流"称为序列化该对象(SeTializing);相反的处理过程称为反序列化(Deserializing)。 序列化有三种主要的用途:

  1. 作为一种持久化格式:一个对象被序列化以后,它的编码可以被存储到磁盘上,供以后反序列化用。
  2. 作为一种通信数据格式:序列化结果可以从一个正在运行的虚拟机,通过网络被传递到另一个虚拟机上。
  3. 作为一种拷贝、克隆(clone)机制:将对象序列化到内存的缓存区中。然后通过反序列化,可以得到一个对已存对象进行深拷贝的新对象。

在分布式数据处理中,主要使用上面提到的前两种功能:数据持久化和通信数据格式

需求

统计每一个手机号耗费的总上行流量、下行流量、总流量(txt文档在/Users/lizhengi/test/input/目录下)

代码语言:javascript
复制
1       13736230513     192.196.2.1     www.shouhu.com  2481    24681   200
2       13846544121     192.196.2.2                     264     0       200
3       13956435636     192.196.2.3                     132     1512    200
4       13966251146     192.168.2.1                     240     0       404
5       18271575951     192.168.2.2     www.shouhu.com  1527    2106    200
6       18240717138     192.168.2.3     www.hao123.com  4116    1432    200
7       13590439668     192.168.2.4                     1116    954     200
8       15910133277     192.168.2.5     www.hao123.com  3156    2936    200
9       13729199489     192.168.2.6                     240     0       200
10      13630577991     192.168.2.7     www.shouhu.com  6960    690     200
11      15043685818     192.168.2.8     www.baidu.com   3659    3538    200
12      15959002129     192.168.2.9     www.hao123.com  1938    180     500
13      13560439638     192.168.2.10                    918     4938    200
14      13470253144     192.168.2.11                    180     180     200
15      13682846555     192.168.2.12    www.qq.com      1938    2910    200
16      13992314666     192.168.2.13    www.gaga.com    3008    3720    200
17      13509468723     192.168.2.14    www.qinghua.com 7335    110349  404
18      18390173782     192.168.2.15    www.sogou.com   9531    2412    200
19      13975057813     192.168.2.16    www.baidu.com   11058   48243   200
20      13768778790     192.168.2.17                    120     120     200
21      13568436656     192.168.2.18    www.alibaba.com 2481    24681   200
22      13568436656     192.168.2.19                    1116    954     200

实现过程

1、新建Maven工程,pom.xml依赖如下

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lizhengi</groupId>
    <artifactId>Hadoop-API</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>


</project>

2、src/main/resources目录下,新建一个文件,命名为“log4j.properties”,添加内容如下

代码语言:javascript
复制
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

3、编写Bean类-FlowBean

代码语言:javascript
复制
package com.lizhengi.flow;


import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author lizhengi
 * @create 2020-07-20
 */
// 1 实现writable接口
public class FlowBean implements Writable {

    private long upFlow;    //上行流量
    private long downFlow;  //下行流量
    private long sumFlow;   //总流量

    //2  反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() {
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }


    //3  写序列化方法
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    //4 反序列化方法
    //5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

}

4、编写Mapper类-FlowMapper

代码语言:javascript
复制
package com.lizhengi.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


/**
 * @author lizhengi
 * @create 2020-07-20
 */
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text phone = new Text();
    private FlowBean flow = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {


        String[] fields = value.toString().split("\t");

        phone.set(fields[1]);


        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);


        flow.set(upFlow,downFlow);
        context.write(phone, flow);
    }
}

5、编写Reducer类-FlowReducer

代码语言:javascript
复制
package com.lizhengi.flow;


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


/**
 * @author lizhengi
 * @create 2020-07-20
 */
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean sunFlow = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {

        long sum_upFlow = 0;
        long sum_downFlow = 0;

        // 1 遍历所用bean,将其中的上行流量,下行流量分别累加
        for (FlowBean value : values) {
            sum_upFlow += value.getUpFlow();
            sum_downFlow += value.getDownFlow();
        }

        // 2 封装对象
        sunFlow.set(sum_upFlow, sum_downFlow);

        // 3 写出
        context.write(key, sunFlow);
    }
}

6、编写Drvier类-FlowDriver

代码语言:javascript
复制
package com.lizhengi.flow;

/**
 * @author lizhengi
 * @create 2020-07-20
 */


import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1 获取job实例
        Job job = Job.getInstance(new Configuration());

        // 2.设置类路径
        job.setJarByClass(FlowDriver.class);

        // 3 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, "/Users/marron27/test/input");
        FileOutputFormat.setOutputPath(job, new Path("/Users/marron27/test/output"));
        //FileInputFormat.setInputPaths(job, new Path(args[0]));
        //FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

结果展示

代码语言:javascript
复制
Carlota:output marron27$ pwd
/Users/marron27/test/output
Carlota:output marron27$ cat part-r-00000 
13470253144	180	180	360
13509468723	7335	110349	117684
13560439638	918	4938	5856
13568436656	3597	25635	29232
13590439668	1116	954	2070
13630577991	6960	690	7650
13682846555	1938	2910	4848
13729199489	240	0	240
13736230513	2481	24681	27162
13768778790	120	120	240
13846544121	264	0	264
13956435636	132	1512	1644
13966251146	240	0	240
13975057813	11058	48243	59301
13992314666	3008	3720	6728
15043685818	3659	3538	7197
15910133277	3156	2936	6092
15959002129	1938	180	2118
18240717138	4116	1432	5548
18271575951	1527	2106	3633
18390173782	9531	2412	11943
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-07-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简单介绍
  • 需求
  • 实现过程
  • 结果展示
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档