MapReduce定义
MapReduce
是一个分布式运算程序的编程框架,是用户开发“基于Hadoop
的数据分析应用”的核心框架。
MapReduce
核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop
集群上。
MapReduce优缺点
优点
MapReduce
易于编程它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的
PC
机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce
编程变得非常流行。良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
高容错性
MapReduce
设计的初衷就是使程序能够部署在廉价的PC
机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop
内部完成的。适合PB级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
缺点
不擅长实时计算
MapReduce
无法像MySQL
一样,在毫秒或者秒级内返回结果。不擅长流式计算
流式计算的输入数据是动态的,而
MapReduce
的输入数据集是静态的,不能动态变化。这是因为MapReduce
自身的设计特点决定了数据源必须是静态的。不擅长
DAG
(有向无环图)计算多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,
MapReduce
并不是不能做,而是使用后,每个MapReduce
作业的输出结果都会写入到磁盘,会造成大量的磁盘IO
,导致性能非常的低下。
MapReduce核心思想
MapReduce
核心思想如下所示:
在这里以对钞票金额进行统计为例分析,分为以下几个步骤:
- 对这一堆钞票进行分堆,分成三堆然后找三个人过来一起统计金额,每人负责一堆。这个过程叫做
MapReduce
中的数据集切片
。 - 每个人负责将各自负责的金额统计出来,这个过程叫做
Map阶段
每个人就是一个MapTask
。 - 待每个人都将自己的金额统计出来后,再找两个人过来负责汇总前面每个人的统计结果。这个过程就叫做
Reduce
过程,这两个人都叫做ReduceTask
。 - 最终这两个人统计完后将结果输出,这样就使用了
MapReduce
思想,完成了一个任务。
MapReduce进程
一个完整的MapReduce
程序在分布式运行时有三类实例进程:
- MrAppMaster:负责整个程序的过程调度及状态协调。
- MapTask:负责
Map
阶段的整个数据处理流程。 - ReduceTask:负责
Reduce
阶段的整个数据处理流程。
常用数据序列化类型
Java类型 | Hadoop Writable类型 |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
Null | NullWritable |
MapReduce编程规范
用户编写的程序分成三个部分:Mapper
、Reducer
和Driver
。
Mapper阶段
- 用户自定义的
Mapper
要继承自己的父类 Mapper
的输入数据是KV
对的形式(KV
的类型可自定义)Mapper
中的业务逻辑写在map()
方法中Mapper
的输出数据是KV
对的形式(KV
的类型可自定义)- map()方法(MapTask进程)对每一个
调用一次
Reducer阶段
- 用户自定义的
Reducer
要继承自己的父类。 Reducer
的输入数据类型对应Mapper
的输出数据类型,也是KV
。Reducer
的业务逻辑写在reduce()
方法中。- ReduceTask进程对每一组相同k的
组调用一次reduce()方法。
Driver阶段
相当于YARN
集群的客户端,用于提交我们整个程序到YARN
集群,提交的是封装了MapReduce
程序相关运行参数的job
对象。
WordCount案例实操
本地测试
首先,在本地创建一个名为MRDemo
的Maven
工程。并在pom.xml
中放入以下依赖包。
1 | <dependencies> |
为了能够顺利使用log4j
打印出日志信息,还需要在resources
目录下创建一个log4j.properties
配置文件,并填入以下内容。
1 | log4j.rootLogger=INFO, stdout |
然后创建包com.tipdm.mr.wordcount
。
上述内容完成后,基本的Hadoop
的Maven
就准本完成。
接下来开始编写相关的脚本。
Mapper
类的编写。Map
阶段会运行MapTask
,MapTask
会调用Mapper
类。作用:在该类中实现需要在
MapTask
中实现的业务逻辑代码,该方法在被循环调用,每调用一次传入一行数据。编写的
Mapper
类需要继承自Hadoop
的Mapper
类,并且Mapper
类中有四个泛型。Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
KEYIN
:读取数据的偏移量的类型VALUEIN
:读取的一行一行的数据类型KEYOUT
:写出的key
的类型(在这是单词的类型)VALUEOUT
:写出的value
的类型(在这是单词的数量的类型)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49package com.tipdm.mr.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WCMapper extends Mapper <LongWritable, Text, Text, LongWritable>{
/**
* 1. 在map方法中实现需要在MapTask中实现的业务逻辑代码
* 2. 该方法在被循环调用,每调用一次传入一行数据
* @param key 读取数据时的偏移量
* @param value 读取的数据(一行一行的数据)
* @param context 上下文(在这用来将Key,Value写出去)
* @throws IOException
* @throws InterruptedException
*/
// 由于每次循环都要重新定义,所以可以设置为私有变量放入到外面
// 3.1 创建key的对象
private Text outKey = new Text();
// 3.2 创建value的对象
private LongWritable outValue = new LongWritable();
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
// super.map(key, value, context); 这里是调用父类方法
// 1. 将数据进行切割
// 1.1 将Text转成String --- 为了使用String API进行处理
String line = value.toString();
// 1.2 对数据进行切割
String[] words = line.split(" ");
// 2. 遍历数据
for (String word : words) {
// 3. 封装Key,Value
// 由于每一行都需要创建新的Text和LongWritable,所以将该值设置为私有属性放外面。
// // 3.1 创建key的对象
// Text outKey = new Text();
// // 3.2 创建value的对象
// LongWritable outValue = new LongWritable();
// 3.3 给key赋值
outKey.set(word);
// 3.4 给value赋值
outValue.set(1);
// 4. 将Key,value 写出去
context.write(outKey, outValue);
}
}
}Reducer
类的编写Reduce
阶段会运行ReduceTask
,ReduceTask
会调用Reducer
类,每调用一次传入一组数据。作用:在该类中实现需要在
ReduceTask
中实现的业务逻辑代码。编写的
Reducer
类需要继承自Hadoop
的Reducer
类,并且Reducer
类中有四个泛型。Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
KEYIN
:读取的key
的类型(Mapper
写出的key
的类型)VALUEIN
:读取的value
的类型(Mapper
写出的value
类型)KEYOUT
:写出的key
的类型(这里是单词)VALUEOUT
:写出的value
的类型(这里是单词的数量)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39package com.tipdm.mr.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WCReducer extends Reducer <Text, LongWritable, Text, LongWritable>{
/**
* 1. 在reduce方法中实现需要在ReduceTask中实现的业务逻辑代码
* 2. reduce方法在被循环利用,每调用一次传入一组数据(在这key值相同为一组)
* @param key 读取的key
* @param values 读取的所有的value
* @param context 上下文(在这用来将key,value写出去)
* @throws IOException
* @throws InterruptedException
*/
// 3.1 封装key, value
private LongWritable outValue = new LongWritable();
protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long sum = 0; // value的和
// 1. 遍历所有的value
for (LongWritable value : values) {
// 2. 对value进行累加
// 2.1 将LongWrite转化成long
long v = value.get();
// 2.2 累加value
sum += v;
}
// 3. 封装key, value
// 3.2 给value赋值
outValue.set(sum);
// 4. 将key, value写出去
context.write(key, outValue);
}
}Driver
类的编写Driver
类是程序的入口,在该类中的相关设置可以控制程序的运行方式,读取和存储数据的路径。分为以下几个步骤:
1)创建
Job
示例2)
Job
赋值3)运行
Job
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.tipdm.mr.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WCDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 创建Job示例
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. 给Job赋值
// 2.1 关联本程序的Jar -- 如果是本地可以不写,在集群上运行必须写
job.setJarByClass(WCDriver.class);
// 2.2 设置Mapper类和Reduce类
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
// 2.3 设置Mapper输出的key, value的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 2.4 设置最终输出的key,value的类型(在这是reducer输出的key,value的类型)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 2.5 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path("E:\\学习笔记\\大数据开发\\5_Hadoop\\ioText\\input"));
// 输出路径一定不能存在否则报错
FileOutputFormat.setOutputPath(job, new Path("E:\\学习笔记\\大数据开发\\5_Hadoop\\ioText\\output"));
// 3. 运行Job
// waitForCompletion(boolean verbose)
// verbose:是否打印执行过程信息
// 返回值:如果job执行成功返回true
boolean b = job.waitForCompletion(true);
System.out.println("b======" + b);
}
}这里是在本地运行
MapReduce
代码,所以设置的输入和输出路径都是本地路径。输入路径为:
E:\\学习笔记\\大数据开发\\5_Hadoop\\ioText\\input
在该路径下有一个
a.txt
文件,文件内容如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26minglog minglog
asd ddd
aa aa
ss ss
cls cls
jiao
banzhang
xue
hadoop ss ss
cls cls
jiao
banzhang
xue
hadoop
ss ss
minglog minglog
asd ddd
aa aa
minglog minglog
asd ddd
aa aa
cls cls
jiao
banzhang
xue
hadoop输出路径为:
E:\\学习笔记\\大数据开发\\5_Hadoop\\ioText\\output
。注意:输出路径一定不能存在,否则会报错。
运行完毕后在该输出路径下会生成输出文件。
用文本查看器打开
part-r-00000
文件。该文件下即可看到单词统计结果,本地
MapReduce
任务运行成功。
提交到集群测试
这种方式往往是在实际生产中用得最多的。
在集群中测试不能将路径参数写死,可以采用对main
方法传参的方式执行任务。
修改Driver
类如下:
1 | package com.tipdm.mr.wordcount; |
然后将该Maven
工程打包成jar
包。
打包完成后,在工程下会生成一个target
文件夹,在文件夹下就有我们已经打包好的jar
包。
将该jar
包发送到集群。
然后检查下在hdfs
的根目录下是否有output
文件夹,有的话将其删除。
然后使用以下命令在hadoop
中提交MarReduce
任务。
1 | hadoop jar MRDemo-1.0-SNAPSHOT.jar com.tipdm.mr.wordcount.WCDriver2 /input /output |
运行成功,然后在hdfs
根目录下会生成一个output
文件夹。
在该文件夹下也有输出文件。
输入文件的内容,也是单词统计结果。
并且在Yarn
下也可以看到我们提交的任务,我这里是已经提交过多次,所以有多条。
从客户端向集群提交任务
这种方式在实际生产用得并不多,但是也需要了解。
这个时候需要在Driver
类中的Configuration
添加以下内容:
1 | // 1. 创建Job示例 |
添加后的完整代码:
1 | package com.tipdm.mr.wordcount; |
然后对其进行打包。
接下来在修改job
执行的jar
包。
指定刚才打包的jar
包进行执行。
此时修改完毕后,还不能直接去执行代码,还需要修改相关的运行配置。
VM
配置项:-DHADOOP_USER_NAME=minglog
。main
方法传参:hdfs://hadoop102:8020/input hdfs://hadoop102:8020/output
。
如下所示:
然后将集群中hdfs
下的output
目录删除,运行脚本。
运行成功。
在hdfs
根目录下会重新生产output
文件夹。