0%

Hadoop(3)MapReduce

4. MapReduce

计算扑克牌中的黑桃个数

就是我们平时打牌时用的扑克牌,现在呢,有一摞牌,我想知道这摞牌中有多少张黑桃

最直接的方式是一张一张检查并且统计出有多少张是黑桃,但是这种方式的效率比较低,如果说这一摞牌只有几十张也就无所谓了,如果这一摞拍有上千张呢?你一张一张去检查还不疯了?

这个时候我们可以使用MapReduce的计算方法

第一步:把这摞牌分配给在座的所有玩家
第二步:让每个玩家查一下自己手中的牌有多少张是黑桃,然后把这个数目汇报给你
第三步:你把所有玩家告诉你的数字加起来,得到最终的结果

之前是一张一张的串行计算,现在使用mapreduce是把数据分配给多个人,并行计算,每一个人获得一
个局部聚合的临时结果,最终再统一汇总一下。

这样就可以快速得到答案了,这其实就是MapReduce的计算思想。

下面我们再通过具体的案例分析MapReduce的计算思想

4.1 分布式计算介绍

传统的计算方式为将数据拉取到本地,在本地执行计算程序。

方法创新,从移动数据修改至移动计算程序到数据存储机器

image-20211202142726873image-20211202142823994

目的:节省网络IO

image-20211202142739771

  1. 第一步:对每个节点上面的数据进行局部计算
  2. 第二步:对每个节点上面计算的局部结果进行最终全局汇总

4.2 MapReduce原理剖析

MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。

MapReduce由两个阶段组成:MapReduce

Map—>第一阶段 Reduce—>第二阶段

image-20211202143228034

这是一个Hadoop集群,一共5个节点

一个主节点,四个从节点

这里面我们只列出来了HDFS相关的进程信息

假设我们有一个512M的文件,这个文件会产生4个block块,假设这4个block块正好分别存储到了集群的4个节点上,我们的计算程序会被分发到每一个数据所在的节点,然后开始执行计算,在map阶段,针对每一个block块对应的数据都会产生一个map任务(这个map任务其实就是执行这个计算程序的),在这里也就意味着会产生4个map任务并行执行,4个map阶段都执行完毕以后,会执行reduce阶段,在reduce阶段中会对这4个map任务的输出数据进行汇总统计,得到最终的结果。

下面看一个官方的mapreduce原理图

image-20211202143542134

左下角是一个文件,文件最下面是几个block块,说明这个文件被切分成了这几个block块,文件上面是一些split,注意,咱们前面说的每个block产生一个map任务,其实这是不严谨的,其实严谨一点来说的话应该是一个split产生一个map任务。

那这里的blocksplit之间有什么关系吗? 我们来分析一下block块是文件的物理切分,在磁盘上是真实存在的。是对文件的真正切分而split是逻辑划分,不是对文件真正的切分,默认情况下我们可以认为一个split的大小和一个block的大小是一样的,所以实际上是一个split会产生一个map task

这里面的map Task就是咱们前面说的map任务,看后面有一个reduce Taskreduce会把结果数据输出到hdfs上,有几个reduce任务就会产生几个文件,这里有三个reduce任务,就产生了3个文件,咱们前面分析的案例中只有一个reduce任务做全局汇总

注意看map的输入 输出reduce的输入 输出

map的输入是k1,v1输出是k2,v2
reduce的输入是k2,v2输出是k3,v3都是键值对的形式。
在这注意一下,为什么在这是1,2,3呢? 这个主要是为了区分数据,方便理解,没有其它含义,这是我们人为定义的。

4.3 案例分析

mapreduce主要分为两大步骤mapreducemapreduce在代码层面对应的就是两个类,map对应的是mapper类,reduce对应的是reducer类,下面我们就来根据一个案例具体分析一下这两个步骤

假设我们有一个文件,文件里面有两行内容
第一行是hello you
第二行是hello me

我们想统计文件中每个单词出现的总次数

4.3.1 map阶段

第一步:框架会把输入文件(夹)划分为很多InputSplit,这里的inputsplit就是前面我们所说的split【对文件进行逻辑划分产生的】,默认情况下,每个HDFSBlock对应一个InputSplit。再通过RecordReader类,把每个InputSplit解析成一个一个的<k1,v1>。默认情况下,每一行数据,都会被解析成一个<k1,v1>这里的k1是指每一行的起始偏移量,v1代表的是那一行内容

1
2
<0, hello you>
<10, hello me>

注意:map第一次执行会产生<0,hello you>,第二次执行会产生<10,hello me>,并不是执行一次就
获取到这两行结果了,因为每次只会读取一行数据,我在这里只是把这两行执行的最终结果都列出来了

第二步:框架调用Mapper类中的map(…)函数,map函数的输入是<k1,v1>,输出是<k2,v2>。一个InputSplit对应一个map task。程序员需要自己覆盖Mapper类中的map函数,实现具体的业务逻辑。

因为我们需要统计文件中每个单词出现的总次数,所以需要先把每一行内容中的单词切开,然后记录出现
次数为1,这个逻辑就需要我们在map函数中实现了

1
2
3
4
5
6
# 针对<0, hello you>
<hello, 1>
<you, 1>
# 针对<10, hello me>
<hello, 1>
<me, 1>

