0%

Hadoop-9-MapReduce框架原理1

MapReduce的构成

image-20230718113427534

从流的角度

Input ---> InputFormat --->Mapper--->Shuffle--->Reducer--->OutputFormat--->Output

从不同的阶段

Map阶段---->Reduce阶段

Map阶段 ---> Shuffle(Map后半段+Reduce前半段)--->Reduce阶段

从源码的角度

MapTaskmap ---> sort

1
2
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);

ReduceTask : copy ---> sort ---> reduce

1
2
3
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");

InputFormat

切片与MapTask并行度决定机制

  1. 问题引出

    MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。

    思考:1G的数据,启动8MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

  2. MapTask并行度决定机制

    数据块:BlockHDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。

    数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask

    一般来说,为了提高集群整体运行效率,降低在计算过程中网络带宽的占用。切片的大小默认与HDFS中的Block大小一致。

Job提交流程源码

本地提交流程分析

在此采用断点方式进行分析,断点分析能力需要掌握。

WCDriver中的job.waitForCompletion(true);处打上断点。

image-20230717184455251

运行程序进行调试。

image-20230717184526172

开始运行后,程序会在断点处暂停。点击步进进入方法内部。

image-20230717184639075

进入方法后,在submit();处打上断点,然后点击恢复程序按钮,让代码运行到下一个断点处。

此处代码的作用是判断当前集群状态是否在运行任务,如果集群中有任务,则不允许执行新任务。

image-20230717184718495

image-20230717184835026

然后再点击步入进入到方法内部。

image-20230717185008370

进入到方法内部后,在connect();方法处打上断点,并点击恢复程序按钮,让代码运行到断点处。

image-20230717185127463

然后点击步入进入方法内部

image-20230717185204139

然后在return new Cluster(getConfiguration());处打上断点,并点击恢复程序让代码执行到断点处。

image-20230717185311179

接下来继续点击步入进入Cluster方法内部。然后在clientProtocol = provider.create(conf);处打上断点,并点击恢复程序让代码运行到此处。

image-20230717185449257

继续步入该方法。

image-20230717185516418

可以看到该方法在此处做了一个判断。如果我们在mapreduce.framework.name中设置的内容为yarn则返回一个新的对象YARNRNNER;反之返回null

由于这里我们是在本机上运行任务,所以这个值没有进行配置,这里应该返回null。点击步出返回上一个方法。这个时候再次点击恢复程序又会执行到该方法,原因是因为刚才返回的是null,所以又会重新去获取执行任务的方式。

image-20230717190015645

再次步进到方法内部。

image-20230717190048956

这个时候就刚才不一样了,这里会先去获取mapreduce.framework.name字段的值,如果没有则会给一个默认值local。这个时候该方法会返回LocalJobRunner。然后点击步出返回上一个方法,再点击步过

这个时候clientProtocol字段的值就变成了LocalJobRunner

image-20230717190307358

这个时候就清楚了,执行connect();方法最终是会得到一个Runner任务执行对象。如果是在本地执行任务则会返回一个LocalJobRunner;如果是在集群中执行任务会返回YARNRunner

接下来一直点击,步出直到回到connect();断点处。

image-20230717190621158

接下来在return submitter.submitJobInternal(Job.this, cluster);处打上断点,并依次点击恢复程序步进进入到方法内部。然后在checkSpecs(job)处打上断点。

image-20230717191058014

点击步入进入方法内部,并在output.checkOutputSpecs(job);处打上断点,点击恢复程序

image-20230717191158125

继续点击步入进入方法内部,可以看到在该方法中做了两次判断,作用分别为:

  1. 判断输出文件夹是否指定,如果未指定则抛出异常InvalidJobConfException
  2. 判断输出文件夹是否已存在,如果已存在则抛出异常FileAlreadyExistsException

这就是为什么在执行MR任务时,必须要求指定输出文件夹且输出文件夹不能存在的原因。

image-20230717191233529

