序列化概述
什么是序列化
序列化:就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化:就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
为什么不用Java的序列化
Java
的序列化是一个重量级序列化框架(Serializable
),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header
,继承体系等),不便于在网络中高效传输。所以,Hadoop
自己开发了一套序列化机制(Writable
)。
Hadoop序列化特点
- 紧凑 :高效使用存储空间。
- 快速:读写数据的额外开销小。
- 互操作:支持多语言的交互。
自定义Bean对线实现序列化(Writable)
在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop
框架内部传递一个bean
对象,那么该对象就需要实现序列化接口。
具体实现bean对象序列化步骤如下7步。
必须实现Writable接口。
1 2 3
| public class FlowBean implements Writable {
}
|
反序列化时,需要反射调用空参构造函数,所以必须有空参构造。
1 2 3
| public FlowBean() { super(); }
|
重写序列化方法
1 2 3 4 5 6
| @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); }
|
重写反序列化方法
1 2 3 4 5 6
| @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); }
|
注意反序列化的顺序和序列化的顺序完全一致。
要想把结果显示在文件中,需要重写toString()
,可用”\t
“分开,方便后续用。
如果不重写toString方法那么在文件中写入的是对象在内存中存储的位置。
完整代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| package com.tipdm.mr.writable2;
import org.apache.hadoop.io.Writable;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;
public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow;
public FlowBean() { }
public FlowBean(long upFlow, long downFlow, long sumFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = sumFlow; }
public long getUpFlow() { return upFlow; }
public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
public long getDownFlow() { return downFlow; }
public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
public long getSumFlow() { return sumFlow; }
public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
@Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); }
@Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); }
@Override public String toString() { return this.upFlow + " " + this.downFlow + " " + this.sumFlow; } }
|
序列化案例实操
例如对用户网络流量的使用进行统计,数据如下所示:
下载链接
任务:统计每个手机号【中间有重复的手机号,表示一个手机号访问了多个页面】,使用的上行流量、下行流量和总流量。
字段意义如下图所示:
Mapper代码编写
由于Mapper
的输出Value
需要为每个手机号访问页面使用的上行流量,下行流量和总流量。在这里我们一般是不会将三个字段直接返回,而是将所有字段放入一个对象中返回出来。要使用对象返回就需要编写一个序列化对象,这里采用的方法就是上方定义的FlowBean
对象。
总流量在原始的数据中没有现成的字段,需要自己计算,计算方法为:
Mapper
阶段代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.tipdm.mr.writable2;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper <LongWritable, Text, Text, FlowBean>{ private Text outKey = new Text(); private FlowBean outValue = new FlowBean(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException { String inKey = value.toString(); String[] phoneInfo = inKey.split("\t"); outKey.set(phoneInfo[1]); outValue.setUpFlow(Long.parseLong(phoneInfo[phoneInfo.length-3])); outValue.setDownFlow(Long.parseLong(phoneInfo[phoneInfo.length-2])); outValue.setSumFlow(outValue.getUpFlow()+outValue.getDownFlow()); context.write(outKey, outValue); } }
|
Reducer代码编写
Reducer
阶段的代码如下所示,其作用为将所有的相同手机号的流量求和:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package com.tipdm.mr.writable2;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean>{ private FlowBean outValue = new FlowBean(); @Override protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException { long sumUpFlow = 0; long sumDownFlow = 0; for (FlowBean value : values) { sumUpFlow += value.getUpFlow(); sumDownFlow += value.getDownFlow(); } outValue.setUpFlow(sumUpFlow); outValue.setDownFlow(sumDownFlow); outValue.setSumFlow(sumUpFlow+sumDownFlow); context.write(key, outValue); } }
|
Driver代码编写
这里Driver的编写和前面WCCount
案例类似:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package com.tipdm.mr.writable2;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf);
job.setJarByClass(FlowDriver.class); job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path("..\\ioText\\input")); FileOutputFormat.setOutputPath(job, new Path("..\\ioText\\output")); job.waitForCompletion(true); } }
|
运行FlowDriver
代码
结果如下所示:
注意:如果没有在FlowBean
中重写toString()
方法,那么得到的是以下文件内容