第三步:框架对map函数输出的`<k2,v2>进行分区。不同分区中的<k2,v2>由不同的reduce task处理,默认只有1个分区,所以所有的数据都在一个分区,最后只会产生一个reduce task

经过这个步骤之后,数据没什么变化,如果有多个分区的话,需要把这些数据根据分区规则分开,在这里
默认只有1个分区。

1
2
3
4
<hello,1>
<you,1>
<hello,1>
<me,1>

咱们在这所说的单词计数,其实就是把每个单词出现的次数进行汇总即可,需要进行全局的汇总,不需要进行分区,所以一个redeuce任务就可以搞定,如果你的业务逻辑比较复杂,需要进行分区,那么就会产生多个reduce任务了,那么这个时候,map任务输出的数据到底给哪个reduce使用?这个就需要划分一下,要不然就乱套了。

假设有两个reducemap的输出到底给哪个reduce,如何分配,这是一个问题。

这个问题,由分区来完成。

map输出的那些数据到底给哪个reduce使用,这个就是分区干的事了。

第四步:框架对每个分区中的数据,都会按照k2进行排序、分组。分组指的是相同k2v2分成一个组。先按照k2排序

1
2
3
4
<hello,1>
<hello,1>
<me,1>
<you,1>

然后按照k2进行分组,把相同k2v2分成一个组

1
2
3
<hello,{1,1}>
<me,{1}>
<you,{1}>

第五步:在map阶段,框架可以选择执行Combiner过程

Combiner可以翻译为规约,规约是什么意思呢? 在刚才的例子中,咱们最终是要在reduce端计算单词出现的总次数的,所以其实是可以在map端提前执行reduce的计算逻辑,先对在map端对单词出现的次数进行局部求和操作,这样就可以减少map端到reduce端数据传输的大小,这就是规约的好处,当然了,并不是所有场景都可以使用规约,针对求平均值之类的操作就不能使用规约了,否则最终计算的结果就不准确了。

Combiner一个可选步骤,默认这个步骤是不执行的。

第六步:框架会把map task输出的<k2,v2>写入到linux 的磁盘文件中

1
2
3
<hello,{1,1}>
<me,{1}>
<you,{1}>

至此,整个map阶段执行结束

最后注意一点:

MapReduce程序是由mapreduce这两个阶段组成的,但是reduce阶段不是必须的,也就是说有的mapreduce任务只有map阶段,为什么会有这种任务呢?

是这样的,咱们前面说过,其实reduce主要是做最终聚合的,如果我们这个需求是不需要聚合操作,直接对数据做过滤处理就行了,那也就意味着数据经过map阶段处理完就结束了,所以如果reduce阶段不存在的话,map的结果是可以直接保存到HDFS中的。

注意,如果没有reduce阶段,其实map阶段只需要执行到第二步就可以,第二步执行完成以后,结果就可以直接输出到HDFS了。

针对我们这个单词计数的需求是存在reduce阶段的,所以我们继续往下面分析。

4.3.2 reduce阶段

第一步:框架对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。这个过程称作shuffle

针对我们这个需求,只有一个分区,所以把数据拷贝到reduce端之后还是老样子

1
2
3
<hello,{1,1}>
<me,{1}>
<you,{1}>

第二步:框架对reduce端接收的相同分区的<k2,v2>数据进行合并、排序、分组。

reduce端接收到的是多个map的输出,对多个map任务中相同分区的数据进行合并 排序 分组注意,之前在map中已经做了排序 分组,这边也做这些操作 重复吗?

不重复,因为map端是局部的操作 reduce端是全局的操作

之前是每个map任务内进行排序,是有序的,但是多个map任务之间就是无序的了。

不过针对我们这个需求只有一个map任务一个分区,所以最终的结果还是老样子

1
2
3
<hello,{1,1}>
<me,{1}>
<you,{1}>

第三步:框架调用Reducer类中的reduce方法,reduce方法的输入是,输出是。一个调用一次reduce函数。程序员需要覆盖reduce函数,实现具体的业务逻辑。

那我们在这里就需要在reduce函数中实现最终的聚合计算操作了,将相同k2的{v2}累加求和,然后再转
化为k3,v3写出去,在这里最终会调用三次reduce函数

1
2
3
<hello,2>
<me,1>
<you,1>

第四步:框架把reduce的输出结果保存到HDFS中。

1
2
3
hello	2
me 1
you 1

至此整个reduce阶段结束

4.4 案例开发

接下来看这个图再重新梳理一下单词计数的执行流程

单文件WordCount分析

image-20211202151847412

多文件WordCount分析

image-20211202152149646

image-20211202152258220

4.4.1 脚本开发

代码开发的整个过程分为3个阶段:

  1. Map阶段代码开发
  2. Reduce阶段代码开发
  3. 组装Job
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package tipdm.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* 需求:读取hdfs上的hello.txt文件,计算文件中每个单词出现的总次数
*
* 原始文件hello.txt内容如下:
* hello you
* hello me
*
* 最终需要的结果形式如下:
* hello 2
* me 1
* you 1
*/


public class WordCountJob {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1, v1>, 产生<k2, v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words){
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2, v2);
}
}
}

/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
/**
* 针对<k2,{v2...}>的数据进行累加求和
* 并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for (LongWritable v2: v2s){
sum += v2.get();
}
// 组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
// 把结果写出去
context.write(k3, v3);
}
}

/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args) {
try{
if(args.length!=2){
//如果传递的参数不够,程序直接退出
System.exit(100);
}

//指定Job需要的配置参数
Configuration conf = new Configuration();
//创建一个Job
Job job = Job.getInstance(conf);

//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob类的

//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job, new Path(args[1]));

//指定map相关代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);

//指定reduce相关代码
job.setReducerClass(MyReducer.class);
//指定k3类型
job.setOutputKeyClass(Text.class);
//指定v3类型
job.setOutputValueClass(LongWritable.class);

//提交job
job.waitForCompletion(true);

} catch (Exception e){
e.printStackTrace();
}

}

}

4.4.2 jar打包

pom文件中添加maven的编译打包插件。添加到</prject>下面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<build>
<plugins>
<!-- compiler插件, 设定JDK版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

注意了,这些添加完以后还有一个地方需要修改,需要在pom中的hadoop-clientlog4j依赖中增加scope属性,值为provided,表示只在编译的时候使用这个依赖,在执行以及打包的时候都不使用,因为hadoop-clientlog4j依赖在集群中都是有的,所以在打jar包的时候就不需要打进去了,如果我们使用到了集群中没有的第三方依赖包就不需要增加这个provided属性了,不增加provided就可以把对应的第三方依赖打进jar包里面了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<!-- hadoop依赖 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
<!-- provided表示这个依赖只在编译的时候,执行或或者打包jar包时不使用 -->
<scope>provided</scope>
</dependency>

<!-- log4j的依赖 -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
<scope>provided</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
<scope>provided</scope>
</dependency>

打包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
D:\IdeaProjects\db_hadoop>mvn clean package -DskipTests
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------------------------------------------------------------
[INFO] Building db_hadoop 1.0-SNAPSHOT
[INFO] ----------------------------------------------------------------------
..............................
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ db_hadoop ---
[INFO] Building jar: D:\IdeaProjects\db_hadoop\target\db_hadoop-1.0-SNAPSHOT.
[INFO]
[INFO] --- maven-assembly-plugin:2.2-beta-5:single (make-assembly) @ db_hadoo
[INFO] Building jar: D:\IdeaProjects\db_hadoop\target\db_hadoop-1.0-SNAPSHOT-
[INFO] ----------------------------------------------------------------------
[INFO] ----------------------------------------------------------------------
[INFO] Total time: 5.410s
[INFO] Finished at: Wed Apr 22 11:00:45 CST 2020
[INFO] Final Memory: 24M/375M
[INFO] ----------------------------------------------------------------------

命令执行成功之后,就可以到target目录下获取对应的jar包了,需要使用jar-with-dependencies结尾的
那个jar包。

1
D:\IdeaProjects\db_hadoop\target\db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar

把这个jar包上传到集群的任意一台机器上面或者是hadoop客户端机器上都可以,只要这台机器可以和集群进行交互即可。

注意,这个jar包不能使用java -jar的方式执行,需要使用集群特有的执行方式。

4.4.3 在集群中执行jar包

首先,把jar包上传到集群

确认是否存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@master hadoop-3.2.0]# ll
total 196
drwxr-xr-x. 2 1001 1002 203 Jan 8 2019 bin
-rw-r--r--. 1 root root 5716 Apr 22 11:00 db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar
drwxr-xr-x. 3 1001 1002 20 Jan 8 2019 etc
drwxr-xr-x. 2 1001 1002 106 Jan 8 2019 include
drwxr-xr-x. 3 1001 1002 20 Jan 8 2019 lib
drwxr-xr-x. 4 1001 1002 4096 Jan 8 2019 libexec
-rw-rw-r--. 1 1001 1002 150569 Oct 19 2018 LICENSE.txt
-rw-rw-r--. 1 1001 1002 22125 Oct 19 2018 NOTICE.txt
-rw-rw-r--. 1 1001 1002 1361 Oct 19 2018 README.txt
-rw-r--r--. 1 root root 1361 Apr 19 12:29 README.txt.bak
drwxr-xr-x. 3 1001 1002 4096 Apr 18 23:13 sbin
drwxr-xr-x. 4 1001 1002 31 Jan 8 2019 share

在向集群中正式提交任务jar包之前需要先把测试数据准备好

在本地创建一个hello.txt文件,内容是

1
2
3
[root@master hadoop-3.2.0]# vi hello.txt
hello you
hello me

单词中间用空格隔开,因为我们在MapReduce代码中是使用空格进行切割单词的。

然后把hello.txt上传到hdfstest目录下

1
2
3
4
5
[root@master hadoop-3.2.0]# hdfs dfs -mkdir /test
[root@master hadoop-3.2.0]# hdfs dfs -put hello.txt /test
[root@master hadoop-3.2.0]# hdfs dfs -ls /test
Found 1 items
-rw-r--r-- 2 root supergroup 19 2020-04-22 11:16 /test/hello.txt

接下来就可以向集群提交MapReduce任务了

具体的命令是这样的

1
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar tipdm.mr.WordCountJob /test/hello.txt /out
  • hadoop:表示使用hadoop脚本提交任务,其实在这里使用yarn脚本也是可以的,从hadoop2开始支持使用yarn,不过也兼容hadoop1,也继续支持使用hadoop脚本,所以在这里使用哪个都可以,具体就看你个人的喜好了,我是习惯于使用hadoop脚本

  • jar:表示执行jar

  • db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar:指定具体的jar包路径信息

  • tipdm.mr.WordCountJob:指定要执行的mapreduce代码的全路径

  • /test/hello.txt:指定mapreduce接收到的第一个参数,代表的是输入路径,这里的输入路径可以直接指定hello.txt的路径,也可以直接指定它的父目录,因为它的父目录里面也没有其它无关的文件,如果指定目录的话就意味着hdfs会读取这个目录下所有的文件,所以后期如果我们需要处理一批文件,那就可以把他们放到同一个目录里面,直接指定目录即可。

  • /out:指定mapreduce接收到的第二个参数,代表的是输出目录,这里的输出目录必须是不存在的,MapReduce程序在执行之前会检测这个输出目录,如果存在会报错,因为它每次执行任务都需要一个新的输出目录来存储结果数据

任务提交到集群上面之后,可以在shell窗口中看到如下日志信息,最终map执行到100%,reduce执行到100%,说明任务执行成功了。

1
2
3
2020-04-22 15:12:59,887 INFO mapreduce.Job: map 0% reduce 0%
2020-04-22 15:13:08,050 INFO mapreduce.Job: map 100% reduce 0%
2020-04-22 15:13:16,261 INFO mapreduce.Job: map 100% reduce 100%

当然了,也可以到web界面中查看任务执行情况。

访问 http://master:8088

image-20211203103945252

那我们来查看一下任务输出的结果,

1
2
3
4
[root@master hadoop-3.2.0]# hdfs dfs -ls /out
Found 2 items
-rw-r--r-- 2 root supergroup 0 2020-04-22 15:13 /out/_SUCCESS
-rw-r--r-- 2 root supergroup 19 2020-04-22 15:13 /out/part-r-00000

out输出目录中,_SUCCESS是一个标记文件,有这个文件表示这个任务执行成功了。

part-r-00000是具体的数据文件,如果有多个reduce任务会产生多个这种文件,多个文件的话会按照从0开始编号,00001,00002等等…

还有一点需要注意的,part后面的r表示这个结果文件是reduce步骤产生的,如果一个mapreduce只有
map阶段没有reduce阶段,那么产生的结果文件是part-m-00000这样的。

查看数据文件part-r-00000

1
2
3
4
[root@master hadoop-3.2.0]# hdfs dfs -cat /out/part-r-00000
hello 2
me 1
you 1

4.4.4 增加日志输出

4.4.4.1 使用System.out.println()进行日志输出

首先,在mapreduce类中增加输出语句

map类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {

// 输出k1, v1的值
System.out.println("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">");



//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words){
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2, v2);
}
}

reduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for (LongWritable v2: v2s){


// 输出k2, v2的值
System.out.println("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">");



sum += v2.get();
}
// 组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);


// 输出k3, v3的值
System.out.println("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">");


// 把结果写出去
context.write(k3, v3);
}

然后,进入yarn资源管理页面master:8088

点击History

image-20211216161443057

发现无法访问

image-20211216161458993

需要开启YARN的日志聚合功能,把散落在`NodeManager节点上的日志统一收集管理,方便查看日志。