然后点击步出回到checkSpecs(job);

image-20230717192002469

接下来在该页面的Path jobStagingArea = JobSubmissionFiles.*getStagingDir*(cluster, conf);JobID jobId = submitClient.getNewJobID();Path submitJobDir = new Path(jobStagingArea, jobId.toString());处打上断点,并依次点击恢复程序然程序运行到Path submitJobDir = new Path(jobStagingArea, jobId.toString());处。

image-20230717192229283

在断点后方可以看到有两个字段,jobStagingAreajobID,在这两个字段的值会拼接成提交任务的地址submitJobDir

image-20230717192615463

点击步过执行Path submitJobDir = new Path(jobStagingArea, jobId.toString());生成提交地址。

image-20230717192936434

如果是在本地执行MR任务,地址的根目录指的是项目路径所在的盘符;如果是在集群中执行的MR任务根目录是在HDFS下。

我的项目在E盘下,所以我的路径在E:\tmp\hadoop\mapred\staging在该目录下已经生成了一个文件夹,文件夹名称和上方一样。

image-20230717194037544

打开该文件夹后,进入到.staging目录下,这个时候目录下是空的。

image-20230717194110544

然后回到IDEAcopyAndConfigureFiles(job, submitJobDir);处打上断点,并点击恢复程序,执行到断点处。

image-20230717194215681

点击步入进入方法,然后在rUploader.uploadResources(job, jobSubmitDir);处打上断点,点击恢复程序

image-20230717194303830

然后继续点击步过

image-20230717194358182

执行完后,在E:\tmp\hadoop\mapred\staging\ming544644833\.staging目录下会生成一个和job_id一致的文件夹。

接下来点击步出回到copyAndConfigureFiles(job, submitJobDir);处。

然后在int maps = writeSplits(job, submitJobDir);处打上断点

image-20230717194508046

点击恢复程序执行到此处。

image-20230717194546169

点击步入进入到方法内部,然后在maps = writeNewSplits(job, jobSubmitDir);处打上断点,点击恢复程序执行到断点处。

image-20230717194634841

接下来继续点击步进,让程序进入方法,然后在List<InputSplit> splits = input.getSplits(job);处打上断点,并点击恢复程序执行到断点处。

image-20230717194832276

该代码的作用就是对输入的数据进行切片。

继续找到该类中的JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);处,打上断点并点击恢复程序

image-20230717195301428

然后继续点击步过执行方法。执行完后在E:\tmp\hadoop\mapred\staging\ming544644833\.staging\job_local1592906477_0001目录下就会生成相关文件。

image-20230717195427338

该文件存储了数据的切分结果。

然后点击步出回到int maps = writeSplits(job, submitJobDir);处,并在writeConf(conf, submitJobFile);处打上断点并点击恢复程序执行到断点处。

image-20230717195547489

然后继续点击步进进入方法,在方法内部找到conf.writeXml(out);并打上断点执行恢复程序运行到断点处。

image-20230717195705617

此时再次打开E:\tmp\hadoop\mapred\staging\ming544644833\.staging\job_local1592906477_0001目录,在该目录下生成了配置文件信息。

image-20230717195813024

此时打开job.xml文件内容是空的。继续点击步过执行conf.writeXml(out);

这个时候文件中就有具体的配置内容了。

image-20230717195922718

我们自己并没有配置,为什么这里会有这么多配置项?

原因是因为,有很多配置是Hadoop中默认有的,也会将其写入到配置文件执行任务。

然后点击步出,回到writeConf(conf, submitJobFile);,然后找到status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());打上断点并点击恢复程序执行到断点处。

image-20230717200127144

继续点击程序恢复这个时候就会直接在LocalJobRunner中提交任务。

image-20230717200206230

程序运行成功。

集群提交流程分析

集群提交任务与本地类似只是在Configuration中我们配置了mapreduce.framework.nameyarn,导致在Runner返回的是YARNRunner

如下所示,此时条件成立,返回YARNRunner

image-20230717200800188

