0%

Hadoop-8-Hadoop序列化

序列化概述

什么是序列化

序列化:就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。

反序列化:就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

为什么要序列化

一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。

为什么不用Java的序列化

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。

Hadoop序列化特点

  1. 紧凑 高效使用存储空间。
  2. 快速:读写数据的额外开销小。
  3. 互操作:支持多语言的交互。

自定义Bean对线实现序列化(Writable)

在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。

具体实现bean对象序列化步骤如下7步。

  1. 必须实现Writable接口。

    1
    2
    3
    public class FlowBean implements Writable {

    }
  2. 反序列化时,需要反射调用空参构造函数,所以必须有空参构造。

    1
    2
    3
    public FlowBean() {
    super();
    }
  3. 重写序列化方法

    1
    2
    3
    4
    5
    6
    @Override
    public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
    }
  4. 重写反序列化方法

    1
    2
    3
    4
    5
    6
    @Override
    public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
    }

    注意反序列化的顺序和序列化的顺序完全一致。

  5. 要想把结果显示在文件中,需要重写toString(),可用”\t“分开,方便后续用。

    如果不重写toString方法那么在文件中写入的是对象在内存中存储的位置。

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.tipdm.mr.writable2;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/*
JavaBean:
1. 自定义类并实现Writable接口
2. 重写write和readFields方法
3. 重写toString方法
*/
public class FlowBean implements Writable {
private long upFlow;
private long downFlow;
private long sumFlow;

public FlowBean() {
}

public FlowBean(long upFlow, long downFlow, long sumFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = sumFlow;
}

public long getUpFlow() {
return upFlow;
}

public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}

public long getDownFlow() {
return downFlow;
}

public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}

public long getSumFlow() {
return sumFlow;
}

public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}


/*
序列化时调用该方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
/*
反序列化调用该方法
注意:反序列化时的顺序要和序列化时的顺序保持一致
*/
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}

@Override
public String toString() {
return this.upFlow + " " + this.downFlow + " " + this.sumFlow;
}
}

序列化案例实操

例如对用户网络流量的使用进行统计,数据如下所示:

image-20230717124851629

下载链接

任务:统计每个手机号【中间有重复的手机号,表示一个手机号访问了多个页面】,使用的上行流量、下行流量和总流量。

字段意义如下图所示:

image-20230717125119629

Mapper代码编写

由于Mapper的输出Value需要为每个手机号访问页面使用的上行流量,下行流量和总流量。在这里我们一般是不会将三个字段直接返回,而是将所有字段放入一个对象中返回出来。要使用对象返回就需要编写一个序列化对象,这里采用的方法就是上方定义的FlowBean对象。

总流量在原始的数据中没有现成的字段,需要自己计算,计算方法为:

Mapper阶段代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.tipdm.mr.writable2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper <LongWritable, Text, Text, FlowBean>{
private Text outKey = new Text();
private FlowBean outValue = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
// 获取所有的输入数据
String inKey = value.toString();
// 切片
String[] phoneInfo = inKey.split("\t");
// 封装Key
outKey.set(phoneInfo[1]);
// 封装Value
outValue.setUpFlow(Long.parseLong(phoneInfo[phoneInfo.length-3]));
outValue.setDownFlow(Long.parseLong(phoneInfo[phoneInfo.length-2]));
outValue.setSumFlow(outValue.getUpFlow()+outValue.getDownFlow());
// 写出数据
context.write(outKey, outValue);
}
}

Reducer代码编写

Reducer阶段的代码如下所示,其作用为将所有的相同手机号的流量求和:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.tipdm.mr.writable2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
private FlowBean outValue = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
long sumUpFlow = 0;
long sumDownFlow = 0;
for (FlowBean value : values) {
sumUpFlow += value.getUpFlow();
sumDownFlow += value.getDownFlow();
}
outValue.setUpFlow(sumUpFlow);
outValue.setDownFlow(sumDownFlow);
outValue.setSumFlow(sumUpFlow+sumDownFlow);
// 将key,value写出
context.write(key, outValue);
}
}

Driver代码编写

这里Driver的编写和前面WCCount案例类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.tipdm.mr.writable2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 创建Job示例
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2. 给Job赋值
// 2.1 关联本程序的Jar -- 如果是本地可以不写,在集群上运行必须写
job.setJarByClass(FlowDriver.class);
// 2.2 设置Mapper类和Reduce类
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 2.3 设置Mapper输出的key, value的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 2.4 设置最终输出的key,value的类型(在这是reducer输出的key,value的类型)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 2.5 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path("..\\ioText\\input"));
// 输出路径一定不能存在否则报错
FileOutputFormat.setOutputPath(job, new Path("..\\ioText\\output"));
// 3. 运行Job
// waitForCompletion(boolean verbose)
// verbose:是否打印执行过程信息
// 返回值:如果job执行成功返回true
job.waitForCompletion(true);
}
}

运行FlowDriver代码

image-20230717131101631

结果如下所示:

image-20230717131130523

注意:如果没有在FlowBean中重写toString()方法,那么得到的是以下文件内容

image-20230717131328212

-------------本文结束感谢您的阅读-------------