修改yarn-site.xml文件配置,增加yarn.log-aggregation-enableyarn.log.server.url这两个参数

1
2
3
4
5
6
7
8
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.log.server.url</name>
<value>http://master:19888/jobhistory/logs/</value>
</property>

修改后重启集群生效。

1
2
[root@master hadoop-3.2.0]# sbin/stop-all.sh 
[root@master hadoop-3.2.0]# vim etc/hadoop/yarn-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
  You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>master</value>
</property>
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.log.server.url</name>
<value>http://master:19888/jobhistory/logs/</value>
</property>
</configuration>

同步到其他两个节点:

1
2
[root@master hadoop-3.2.0]# scp -rq etc/hadoop/yarn-site.xml slave1:/data/soft/hadoop-3.2.0/etc/hadoop/
[root@master hadoop-3.2.0]# scp -rq etc/hadoop/yarn-site.xml slave2:/data/soft/hadoop-3.2.0/etc/hadoop/

重新启动hadoop集群

1
[root@master hadoop-3.2.0]# sbin/start-all.sh 

启动historyserver进程,需要在集群的所有节点上都启动这个进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[root@master hadoop-3.2.0]# bin/mapred --daemon start historyserver
[root@master hadoop-3.2.0]# jps
13430 NameNode
14006 ResourceManager
14650 JobHistoryServer
13723 SecondaryNameNode
14717 Jps
[root@slave1 hadoop-3.2.0]# bin/mapred --daemon start historyserver
[root@slave1 hadoop-3.2.0]# jps
4722 JobHistoryServer
4789 Jps
4168 NodeManager
4045 DataNode
[root@slave2 hadoop-3.2.0]# bin/mapred --daemon start historyserver
[root@slave2 hadoop-3.2.0]# jps
5217 JobHistoryServer
4803 NodeManager
5284 Jps
4681 DataNode