而且在集群中提交任务时,需要将JAR包也传输到tmp目录下。(任务执行完后会删除)

image-20230717200906647

所有临时文件

image-20230717200946721

执行完后,刷新会显示找不到目录。

image-20230717201020412

切片源码详解

IDEA中搜索FileInputFormat,找到该类的源码然后打开。

image-20230718093330977

注意,这里可能会出现两个,选择包名较长的那个。

然后滑动到最上方可以看到,FileInputFormat是继承自InputFormat的一个抽象类。

image-20230718112110076

然后按住Ctrl点击InputFromat进入抽象类内部。

image-20230718112400758

可以看到该抽象类定义了两个抽象方法getSplitscreateRecordReader,其中getSplits方法就是对数据集进行切片;createRecordReader是对切片后的数据进行读取。

将鼠标光标放到InputFormat类名上,按住Ctrl + H查看该类的继承层次结构。

image-20230718112645767

接下来点击FileInputFormat进入到该类。

image-20230718093807481

该类将InputFormat中的抽象方法getSplit进行了实现,用来对数据集进行切片。

再点击TextInputFormat查看实现类内容。

image-20230718112954204

该实现类对createRecordReader抽线方法进行了重写,用来读取切片后的数据。

然后,回到FileInputFormat这里首先对该类中的切片方法进行深入分析。

首先取消所有断点,然后在long minSize = Math.*max*(getFormatMinSplitSize(), *getMinSplitSize*(job));处打上断点。

image-20230718094958817

image-20230718095015160

关键源码解释如下:

指定最小切片长度

1
2
3
4
5
6
7
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
protected long getFormatMinSplitSize() {
return 1;
}
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}

可以看到getFormatMinSplitSize()getMinSplitSize(job)这两个方法在没有指定SPLIT_MINSIZE参数内容时都为1。所以这里其实就是设置了minSize=1

点击步过得到的结果确实为1

image-20230718095049811

指定最大切片长度

1
2
3
4
5
long maxSize = getMaxSplitSize(job);
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong(SPLIT_MAXSIZE,
Long.MAX_VALUE);
}

getMaxSplitSize方法中如果没有指定SPLIT_MAXSIZE参数,那么返回的就是Long.MAX_VALUELong类型的最大值,点击步过生成如下内容。

image-20230718095435186

定义存储切片的ArrayList对象和所有的文件对象

image-20230718095602134

循环处理每一个文件对象

