0%

Hadoop-7-MapReduce概述

MapReduce定义

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

MapReduce优缺点

优点

  1. MapReduce易于编程

    它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。

  2. 良好的扩展性

    当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

  3. 高容错性

    MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

  4. 适合PB级以上海量数据的离线处理

    可以实现上千台服务器集群并发工作,提供数据处理能力。

缺点

  1. 不擅长实时计算

    MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。

  2. 不擅长流式计算

    流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。

  3. 不擅长DAG(有向无环图)计算

    多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

MapReduce核心思想

MapReduce核心思想如下所示:

image-20230716203008973

在这里以对钞票金额进行统计为例分析,分为以下几个步骤:

  1. 对这一堆钞票进行分堆,分成三堆然后找三个人过来一起统计金额,每人负责一堆。这个过程叫做MapReduce中的数据集切片
  2. 每个人负责将各自负责的金额统计出来,这个过程叫做Map阶段每个人就是一个MapTask
  3. 待每个人都将自己的金额统计出来后,再找两个人过来负责汇总前面每个人的统计结果。这个过程就叫做Reduce过程,这两个人都叫做ReduceTask
  4. 最终这两个人统计完后将结果输出,这样就使用了MapReduce思想,完成了一个任务。

MapReduce进程

一个完整的MapReduce程序在分布式运行时有三类实例进程:

  1. MrAppMaster:负责整个程序的过程调度及状态协调。
  2. MapTask:负责Map阶段的整个数据处理流程。
  3. ReduceTask:负责Reduce阶段的整个数据处理流程。

常用数据序列化类型

Java类型 Hadoop Writable类型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable
Null NullWritable

MapReduce编程规范

用户编写的程序分成三个部分:MapperReducerDriver

Mapper阶段

  1. 用户自定义的Mapper要继承自己的父类
  2. Mapper的输入数据是KV对的形式(KV的类型可自定义)
  3. Mapper中的业务逻辑写在map()方法中
  4. Mapper的输出数据是KV对的形式(KV的类型可自定义)
  5. map()方法(MapTask进程)对每一个调用一次

Reducer阶段

  1. 用户自定义的Reducer要继承自己的父类。
  2. Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
  3. Reducer的业务逻辑写在reduce()方法中。
  4. ReduceTask进程对每一组相同k的组调用一次reduce()方法。

Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。

WordCount案例实操

本地测试

首先,在本地创建一个名为MRDemoMaven工程。并在pom.xml中放入以下依赖包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    <dependencies>
<!-- hadoop所需要的依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<!-- 单元测试-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!-- 日志管理工具-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>

为了能够顺利使用log4j打印出日志信息,还需要在resources目录下创建一个log4j.properties配置文件,并填入以下内容。

1
2
3
4
5
6
7
8
log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

然后创建包com.tipdm.mr.wordcount

上述内容完成后,基本的HadoopMaven就准本完成。

image-20230716205120020