重新提交mapreduce任务

1
[root@master hadoop-3.2.0]# hadoop jar dbhadoop-1.0-SNAPSHOT-jar-with-dependencies.jar tipdm.mr.WordCountJob /test/hello.txt /out2

再次进去yarn的8088界面,即可进入history

image-20211216163833769

点击map中的Successful

image-20211216164119311

再点击logs

image-20211216164211659

该处即可看到我们控制台输入的内容。

4.4.4.2 使用Logger进行输出

修改map代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1, v1>, 产生<k2, v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// 输出k1, v1的值
// System.out.println("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">");
logger.info("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words){
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2, v2);
}
}
}

修改reduce代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 针对<k2,{v2...}>的数据进行累加求和
* 并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for (LongWritable v2: v2s){
// 输出k2, v2的值
// System.out.println("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">");
logger.info("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">");
sum += v2.get();
}
// 组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
// 输出k3, v3的值
// System.out.println("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">");
logger.info("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">");
// 把结果写出去
context.write(k3, v3);
}
}

重新打包:

1
mvn clean package -DskipTests

上传到hadoop集群重新编译

1
[root@master hadoop-3.2.0]# hadoop jar dbhadoop-1.0-SNAPSHOT-jar-with-dependencies.jar tipdm.mr.WordCountJob /test/hello.txt /out1