image-20230718095655924

  1. 跳过文件夹对象

    image-20230718095736823

  2. 获取文件路径和文件长度

    image-20230718095801635

  3. 然后是一个判断,当文件内容不是空的时候去对文件内容进行切片;当文件内容是空的时候直接返回一个空的字符串。

    image-20230718100038521

  4. 当文件不是空的时候,内部又有一些关键代码。如下所示,首先判断了一下文件是否可切分,如果不可切分直接就是整个文件是一个切片。

    image-20230718101015034

  5. 当文件可切分时再详细看源码。

    image-20230718101045161

  6. 首先定义了块大小和切片大小。

    1
    2
    3
    4
    5
    6
    long blockSize = file.getBlockSize();
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    protected long computeSplitSize(long blockSize, long minSize,
    long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    块大小就是HDFS的默认块大小128M;片大小从源码来看默认也是与块大小一致。

    片大小=块大小:默认情况。

    片大小>块大小:设置MinSize值大于128M

    片大小<块大小:设置MaxSize值小于128M

  7. 接下来是计算剩余文件长度,初始情况下等于文件长度。

    image-20230718101842949

  8. 然后开始进行循环切片,使用的是while循环,循环判断条件为使用剩余文件长度/切片大小如果该值大于SPLIT_SLOP值时,运行循环体内容。

    image-20230718102750389

    SPLIT_SLOP值的作用是控制最小的切片长度,防止最后一个片长度过小被单独切分为一片导致资源浪费。默认情况下为1.1

    例如:现在有一个文件大小为20.5M,设定的切片大小为10M,现在对其进行切片该如何处理。

    首先第一片毫无疑问是10M。关键在于第二片的大小,如果没有设置SPLIT_SLOP参数时,第二片应该为10M,还会再分一个第三片出来0.5M,这个时候第三片太小了,但是还是在后面进行计算的时候还是会使用一个MapTask进行计算,并占用计算资源。而设置了SPLIT_SLOP参数,此时就会先去判断,当前剩余的文件长度是不是比SPLIT_SLOP小,这里最后一片切出来的长度为0.5M小于1.1M,故会将最后的0.5M放入到第二次切片中形成10.5M,这样就会在一定的程度上减少资源浪费。

    在源码的最后还加了依次判断,当上面的循环结束时,剩余的文件大小就小于了SPLIT_SLOP值,但是还是要进一步判断,如果该值为0,就不会再进行后面的操作,文件刚好被完整切分。更多的情况是不等于0,此时就是将后面的剩余文件内容添加到前一个切片中。

至此,文件切片源码解析完毕。

TextInputFormat类

FileInputFormat实现类

思考:在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?

FileInputFormat常见的接口实现类包括:TextInputFormatKeyValueTextInputFormatNLineInputFormatCombineTextInputFormat和自定义InputFormat等。

TextInputFormat

TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。key是存储该行在整个文件中的起始字节偏移量, LongWritable类型。value是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。

这也就是为什么我们在完成单词统计案例时,在Mapper中的前两个泛型必须指定LongWritableText的原因。

image-20230718103513023

CombineTextInputFormat类

切片机制

前面讲过使用的FileInputFormat默认实现类是TextInputFormat,但是在一些特殊的场景下有一些其他的实现类也比较有用,例如CombineTextInputFormat它是专门用来处理元数据中有大量的小文件的情况。

当文件都是小文件时,认的情况下TextInputFormat,将每一个小文件作为一个切片,而一个切片在Mapper阶段会对应一个MapTask,这样会造成资源浪费,效率低下。

CombineTextInputFormat则不一样,它具有独特的切片机制,当设置的CombineTextInputFormat.setMaxInputSplitSize参数值大于总文件大小时,会将所有文件放入一个切片中,这样使用一个MapTask就可以完成任务,节省计算资源并提高效率。

CombineTextInputFormat的切片机制分为两个步骤。

  1. 虚拟存储过程。

    例如设置CombineTextInputFormat大小为4M

    虚拟文件生成规律为:当文件大小在[0, 4M]时单独一个文件;当文件大小在[4M, 8M]时平均分为两个小文件;当文件大小在[8M, +∞]时,大于8M的部分全部分为若干个4M,一直到当剩余文件大小在[4M, 8M]时,在平均分为两个小文件。

  2. 切片过程。

    当文件大于等于4M时,单独形成一个切片;当文件小于4M时,和后面的文件一起形成一个切片,直到累加的文件大小大于等于4M

例如现在有4个小文件,如下所示:

文件名 文件大小
a.txt 1.2M
b.txt 5M
c.txt 7M
d.txt 11M
  • 虚拟存储过程

    最终存储的文件为:

    a.txt1.2M

    b.txt2.5M + 2.5M

    c.txt3.5M + 3.5M

    d.txt4M + 3.5M + 3.5M

  • 切片过程

    片11.2M + 2.5M + 2.5M= 6.2M

    片23.5M + 3.5M = 7M

    片34M

    片43.5M + 3.5M = 7M

案例实现演练

首先在input文件夹下生成41KB的小文件,然后执行程序。

image-20230718111327565

可以看到,使用默认的TextInputFormat生成的是4个切片。

接下来在WCDriver中做出以下配置。

1
2
3
4
// 设置虚拟存储切片最大值为:4M
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
// 设置使用CombineTextInputFormat。如果不设置默认使用的是TextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);

image-20230718111516433

然后删除output目录重新运行项目。

image-20230718111550867

这个时候切片的数据就变成了1

Map阶段工作流程

接下来对前面的过程进行细化如下所示:

Input ---> Mapper过程如下:

image-20230718115333369

Shuffer机制

Shuffer概述

前面我们在讨论MR的工作原理都是直接说,分为Map阶段和Reduce阶段,经过自己写WCCount项目时可以看出,其实自己在写代码的时候,只是写了一个MapperReducer代码,但是大家应该都可以发现我们Reducer类传入的参数和我们在Mapper类中输出的内容不一致。它似乎是经过了进一步的处理,将相同的Key放到了一组中,然后放入我们的Reducer代码中,进行Reduce操作。这中间的过程是框架帮我们做的,那么框架在这中间到底是做了一些什么事情呢?这个就是我们本节要讨论的内容。

  • Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle

整个Shuffer机制可以被分为以下几个步骤:

Map ---> Sort ---> Cpoy ---> Sort ---> Reduce

  1. Map ---> Sort

    Map阶段结束后,会将Mapper中的数据传入到一个环形缓冲区中按照一定的规则进行分区,将原始数据分为不同的区,后续在进行Reduce操作时,一个分区会对应一个ReudceTask

    之所以称为环形缓冲区,原因是因为在此处存储的数据的放入内存中的,而内存资源是有限的,环形缓冲区默认的内存是100M,当写入的数据达到80%时会执行两步操作:①倒转数据写入的方向,之前是正向的现在就变成反向。②将前面正向写的内容存储到本地文件,同时清空前面正向内存中存储的数据。

    当反向又达到80%时,又会倒转数据写入的方向,依次内推。

    当内存中的数据写入到硬盘后会执行,排序操作(快速排序)。这样当所有的数据都读取完后就会在硬盘中写入多个排好序的临时文件。

    image-20230718150548888

    这个时候再对所有的临时文件中的数据做归并排序,将分区1的数据和分区2的数据进行排序汇总。

    image-20230718151007462

  2. Sort ---> Cpoy

    上述方法是一个MapTask会生成一个当前这个MapTask的汇总数据。

    在进入Reduce之前还会将前面所有的数据读取到内存缓冲中,内存缓冲区不够就会往硬盘中存储,此时是按照不同的分区去读取,所有的相同分区被读取到一个文件中。

    image-20230718151738638

  3. Cpoy ---> Sort

    前面的过程只是机械的将不同的分区内容放到一起,虽然在前面的文件中都是排序好的结果,但是到这里汇总后的结果却变成了无序的,需要进一步做归并排序。

    image-20230718151933129

  4. Sort ---> Reduce

    最后再将排序后的结果,按照相同的Key值为一组放入到Reduce任务中。

image-20230718152131415

前面的内容是对Shuffer整个过程进行分步解析,接下来举一个实际的例子,通过这个实际的例子讲解会加深大家对Shuffer工作机制的理解。

绘制成图如下所示:

image-20230718155023010

image-20230718155202787

源码分析

新建一个包partition并复制前面做的项目WCCount代码

然后在WCDriver处配置一下,ReduceTask的数量为2

1
2
// 设置ReduceTask的数量
job.setNumReduceTasks(2);

image-20230718160756592

接下来在WCMapper中的context.write(outKey, outValue);处打上断点(为了防止在不必要的位置暂停程序运行,建议将之前打的所有断点取消),然后启动项目调试。

image-20230718160306994

点击步入进入方法内部。

image-20230718160247889

继续点击步入进入方法内部。

image-20230718160347224

再继续点击步入进入方法内部。

image-20230718160412686

在这里我们可以看到写出了三个值,keyvaluepartitioner。分别表示键、值和分区号。

继续步入进入到partitioner.getPartition(key, value, partitions)方法内部。

image-20230718160549831

可以看到在这里使用keyhashCode值对ReduceTask的数量取了一个模,其实就是得到了不同的分区号,这个地方就是对数据集进行分区的过程。

前面我们设置了ReduceTask的数量为2,那么这里对2取模实际上就是返回了01这两个值。

然后释放所有断点,让程序执行完。在output文件夹中可以看到有2个输出文件。

image-20230718161400609

其实就是每一个ReduceTask会生成一个输出文件。

image-20230718161516447

这里设置了ReduceTask的数量为2,这里就会对数据进行2个分区。默认的情况下是将所有的数据放入一个分区,这个也是可以通过源码来查看的。

将前面设置的ReduceTask数量注释掉,重新调试代码。

image-20230718161136924

此时由于partitions1,所以返回的结果永远都是0,表示将所有的数据放入一个分区。

自定义分区

前面我们将ReduceTask数量设置为2时,生成了两个文件,两个文件中的内容不一致,这个分组的依据默认是通过对应keyhashCode值去对2取模得到的结果,这个结果没有什么规律性,只能够达到对原始数据分区的目的。那么我们能够自定义分区逻辑,让不同的文件中存储我们不同的内容,让分区的结果更有意义呢?

项目1:根据区号将前面的流量统计案例分成不同的文件

  • 136137138139开头的手机号分别放入4个文件中,其他的手机号放入一个文件中。

这个时候就需要重新定义一个自定义分区类,这个类需要继承Partitioner抽象类,并实现其getPartition抽象方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.tipdm.mr.partition2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartition extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
String string = text.toString();
if (string.startsWith("136")){
return 0;
}else if (string.startsWith("137")) {
return 1;
}else if (string.startsWith("138")) {
return 2;
}else if (string.startsWith("139")) {
return 3;
}else{
return 4;
}
}
}