接下来开始编写相关的脚本。

  1. Mapper类的编写。

    Map阶段会运行MapTaskMapTask会调用Mapper类。

    作用:在该类中实现需要在MapTask中实现的业务逻辑代码,该方法在被循环调用,每调用一次传入一行数据。

    编写的Mapper类需要继承自HadoopMapper类,并且Mapper类中有四个泛型。

    Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

    • KEYIN:读取数据的偏移量的类型
    • VALUEIN:读取的一行一行的数据类型
    • KEYOUT:写出的key的类型(在这是单词的类型)
    • VALUEOUT:写出的value的类型(在这是单词的数量的类型)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    package com.tipdm.mr.wordcount;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class WCMapper extends Mapper <LongWritable, Text, Text, LongWritable>{
    /**
    * 1. 在map方法中实现需要在MapTask中实现的业务逻辑代码
    * 2. 该方法在被循环调用,每调用一次传入一行数据
    * @param key 读取数据时的偏移量
    * @param value 读取的数据(一行一行的数据)
    * @param context 上下文(在这用来将Key,Value写出去)
    * @throws IOException
    * @throws InterruptedException
    */

    // 由于每次循环都要重新定义,所以可以设置为私有变量放入到外面
    // 3.1 创建key的对象
    private Text outKey = new Text();
    // 3.2 创建value的对象
    private LongWritable outValue = new LongWritable();
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
    // super.map(key, value, context); 这里是调用父类方法
    // 1. 将数据进行切割
    // 1.1 将Text转成String --- 为了使用String API进行处理
    String line = value.toString();
    // 1.2 对数据进行切割
    String[] words = line.split(" ");
    // 2. 遍历数据
    for (String word : words) {
    // 3. 封装Key,Value
    // 由于每一行都需要创建新的Text和LongWritable,所以将该值设置为私有属性放外面。
    // // 3.1 创建key的对象
    // Text outKey = new Text();
    // // 3.2 创建value的对象
    // LongWritable outValue = new LongWritable();
    // 3.3 给key赋值
    outKey.set(word);
    // 3.4 给value赋值
    outValue.set(1);
    // 4. 将Key,value 写出去
    context.write(outKey, outValue);
    }
    }
    }
  2. Reducer类的编写

    Reduce阶段会运行ReduceTaskReduceTask会调用Reducer类,每调用一次传入一组数据。

    作用:在该类中实现需要在ReduceTask中实现的业务逻辑代码。

    编写的Reducer类需要继承自HadoopReducer类,并且Reducer类中有四个泛型。

    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

    • KEYIN:读取的key的类型(Mapper写出的key的类型)
    • VALUEIN:读取的value的类型(Mapper写出的value类型)
    • KEYOUT:写出的key的类型(这里是单词)
    • VALUEOUT:写出的value的类型(这里是单词的数量)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    package com.tipdm.mr.wordcount;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class WCReducer extends Reducer <Text, LongWritable, Text, LongWritable>{
    /**
    * 1. 在reduce方法中实现需要在ReduceTask中实现的业务逻辑代码
    * 2. reduce方法在被循环利用,每调用一次传入一组数据(在这key值相同为一组)
    * @param key 读取的key
    * @param values 读取的所有的value
    * @param context 上下文(在这用来将key,value写出去)
    * @throws IOException
    * @throws InterruptedException
    */

    // 3.1 封装key, value
    private LongWritable outValue = new LongWritable();
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
    long sum = 0; // value的和
    // 1. 遍历所有的value
    for (LongWritable value : values) {
    // 2. 对value进行累加
    // 2.1 将LongWrite转化成long
    long v = value.get();
    // 2.2 累加value
    sum += v;
    }
    // 3. 封装key, value
    // 3.2 给value赋值
    outValue.set(sum);
    // 4. 将key, value写出去
    context.write(key, outValue);
    }
    }
  3. Driver类的编写

    Driver类是程序的入口,在该类中的相关设置可以控制程序的运行方式,读取和存储数据的路径。

    分为以下几个步骤:

    1)创建Job示例

    2)Job赋值

    3)运行Job

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    package com.tipdm.mr.wordcount;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;


    public class WCDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    // 1. 创建Job示例
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // 2. 给Job赋值
    // 2.1 关联本程序的Jar -- 如果是本地可以不写,在集群上运行必须写
    job.setJarByClass(WCDriver.class);
    // 2.2 设置Mapper类和Reduce类
    job.setMapperClass(WCMapper.class);
    job.setReducerClass(WCReducer.class);
    // 2.3 设置Mapper输出的key, value的类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    // 2.4 设置最终输出的key,value的类型(在这是reducer输出的key,value的类型)
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    // 2.5 设置输入和输出路径
    FileInputFormat.setInputPaths(job, new Path("E:\\学习笔记\\大数据开发\\5_Hadoop\\ioText\\input"));
    // 输出路径一定不能存在否则报错
    FileOutputFormat.setOutputPath(job, new Path("E:\\学习笔记\\大数据开发\\5_Hadoop\\ioText\\output"));
    // 3. 运行Job
    // waitForCompletion(boolean verbose)
    // verbose:是否打印执行过程信息
    // 返回值:如果job执行成功返回true
    boolean b = job.waitForCompletion(true);
    System.out.println("b======" + b);
    }
    }

    这里是在本地运行MapReduce代码,所以设置的输入和输出路径都是本地路径。

    输入路径为:E:\\学习笔记\\大数据开发\\5_Hadoop\\ioText\\input

    在该路径下有一个a.txt文件,文件内容如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    minglog minglog
    asd ddd
    aa aa
    ss ss
    cls cls
    jiao
    banzhang
    xue
    hadoop ss ss
    cls cls
    jiao
    banzhang
    xue
    hadoop
    ss ss
    minglog minglog
    asd ddd
    aa aa
    minglog minglog
    asd ddd
    aa aa
    cls cls
    jiao
    banzhang
    xue
    hadoop

    输出路径为:E:\\学习笔记\\大数据开发\\5_Hadoop\\ioText\\output

    注意:输出路径一定不能存在,否则会报错。

    image-20230716211602086

    运行完毕后在该输出路径下会生成输出文件。

    image-20230716211652899

    用文本查看器打开part-r-00000文件。

    image-20230716211729309

    该文件下即可看到单词统计结果,本地MapReduce任务运行成功。