再次进入logs

image-20211216165710158

可以看到日志信息再下方输出

4.4.4.3 使用命令查看日志

1
[root@master hadoop-3.2.0]# yarn logs -applicationId application_1639643066888_0001

可重定向到文件中,在本地查看日志信息。

4.5 停止集群中的任务

1
2
3
hadoop job -kill job_1639643066888_0002
# 或者
yarn application -kill application_1639643066888_0002

4.6 MapReduce程序扩展

咱们前面说过MapReduce任务是由map阶段和reduce阶段组成的

但是我们也说过,reduce阶段不是必须的,那也就意味着MapReduce程序可以只包含map阶段。

什么场景下会只需要map阶段呢?

  • 当数据只需要进行普通的过滤、解析等操作,不需要进行聚合,这个时候就不需要使用reduce阶段了。

在代码层面该如何设置呢?

  • 很简单,在组装Job的时候设置reduce的task数目为0就可以了。并且Reduce代码也不需要写了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package tipdm.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* 只有Map阶段,没有Reduce阶段
*/


public class WordCountJobnoReduce {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1, v1>, 产生<k2, v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// 输出k1, v1的值
// System.out.println("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">");
logger.info("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words){
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2, v2);
}
}
}

/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args) {
try{
if(args.length!=2){
//如果传递的参数不够,程序直接退出
System.exit(100);
}

//指定Job需要的配置参数
Configuration conf = new Configuration();
//创建一个Job
Job job = Job.getInstance(conf);

//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob类的
job.setJarByClass(WordCountJobnoReduce.class);

//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job, new Path(args[1]));

//指定map相关代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);

// //指定reduce相关代码
// job.setReducerClass(MyReducer.class);
// //指定k3类型
// job.setOutputKeyClass(Text.class);
// //指定v3类型
// job.setOutputValueClass(LongWritable.class);

// 禁用Reduce
job.setNumReduceTasks(0);

//提交job
job.waitForCompletion(true);

} catch (Exception e){
e.printStackTrace();
}
}

}

重新打包后上传到集群运行:

1
[root@master hadoop-3.2.0]# hadoop jar dbhadoop-1.0-SNAPSHOT-jar-with-dependencies.jar tipdm.mr.WordCountJobnoReduce /test/hello.txt /out3

此时可见reduce阶段0%就结束了

1
2
3
4
5
[root@master hadoop-3.2.0]# hdfs dfs -cat /out3/part-m-00000
hello 1
you 1
hello 1
me 1

可见结果只有reduce部分的结果。

4.7 Shuffer执行过程级源码分析输入输出

4.7.1 Shuffer过程详解

咱们前面简单说过,shuffer是一个网络拷贝的过程,是指通过网络把数据从map端拷贝到reduce端的过
程,下面我们来详细分析一下这个过程。

看这张图

image-20211216185356596

接下来我们来根据这张图分析一下shuffle的一些细节信息,首先看map阶段,最左边有一个inputsplit,最终会产生一个map任务,map任务在执行的时候会把k1,v1转化为k2,v2,这些数据会先临时存储到一个内存缓冲区中,这个内存缓冲区的大小默认是100Mio.sort.mb属性),当达到内存缓冲区大小的80%io.sort.spill.percent)也就是80M的时候,会把内存中的数据溢写到本地磁盘中(mapred.local.dir),一直到map把所有的数据都计算完,最后会把内存缓冲区中的数据一次性全部刷新到本地磁盘文件中,在这个图里面表示产生了3个临时文件,每个临时文件中有3个分区,这是由于map阶段中对数据做了分区,所以数据在存储的时候,在每个临时文件中也划分为了3块,最后需要对这些临时文件进行合并,合并为一个大文件,因为一个map任务最终只会产生一个文件,这个合并之后的文件也是有3个分区的,这3个分区的数据会被shuffle线程分别拷贝到三个不同的reduce节点,图里面只显示了一个reduce节点,下面还有两个没有显示。不同map任务中的相同分区的数据会在同一个reduce节点进行合并,合并以后会执行reduce的功能,最终产生结果数据。

在这里shuffle其实是横跨map端和reduce端的,它主要是负责把map端产生的数据通过网络拷贝到reduce阶段进行统一聚合计算。

4.7.2 Hadoop中序列化机制

咱们前面在开发MapReduce程序的时候使用到了LongWritableText这些数据类型,这些数据类型对应的是Java中的LongString,那MapReduce为什么不直接使用Java中的这些数据类型呢?那肯定是嫌弃Java中的这些数据类型使用起来不爽,那具体不爽在什么地方呢?