注意Shuffer是在Mapper结束后执行的,所以Partitioner的泛型为前面Mapper阶段输出的参数类型。

image-20230718162806651

定义好自定义分区类后,还需要再Driver类中进行配置才可以生效。

  1. 设置ReduceTask数量为5

    1
    2
    // 设置ReduceTask的数量为5
    job.setNumReduceTasks(5);
  2. 配置自定义分区类

    1
    2
    // 配置自定义的分区类
    job.setPartitionerClass(MyPartition.class);

image-20230718163031716

此时运行项目,在output文件夹下就会生成5个输出文件。

image-20230718163142291

打开part-r-00000,按照需求应该生成136开头的手机号数据。

image-20230718163227796

数据没问题,其他文件就不再一一展示。

项目2:WCCount项目中根据起始位置字母不同放入不同文件

  • a-p放入一个文件,将p-z放入另一个文件。将WordCount中的代码复制到一个新的包partition3下。

重新定义一个自定义分区类,这个类需要继承Partitioner抽象类,并实现其getPartition抽象方法。

如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.tipdm.mr.partition3;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartition extends Partitioner<Text, LongWritable> {
@Override
public int getPartition(Text text, LongWritable longWritable, int numPartitions) {
String string = text.toString().toLowerCase();
char shart_char = string.toCharArray()[0];
if (shart_char >= 97 && shart_char <= 112){
return 0;
}else{
return 1;
}
}
}