提交到集群测试

这种方式往往是在实际生产中用得最多的。

在集群中测试不能将路径参数写死,可以采用对main方法传参的方式执行任务。

修改Driver类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.tipdm.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WCDriver2 {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 创建Job示例
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2. 给Job赋值
job.setJarByClass(WCDriver2.class);
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

// 将输入和输出路径通过main方法传参的方式传入
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 3. 运行Job
boolean b = job.waitForCompletion(true);
System.out.println("b======" + b);
}
}

然后将该Maven工程打包成jar包。

image-20230716212030392

打包完成后,在工程下会生成一个target文件夹,在文件夹下就有我们已经打包好的jar包。

image-20230716212052559

将该jar包发送到集群。

image-20230716212155056

然后检查下在hdfs的根目录下是否有output文件夹,有的话将其删除。

image-20230716212343793

然后使用以下命令在hadoop中提交MarReduce任务。

1
hadoop jar MRDemo-1.0-SNAPSHOT.jar com.tipdm.mr.wordcount.WCDriver2 /input /output

image-20230716212414927

image-20230716212424371

image-20230716212433736

运行成功,然后在hdfs根目录下会生成一个output文件夹。

image-20230716212527588

在该文件夹下也有输出文件。

image-20230716212548222

输入文件的内容,也是单词统计结果。

image-20230716212606572

并且在Yarn下也可以看到我们提交的任务,我这里是已经提交过多次,所以有多条。

image-20230716212718654

从客户端向集群提交任务

这种方式在实际生产用得并不多,但是也需要了解。

这个时候需要在Driver类中的Configuration添加以下内容:

1
2
3
4
5
6
7
8
9
10
11
// 1. 创建Job示例
Configuration conf = new Configuration();
// 配置在集群中执行
//设置在集群运行的相关参数-设置HDFS,NAMENODE的地址
conf.set("fs.defaultFS", "hdfs://hadoop102:8020");
//指定MR运行在Yarn上
conf.set("mapreduce.framework.name","yarn");
//指定MR可以在远程集群运行
conf.set("mapreduce.app-submission.cross-platform", "true");
//指定yarn resourcemanager的位置
conf.set("yarn.resourcemanager.hostname", "hadoop103");

添加后的完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.tipdm.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/*
从本地向集群提交Job
*/
public class WCDriver3 {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 创建Job示例
Configuration conf = new Configuration();
// 配置在集群中执行
//设置在集群运行的相关参数-设置HDFS,NAMENODE的地址
conf.set("fs.defaultFS", "hdfs://hadoop102:8020");
//指定MR运行在Yarn上
conf.set("mapreduce.framework.name","yarn");
//指定MR可以在远程集群运行
conf.set("mapreduce.app-submission.cross-platform", "true");
//指定yarn resourcemanager的位置
conf.set("yarn.resourcemanager.hostname", "hadoop103");


Job job = Job.getInstance(conf);

// 2. 给Job赋值
job.setJarByClass(WCDriver3.class);
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

// 将输入和输出路径通过main方法传参的方式传入
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 3. 运行Job
boolean b = job.waitForCompletion(true);
System.out.println("b======" + b);
}
}

然后对其进行打包。

接下来在修改job执行的jar包。

image-20230716213041883

指定刚才打包的jar包进行执行。

此时修改完毕后,还不能直接去执行代码,还需要修改相关的运行配置。

  1. VM配置项:-DHADOOP_USER_NAME=minglog
  2. main方法传参:hdfs://hadoop102:8020/input hdfs://hadoop102:8020/output

如下所示:

image-20230716213139887

然后将集群中hdfs下的output目录删除,运行脚本。

image-20230716213326886

运行成功。

image-20230716213406385

hdfs根目录下会重新生产output文件夹。

image-20230716213439340

image-20230716213455817

-------------本文结束感谢您的阅读-------------