这个其实就涉及到序列化这个知识点了,下面我们来分析一下,来看这张图,

image-20211216185918677

我们的map阶段在读取数据的是需要从hdfs中读取的,这里面需要经过磁盘IO和网络IO,不过正常情况下map任务会执行本地计算,也就是map任务会被分发到数据所在的节点进行计算,这个时候,网络io几乎就没有了,就剩下了磁盘io,再往后面看,map阶段执行完了以后,数据会被写入到本地磁盘文件,这个时候也需要经过磁盘io,后面的shuffle拷贝数据其实也需要先经过磁盘io把数据从本地磁盘读出来再通过网络发送到reduce节点,再写入reduce节点的本地磁盘,然后reduce阶段在执行的时候会经过磁盘io读取本地文件中的数据,计算完成以后还会经过磁盘io和网络io把数据写入到hdfs中。经过我们刚才的分析,其实在这里面占得比重最高的是磁盘io,所以说影响mapreduce任务执行效率的主要原因就是磁盘io,如果想要提高任务执行效率,就需要从这方面着手分析。当程序在向磁盘中写数据以及从磁盘中读取数据的时候会对数据进行序列化和反序列化,磁盘io这些步骤我们省略不了,但是我们可以从序列化和反序列化这一块来着手做一些优化。

首先我们分析一下序列化和反序列化,看这个图,当我们想把内存中的数据写入到文件中的时候,会对数据序列化,然后再写入,这个序列化其实就是把内存中的对象信息转成二进制的形式,方便存储到文件中,默认java中的序列化会把对象及其父类、超类的整个继承体系信息都保存下来,这样存储的信息太大了,就会导致写入文件的信息过大,这样写入是会额外消耗性能的。

反序列化也是一样,reduce端想把文件中的对象信息加载到内存中,如果文件很大,在加载的时候也会额外消耗很多性能,所以如果我们把对象存储的信息尽量精简,那么就可以提高数据写入和读取消耗的性能。

基于此,hadoop官方实现了自己的序列化和反序列化机制,没有使用java中的序列化机制,所以hadoop中的数据类型没有沿用java中的数据类型,而是自己单独设计了一些writable的实现了,例如、longwritable、text等
那我们来看一下Hadoop中提供的常用的基本数据类型的序列化类。

image-20211216190154745

在这需要注意一下

Text等价于java.lang.String的Writable,针对UTF-8序列

NullWritable是单例,获取实例使用NullWritable.get()

那下面我们来总结一下hadoop自己实现的序列化有什么特点:

  1. 紧凑: 高效使用存储空间
  2. 快速: 读写数据的额外开销小
  3. 可扩展: 可透明地读取老格式的数据
  4. 互操作: 支持多语言的交互

对应的我们也对java中序列化的不足之后做了一个总结:

  1. 不精简,附加信息多,不太适合随机访问
  2. 存储空间大,递归地输出类的超类描述直到不再有超类

前面我们分析了Java中的序列化和Hadoop中的序列化,其实最主要的区别就是针对相同的数据,Java中的序列化会占用较大的存储空间,而Hadoop中的序列化可以节省很多存储空间,这样在海量数据计算的场景下,可以减少数据传输的大小,极大的提高计算效率,下面我们就来具体实战分析一下。

先创建一个Java的序列化代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package tipdm.mr;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.*;

/**
* Java中的序列化
*/
public class JavaSerialize {
public static void main(String[] args) throws IOException {
// 创建Student对象,并设置id和name属性
StudentJava studentJava = new StudentJava();
studentJava.setId(1L);
studentJava.setName("Hadoop");

// 将Student对象的当前状态信息写入本地文件中
FileOutputStream fos = new FileOutputStream("D:\\student_java.txt");
ObjectOutputStream oos = new ObjectOutputStream(fos);
oos.writeObject(studentJava);
oos.close();
fos.close();
}
}

class StudentJava implements Serializable{
private static final long serialVersionUID = 1L;
private Long id;
private String name;

public static long getSerialVersionUID() {
return serialVersionUID;
}

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

接下来再开发Hadoop的序列化代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package tipdm.mr;

import org.apache.hadoop.io.Writable;

import java.io.*;

/**
* Hadoop序列化机制
*/
public class HadoopSerialize {
public static void main(String[] args) throws IOException {
// 创建Student对象,设置id和name属性
StudentWritable studentWritable = new StudentWritable();
studentWritable.setId(1L);
studentWritable.setName("Hadoop");

// 将Student对象的当前状态写入本地文件中
FileOutputStream fos = new FileOutputStream("D:\\sudent_hadoop.txt");
ObjectOutputStream oos = new ObjectOutputStream(fos);
studentWritable.write(oos);
oos.close();
fos.close();
}
}

class StudentWritable implements Writable{
private Long id;
private String name;

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(this.id);
out.writeUTF(this.name);
}

@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readLong();
this.name = in.readUTF();
}
}

执行这两个代码,最终会在D盘下产生两个文件,查看这两个文件的大小,最终发现Java序列化的文件大小是Hadoop序列化文件大小的10倍左右。

image-20211216190524861

4.7.3 InputFormat层级分析

源码分析。。。。

4.8 MapReduce性能优化

4.8.1 小文件问题

Hadoop的HDFS和Mapreduce框架是针对大数据文件来设计的,在小文件的处理上不但效率低下,而且十分消耗内存资源。