同样需要在Driver类中配置ReduceTask数量和自定义的分区类。

image-20230718164513317

WritableComparable排序

排序是MapReduce框架中最重要的操作之一。

MapTaskReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

自定义排序WritableComparable

bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.tipdm.mr.compare;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;
private long downFlow;
private long sumFlow;

public FlowBean() {
}

// 按照总流量升序排列
@Override
public int compareTo(FlowBean o) {
return Long.compare(this.sumFlow, o.sumFlow);
}

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}

public long getUpFlow() {
return upFlow;
}

public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}

public long getDownFlow() {
return downFlow;
}

public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}

public long getSumFlow() {
return sumFlow;
}

public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}

@Override
public String toString() {
return upFlow + " " + downFlow + " " + sumFlow;
}
}

案例1:对前面的流量分析统计结果,进行排序输出

  • 对总流量按照升序排列。

image-20230718180719140

文件内容如图所示。

  • FlowBean

    将上一节定义的FlowBean复制过来。

  • Mapper

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    package com.tipdm.mr.compare2;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private FlowBean outKey = new FlowBean();
    private Text outValue = new Text();
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {
    String inKey = value.toString();
    String[] all_string = inKey.split("\t");
    System.out.println(all_string.toString());
    outKey.setUpFlow(Long.parseLong(all_string[1]));
    outKey.setDownFlow(Long.parseLong(all_string[2]));
    outKey.setSumFlow(Long.parseLong(all_string[3]));

    outValue.set(all_string[0]);
    context.write(outKey, outValue);
    }
    }
  • Reducer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    package com.tipdm.mr.compare2;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
    for (Text value : values) {
    context.write(value, key);
    }
    }
    }
  • Driver

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    package com.tipdm.mr.compare2;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    job.setJarByClass(FlowDriver.class);
    job.setMapperClass(FlowMapper.class);
    job.setReducerClass(FlowReducer.class);
    job.setMapOutputKeyClass(FlowBean.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);
    FileInputFormat.setInputPaths(job, new Path("..\\ioText\\input3"));
    FileOutputFormat.setOutputPath(job, new Path("..\\ioText\\output"));

    job.waitForCompletion(true);
    }
    }

    注意:此处不能完全把前面的代码搬过来,需要修改Mapper阶段的输出数据类型。

结果展示:

image-20230718181034218

此时已经可以按照总流量进行升序排列。

案例2:基于前一个需求,增加自定义分区类,分区按照省份手机号设置

  • 在前面案例的基础上增加一个自定义分区类MyPartition

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    package com.tipdm.mr.compare3;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class MyPartition extends Partitioner<FlowBean, Text> {
    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
    String string = text.toString();
    if (string.startsWith("136")){
    return 0;
    }else if (string.startsWith("137")) {
    return 1;
    }else if (string.startsWith("138")) {
    return 2;
    }else if (string.startsWith("139")) {
    return 3;
    }else{
    return 4;
    }
    }
    }
  • 并在Driver类中配置自定义分区

    1
    2
    job.setNumReduceTasks(5);
    job.setPartitionerClass(MyPartition.class);

结果展示:

image-20230718181656562

image-20230718181711971

Combiner合并

Combiner概述

  1. CombinerMR程序中MapperReducer之外的一种组件。
  2. Combiner组件的父类就是Reducer
  3. CombinerReducer的区别在于运行的位置。
    • Combiner是在每一个MapTask所在的节点运行;
    • Reducer是接收全局所有Mapper的输出结果。
  4. Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
  5. Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。

原理图

Combiner作用的地点在Copy阶段

image-20230718184307145

这里如果是将所有数据全部拷贝到ReduceTask再进行汇总,会造成大量的网络IO,因为ReduceTaskMapTask不一定是在一个机器上。

使用Combiner后会提前在MapTask上进行统计每个MapTask的结果后,直接将结果传输到ReduceTask。即改进成如下图所示方式进行网络传输。

image-20230718185235980

注意:MR默认是没有开启Combiner的,原始因为不是所有的任务都能够使用Combiner,有的任务使用Combiner会导致结果出问题。

例如:求均值任务。

image-20230718185908630

此时即使仍然能降低网络IO,但是由于结果计算错误,不能使用Combiner

案例演示

直接将前面的WCCount案例代码复制过来,然后将input中的文件内容手动复制多一点。我这里是手动复制了74万行的数据,接下来再次去执行MR项目。

image-20230718190242430

首先直接执行,不修改项目内容。

image-20230718190945941

Map阶段输出的物理化字节数为18335873

接下来设置Combiner,再次运行项目。

image-20230718190658681

由于Combiner的代码是与Reducer代码一致,简单起见直接使用Reducer的代码。

image-20230718190907900

这个时候Map阶段输出的物理化字节数减少到了181,极大的减少了网络IO

-------------本文结束感谢您的阅读-------------