解决该问题通常是找一个容器将小文件打包起来。

HDFS提供了两种类型的容器,SequenceFile和MapFile

SequenceFile是Hadoop提供的一种二进制文件,这种二进制文件直接将对序列化到文件中。

一般对小文件可以使用这种文件合并,即将文件名作为key,文件内容作为value序列化到大文件中。

  • 注意:SequeceFile需要一个合并文件的过程,文件较大,且合并后的文件将不方便查看,必须通过遍历查看每一个小文件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package tipdm.mr;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.File;
import java.io.IOException;
import java.sql.SQLOutput;

/**
* 小文件解决方案之SequenceFile
*
*/
public class SmallFileSeq {
public static void main(String[] args) throws Exception {
// 生成SequenceFile文件
write("D:\\smallFile", "/seqFile");

// 读取SequenceFile文件
read("/seqFile");
}
/**
* 生成SequenceFile文件
* @param inputDir 输入目录-windows目录
* @param outputFile 输出文件-hdfs文件
* @throws Exception
*/
public static void write(String inputDir, String outputFile) throws Exception{
// 创建一个配置对象
Configuration conf = new Configuration();
// 指定HDFS的地址
conf.set("fs.defaultFS", "hdfs://master:9000");
// 获取操作HDFS的对象
FileSystem fileSystem = FileSystem.get(conf);

// 删除HDFS上的输出文件
fileSystem.delete(new Path(outputFile), true);

// 构造opts数组,有三个元素
/*
第一个是输出路径【文件】
第二个是key的类型
第三个是value的类型
*/
SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
SequenceFile.Writer.file(new Path(outputFile)),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class)
};
// 创建了一个write实例
SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts);
// 指定需要压缩的文件目录
File InputDirPath = new File(inputDir);
if(InputDirPath.isDirectory()){
File[] files = InputDirPath.listFiles();
// 迭代文件
for (File file : files) {
// 获取文件的全部内容
String content = FileUtils.readFileToString(file, "UTF-8");
// 获取文件名
String filename = file.getName();
Text key = new Text(filename);
Text value = new Text(content);
// 向SequenceFile中写入数据
writer.append(key, value);
}
}
writer.close();
}

/**
* 读取SequenceFile文件
* @param inputFile SequenceFile文件路径
* @throws IOException
*/
private static void read(String inputFile) throws IOException {
// 创建一个配置对象
Configuration conf = new Configuration();
// 指定HDFS的地址
conf.set("fs.defaultFS", "hdfs://master:9000");
// 创建阅读器
SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));
Text key = new Text();
Text value = new Text();
while(reader.next(key, value)){
// 输出文件名称
System.out.println("文件名:" + key.toString() + ",");
// 输出文件内容
System.out.println("文件内容:" + value.toString() + ",");
}
reader.close();
}
}

MapFile是排序后的SequenceFile,MapFile由两部分组成,分别是index和data

index作为小文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的偏移位置。

在MapFile被访问的时候,索引文件回被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package tipdm.mr;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.File;
import java.io.IOException;

/**
* 小文件解决方案之MapFile
*/
public class SmallFileMap {
public static void main(String[] args) throws Exception {
// 生成MapFile文件
write("D:\\smallFile", "/mapFile");

// 读取MapFile文件
read("/mapFile");
}

/**
* 生成MapFile文件
* @param inputDir 输入目录 - windows目录
* @param outputDir 输出目录 - hdfs目录
* @throws Exception
*/
private static void write(String inputDir, String outputDir) throws Exception{
// 创建一个配置对象
Configuration conf = new Configuration();
// 指定HDFS的地址
conf.set("fs.defaultFS", "hdfs://master:9000");
// 获取操作HDFS的对象
FileSystem fileSystem = FileSystem.get(conf);

// 删除HDFS上的输出文件
fileSystem.delete(new Path(outputDir), true);

// 构造opts数组,有两个元素
/*
第一个是key的类型
第二个是value的类型
*/
SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
MapFile.Writer.keyClass(Text.class),
MapFile.Writer.valueClass(Text.class)
};
// 创建了一个write实例
MapFile.Writer writer = new MapFile.Writer(conf, new Path(outputDir), opts);
// 指定需要压缩的文件目录
File InputDirPath = new File(inputDir);
if(InputDirPath.isDirectory()){
File[] files = InputDirPath.listFiles();
// 迭代文件
for (File file : files) {
// 获取文件的全部内容
String content = FileUtils.readFileToString(file, "UTF-8");
// 获取文件名
String filename = file.getName();
Text key = new Text(filename);
Text value = new Text(content);
// 向SequenceFile中写入数据
writer.append(key, value);
}
}
writer.close();
}

/**
* 读取MapFile文件
* @param inputDir - MapFile文件路径
* @throws IOException
*/
private static void read(String inputDir) throws IOException {
// 创建一个配置对象
Configuration conf = new Configuration();
// 指定HDFS的地址
conf.set("fs.defaultFS", "hdfs://master:9000");
// 创建阅读器
MapFile.Reader reader = new MapFile.Reader(new Path(inputDir), conf);
Text key = new Text();
Text value = new Text();
while(reader.next(key, value)){
// 输出文件名称
System.out.println("文件名:" + key.toString() + ",");
// 输出文件内容
System.out.println("文件内容:" + value.toString() + ",");
System.out.println("======================");
}
reader.close();
}
}

案例:使用SequenceFile实现小文件存储和计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package tipdm.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* 需求:读取SequenceFile文件
*/


public class WordCountJobSeq {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<Text, Text, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1, v1>, 产生<k2, v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(Text k1, Text v1, Context context)
throws IOException, InterruptedException {
// 输出k1, v1的值
System.out.println("<k1, v1> = <" + k1.toString() + "," + v1.toString() + ">");
// logger.info("<k1, v1> = <" + k1.toString() + "," + v1.toString() + ">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words){
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2, v2);
}
}
}

/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 针对<k2,{v2...}>的数据进行累加求和
* 并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for (LongWritable v2: v2s){
// 输出k2, v2的值
// System.out.println("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">");
logger.info("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">");
sum += v2.get();
}
// 组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
// 输出k3, v3的值
// System.out.println("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">");
logger.info("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">");
// 把结果写出去
context.write(k3, v3);
}
}

/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args) {
try{
if(args.length!=2){
//如果传递的参数不够,程序直接退出
System.exit(100);
}

//指定Job需要的配置参数
Configuration conf = new Configuration();
//创建一个Job
Job job = Job.getInstance(conf);

//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob类的
job.setJarByClass(WordCountJobSeq.class);

//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job, new Path(args[1]));

//指定map相关代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);

// 设置输入数据处理类
job.setInputFormatClass(SequenceFileInputFormat.class);

//指定reduce相关代码
job.setReducerClass(MyReducer.class);
//指定k3类型
job.setOutputKeyClass(Text.class);
//指定v3类型
job.setOutputValueClass(LongWritable.class);

//提交job
job.waitForCompletion(true);

} catch (Exception e){
e.printStackTrace();
}
}

}

4.8.2 数据倾斜问题

MapReduce程序执行时,Reduce节点大部分执行完毕,但是有一个或者几个Reduce节点运行很慢,导致整个程序处理实际变得很长,具体表现为:Reduce阶段一直卡着不动:

解决方法:

  1. 增加Reduce任务个数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package tipdm.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* 数据倾斜-增加reduce任务个数
*/


public class WordCountSkew {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1, v1>, 产生<k2, v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// 输出k1, v1的值
// System.out.println("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">");
logger.info("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words){
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2, v2);
}
}
}

/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 针对<k2,{v2...}>的数据进行累加求和
* 并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for (LongWritable v2: v2s){
// 输出k2, v2的值
// System.out.println("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">");
logger.info("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">");
sum += v2.get();
// 模拟reduce的复杂计算消耗的实际
if (sum%200 == 0){
Thread.sleep(1);
}
}
// 组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
// 输出k3, v3的值
// System.out.println("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">");
logger.info("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">");
// 把结果写出去
context.write(k3, v3);
}
}

/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args) {
try{
if(args.length!=3){
//如果传递的参数不够,程序直接退出
System.exit(100);
}

//指定Job需要的配置参数
Configuration conf = new Configuration();
//创建一个Job
Job job = Job.getInstance(conf);

//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob类的
job.setJarByClass(WordCountSkew.class);

//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job, new Path(args[1]));

//指定map相关代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);

//指定reduce相关代码
job.setReducerClass(MyReducer.class);
//指定k3类型
job.setOutputKeyClass(Text.class);
//指定v3类型
job.setOutputValueClass(LongWritable.class);

// 设置reduce任务个数
job.setNumReduceTasks(Integer.parseInt(args[2]));
//提交job
job.waitForCompletion(true);

} catch (Exception e){
e.printStackTrace();
}
}

}
  1. 把倾斜的数据打散
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package tipdm.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Random;

/**
* 数据倾斜-把倾斜的数据打散
*/


public class WordCountSkewRandKey {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1, v1>, 产生<k2, v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
Random random = new Random();
// 输出k1, v1的值
// System.out.println("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">");
logger.info("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words){
if("5".equals(word)){
// 把倾斜的key打散,分成10份
word = "5_" + random.nextInt(10);
}
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2, v2);
}
}
}

/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 针对<k2,{v2...}>的数据进行累加求和
* 并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for (LongWritable v2: v2s){
// 输出k2, v2的值
// System.out.println("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">");
logger.info("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">");
sum += v2.get();
// 模拟reduce的复杂计算消耗的实际
if (sum%200 == 0){
Thread.sleep(1);
}
}
// 组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
// 输出k3, v3的值
// System.out.println("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">");
logger.info("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">");
// 把结果写出去
context.write(k3, v3);
}
}

/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args) {
try{
if(args.length!=3){
//如果传递的参数不够,程序直接退出
System.exit(100);
}

//指定Job需要的配置参数
Configuration conf = new Configuration();
//创建一个Job
Job job = Job.getInstance(conf);

//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob类的
job.setJarByClass(WordCountSkewRandKey.class);

//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job, new Path(args[1]));

//指定map相关代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);

//指定reduce相关代码
job.setReducerClass(MyReducer.class);
//指定k3类型
job.setOutputKeyClass(Text.class);
//指定v3类型
job.setOutputValueClass(LongWritable.class);

// 设置reduce任务个数
job.setNumReduceTasks(Integer.parseInt(args[2]));
//提交job
job.waitForCompletion(true);

} catch (Exception e){
e.printStackTrace();
}
}
}
-------------本文结束感谢您的阅读-------------