MapReduce的构成
从流的角度
Input ---> InputFormat --->Mapper--->Shuffle--->Reducer--->OutputFormat--->Output
从不同的阶段
Map阶段---->Reduce阶段
Map阶段 ---> Shuffle(Map后半段+Reduce前半段)--->Reduce阶段
从源码的角度
MapTask
:map ---> sort
1 | mapPhase = getProgress().addPhase("map", 0.667f); |
ReduceTask
: copy ---> sort ---> reduce
1 | copyPhase = getProgress().addPhase("copy"); |
InputFormat
切片与MapTask并行度决定机制
问题引出
MapTask
的并行度决定Map
阶段的任务处理并发度,进而影响到整个Job
的处理速度。思考:
1G
的数据,启动8
个MapTask
,可以提高集群的并发处理能力。那么1K
的数据,也启动8
个MapTask
,会提高集群性能吗?MapTask
并行任务是否越多越好呢?哪些因素影响了MapTask
并行度?MapTask
并行度决定机制数据块:
Block
是HDFS
物理上把数据分成一块一块。数据块是HDFS
存储数据单位。数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是
MapReduce
程序计算输入数据的单位,一个切片会对应启动一个MapTask
。一般来说,为了提高集群整体运行效率,降低在计算过程中网络带宽的占用。切片的大小默认与
HDFS
中的Block
大小一致。
Job提交流程源码
本地提交流程分析
在此采用断点方式进行分析,断点分析能力需要掌握。
在WCDriver
中的job.waitForCompletion(true);
处打上断点。
运行程序进行调试。
开始运行后,程序会在断点处暂停。点击步进
进入方法内部。
进入方法后,在submit();
处打上断点,然后点击恢复程序
按钮,让代码运行到下一个断点处。
此处代码的作用是判断当前集群状态是否在运行任务,如果集群中有任务,则不允许执行新任务。
然后再点击步入
进入到方法内部。
进入到方法内部后,在connect();
方法处打上断点,并点击恢复程序
按钮,让代码运行到断点处。
然后点击步入
进入方法内部
然后在return new Cluster(getConfiguration());
处打上断点,并点击恢复程序
让代码执行到断点处。
接下来继续点击步入
进入Cluster
方法内部。然后在clientProtocol = provider.create(conf);
处打上断点,并点击恢复程序
让代码运行到此处。
继续步入
该方法。
可以看到该方法在此处做了一个判断。如果我们在mapreduce.framework.name
中设置的内容为yarn
则返回一个新的对象YARNRNNER
;反之返回null
。
由于这里我们是在本机上运行任务,所以这个值没有进行配置,这里应该返回null
。点击步出
返回上一个方法。这个时候再次点击恢复程序
又会执行到该方法,原因是因为刚才返回的是null
,所以又会重新去获取执行任务的方式。
再次步进
到方法内部。
这个时候就刚才不一样了,这里会先去获取mapreduce.framework.name
字段的值,如果没有则会给一个默认值local
。这个时候该方法会返回LocalJobRunner
。然后点击步出
返回上一个方法,再点击步过
。
这个时候clientProtocol
字段的值就变成了LocalJobRunner
。
这个时候就清楚了,执行
connect();
方法最终是会得到一个Runner
任务执行对象。如果是在本地执行任务则会返回一个LocalJobRunner
;如果是在集群中执行任务会返回YARNRunner
。
接下来一直点击,步出
直到回到connect();
断点处。
接下来在return submitter.submitJobInternal(Job.this, cluster);
处打上断点,并依次点击恢复程序
和步进
进入到方法内部。然后在checkSpecs(job)
处打上断点。
点击步入
进入方法内部,并在output.checkOutputSpecs(job);
处打上断点,点击恢复程序
。
继续点击步入
进入方法内部,可以看到在该方法中做了两次判断,作用分别为:
- 判断输出文件夹是否指定,如果未指定则抛出异常
InvalidJobConfException
。 - 判断输出文件夹是否已存在,如果已存在则抛出异常
FileAlreadyExistsException
。
这就是为什么在执行
MR
任务时,必须要求指定输出文件夹且输出文件夹不能存在的原因。
然后点击步出
回到checkSpecs(job);
接下来在该页面的Path jobStagingArea = JobSubmissionFiles.*getStagingDir*(cluster, conf);
、JobID jobId = submitClient.getNewJobID();
和Path submitJobDir = new Path(jobStagingArea, jobId.toString());
处打上断点,并依次点击恢复程序
然程序运行到Path submitJobDir = new Path(jobStagingArea, jobId.toString());
处。
在断点后方可以看到有两个字段,jobStagingArea
和jobID
,在这两个字段的值会拼接成提交任务的地址submitJobDir
。
点击步过
执行Path submitJobDir = new Path(jobStagingArea, jobId.toString());
生成提交地址。
如果是在本地执行
MR
任务,地址的根目录指的是项目路径所在的盘符;如果是在集群中执行的MR
任务根目录是在HDFS
下。
我的项目在E
盘下,所以我的路径在E:\tmp\hadoop\mapred\staging
在该目录下已经生成了一个文件夹,文件夹名称和上方一样。
打开该文件夹后,进入到.staging
目录下,这个时候目录下是空的。
然后回到IDEA
在copyAndConfigureFiles(job, submitJobDir);
处打上断点,并点击恢复程序
,执行到断点处。
点击步入
进入方法,然后在rUploader.uploadResources(job, jobSubmitDir);
处打上断点,点击恢复程序
。
然后继续点击步过
。
执行完后,在E:\tmp\hadoop\mapred\staging\ming544644833\.staging
目录下会生成一个和job_id
一致的文件夹。
接下来点击步出
回到copyAndConfigureFiles(job, submitJobDir);
处。
然后在int maps = writeSplits(job, submitJobDir);
处打上断点
点击恢复程序
执行到此处。
点击步入
进入到方法内部,然后在maps = writeNewSplits(job, jobSubmitDir);
处打上断点,点击恢复程序
执行到断点处。
接下来继续点击步进
,让程序进入方法,然后在List<InputSplit> splits = input.getSplits(job);
处打上断点,并点击恢复程序
执行到断点处。
该代码的作用就是对输入的数据进行切片。
继续找到该类中的JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
处,打上断点并点击恢复程序
。
然后继续点击步过
执行方法。执行完后在E:\tmp\hadoop\mapred\staging\ming544644833\.staging\job_local1592906477_0001
目录下就会生成相关文件。
该文件存储了数据的切分结果。
然后点击步出回到int maps = writeSplits(job, submitJobDir);
处,并在writeConf(conf, submitJobFile);
处打上断点并点击恢复程序
执行到断点处。
然后继续点击步进
进入方法,在方法内部找到conf.writeXml(out);
并打上断点执行恢复程序
运行到断点处。
此时再次打开E:\tmp\hadoop\mapred\staging\ming544644833\.staging\job_local1592906477_0001
目录,在该目录下生成了配置文件信息。
此时打开job.xml文件内容是空的。继续点击步过
执行conf.writeXml(out);
。
这个时候文件中就有具体的配置内容了。
我们自己并没有配置,为什么这里会有这么多配置项?
原因是因为,有很多配置是
Hadoop
中默认有的,也会将其写入到配置文件执行任务。
然后点击步出
,回到writeConf(conf, submitJobFile);
,然后找到status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
打上断点并点击恢复程序
执行到断点处。
继续点击程序恢复
这个时候就会直接在LocalJobRunner
中提交任务。
程序运行成功。
集群提交流程分析
集群提交任务与本地类似只是在Configuration
中我们配置了mapreduce.framework.name
为yarn
,导致在Runner
返回的是YARNRunner
。
如下所示,此时条件成立,返回YARNRunner
。
而且在集群中提交任务时,需要将JAR
包也传输到tmp
目录下。(任务执行完后会删除)
所有临时文件
执行完后,刷新会显示找不到目录。
切片源码详解
在IDEA
中搜索FileInputFormat
,找到该类的源码然后打开。
注意,这里可能会出现两个,选择包名较长的那个。
然后滑动到最上方可以看到,FileInputFormat
是继承自InputFormat
的一个抽象类。
然后按住Ctrl
点击InputFromat
进入抽象类内部。
可以看到该抽象类定义了两个抽象方法getSplits
和createRecordReader
,其中getSplits
方法就是对数据集进行切片;createRecordReader
是对切片后的数据进行读取。
将鼠标光标放到InputFormat
类名上,按住Ctrl + H
查看该类的继承层次结构。
接下来点击FileInputFormat
进入到该类。
该类将InputFormat
中的抽象方法getSplit
进行了实现,用来对数据集进行切片。
再点击TextInputFormat
查看实现类内容。
该实现类对createRecordReader
抽线方法进行了重写,用来读取切片后的数据。
然后,回到FileInputFormat
这里首先对该类中的切片方法进行深入分析。
首先取消所有断点,然后在long minSize = Math.*max*(getFormatMinSplitSize(), *getMinSplitSize*(job));
处打上断点。
关键源码解释如下:
指定最小切片长度
1 | long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); |
可以看到getFormatMinSplitSize()
和getMinSplitSize(job)
这两个方法在没有指定SPLIT_MINSIZE
参数内容时都为1
。所以这里其实就是设置了minSize=1
。
点击步过
得到的结果确实为1
。
指定最大切片长度
1 | long maxSize = getMaxSplitSize(job); |
在getMaxSplitSize
方法中如果没有指定SPLIT_MAXSIZE
参数,那么返回的就是Long.MAX_VALUE
是Long
类型的最大值,点击步过
生成如下内容。
定义存储切片的ArrayList对象和所有的文件对象
循环处理每一个文件对象
跳过文件夹对象
获取文件路径和文件长度
然后是一个判断,当文件内容不是空的时候去对文件内容进行切片;当文件内容是空的时候直接返回一个空的字符串。
当文件不是空的时候,内部又有一些关键代码。如下所示,首先判断了一下文件是否可切分,如果不可切分直接就是整个文件是一个切片。
当文件可切分时再详细看源码。
首先定义了块大小和切片大小。
1
2
3
4
5
6long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}块大小就是
HDFS
的默认块大小128M
;片大小从源码来看默认也是与块大小一致。片大小=块大小
:默认情况。片大小>块大小
:设置MinSize
值大于128M
。片大小<块大小
:设置MaxSize
值小于128M
。接下来是计算剩余文件长度,初始情况下等于文件长度。
然后开始进行循环切片,使用的是
while
循环,循环判断条件为使用剩余文件长度/切片大小
如果该值大于SPLIT_SLOP
值时,运行循环体内容。SPLIT_SLOP
值的作用是控制最小的切片长度,防止最后一个片长度过小被单独切分为一片导致资源浪费。默认情况下为1.1
。例如:现在有一个文件大小为
20.5M
,设定的切片大小为10M
,现在对其进行切片该如何处理。首先第一片毫无疑问是
10M
。关键在于第二片的大小,如果没有设置SPLIT_SLOP
参数时,第二片应该为10M
,还会再分一个第三片出来0.5M
,这个时候第三片太小了,但是还是在后面进行计算的时候还是会使用一个MapTask
进行计算,并占用计算资源。而设置了SPLIT_SLOP
参数,此时就会先去判断,当前剩余的文件长度是不是比SPLIT_SLOP
小,这里最后一片切出来的长度为0.5M
小于1.1M
,故会将最后的0.5M
放入到第二次切片中形成10.5M
,这样就会在一定的程度上减少资源浪费。在源码的最后还加了依次判断,当上面的循环结束时,剩余的文件大小就小于了
SPLIT_SLOP
值,但是还是要进一步判断,如果该值为0
,就不会再进行后面的操作,文件刚好被完整切分。更多的情况是不等于0
,此时就是将后面的剩余文件内容添加到前一个切片中。
至此,文件切片源码解析完毕。
TextInputFormat类
FileInputFormat实现类
思考:在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?
FileInputFormat
常见的接口实现类包括:TextInputFormat
、KeyValueTextInputFormat
、NLineInputFormat
、CombineTextInputFormat
和自定义InputFormat
等。
TextInputFormat
TextInputFormat
是默认的FileInputFormat
实现类。按行读取每条记录。key
是存储该行在整个文件中的起始字节偏移量, LongWritable
类型。value
是这行的内容,不包括任何行终止符(换行符和回车符),Text
类型。
这也就是为什么我们在完成单词统计案例时,在Mapper
中的前两个泛型必须指定LongWritable
和Text
的原因。
CombineTextInputFormat类
切片机制
前面讲过使用的FileInputFormat
默认实现类是TextInputFormat
,但是在一些特殊的场景下有一些其他的实现类也比较有用,例如CombineTextInputFormat
它是专门用来处理元数据中有大量的小文件的情况。
当文件都是小文件时,认的情况下TextInputFormat
,将每一个小文件作为一个切片,而一个切片在Mapper
阶段会对应一个MapTask
,这样会造成资源浪费,效率低下。
而CombineTextInputFormat
则不一样,它具有独特的切片机制,当设置的CombineTextInputFormat.setMaxInputSplitSize
参数值大于总文件大小时,会将所有文件放入一个切片中,这样使用一个MapTask
就可以完成任务,节省计算资源并提高效率。
CombineTextInputFormat
的切片机制分为两个步骤。
虚拟存储过程。
例如设置
CombineTextInputFormat
大小为4M
。虚拟文件生成规律为:当文件大小在
[0, 4M]
时单独一个文件;当文件大小在[4M, 8M]
时平均分为两个小文件;当文件大小在[8M, +∞]
时,大于8M的部分全部分为若干个4M
,一直到当剩余文件大小在[4M, 8M]
时,在平均分为两个小文件。切片过程。
当文件大于等于
4M
时,单独形成一个切片;当文件小于4M
时,和后面的文件一起形成一个切片,直到累加的文件大小大于等于4M
。
例如现在有4个小文件,如下所示:
文件名 文件大小 a.txt
1.2M
b.txt
5M
c.txt
7M
d.txt
11M
虚拟存储过程
最终存储的文件为:
a.txt
:1.2M
b.txt
:2.5M + 2.5M
c.txt
:3.5M + 3.5M
d.txt
:4M + 3.5M + 3.5M
切片过程
片1
:1.2M + 2.5M + 2.5M= 6.2M
片2
:3.5M + 3.5M = 7M
片3
:4M
片4
:3.5M + 3.5M = 7M
案例实现演练
首先在input
文件夹下生成4
个1KB
的小文件,然后执行程序。
可以看到,使用默认的TextInputFormat
生成的是4
个切片。
接下来在WCDriver
中做出以下配置。
1 | // 设置虚拟存储切片最大值为:4M |
然后删除output
目录重新运行项目。
这个时候切片的数据就变成了1
。
Map阶段工作流程
接下来对前面的过程进行细化如下所示:
Input ---> Mapper
过程如下:
Shuffer机制
Shuffer概述
前面我们在讨论MR
的工作原理都是直接说,分为Map
阶段和Reduce
阶段,经过自己写WCCount
项目时可以看出,其实自己在写代码的时候,只是写了一个Mapper
和Reducer
代码,但是大家应该都可以发现我们Reducer
类传入的参数和我们在Mapper
类中输出的内容不一致。它似乎是经过了进一步的处理,将相同的Key
放到了一组中,然后放入我们的Reducer
代码中,进行Reduce
操作。这中间的过程是框架帮我们做的,那么框架在这中间到底是做了一些什么事情呢?这个就是我们本节要讨论的内容。
Map
方法之后,Reduce
方法之前的数据处理过程称之为Shuffle
。
整个Shuffer
机制可以被分为以下几个步骤:
Map ---> Sort ---> Cpoy ---> Sort ---> Reduce
Map ---> Sort
Map
阶段结束后,会将Mapper
中的数据传入到一个环形缓冲区中按照一定的规则进行分区,将原始数据分为不同的区,后续在进行Reduce
操作时,一个分区会对应一个ReudceTask
。之所以称为环形缓冲区,原因是因为在此处存储的数据的放入内存中的,而内存资源是有限的,环形缓冲区默认的内存是
100M
,当写入的数据达到80%
时会执行两步操作:①倒转数据写入的方向,之前是正向的现在就变成反向。②将前面正向写的内容存储到本地文件,同时清空前面正向内存中存储的数据。当反向又达到80%时,又会倒转数据写入的方向,依次内推。
当内存中的数据写入到硬盘后会执行,排序操作(快速排序)。这样当所有的数据都读取完后就会在硬盘中写入多个排好序的临时文件。
这个时候再对所有的临时文件中的数据做归并排序,将
分区1
的数据和分区2
的数据进行排序汇总。Sort ---> Cpoy
上述方法是一个
MapTask
会生成一个当前这个MapTask
的汇总数据。在进入
Reduce
之前还会将前面所有的数据读取到内存缓冲中,内存缓冲区不够就会往硬盘中存储,此时是按照不同的分区去读取,所有的相同分区被读取到一个文件中。Cpoy ---> Sort
前面的过程只是机械的将不同的分区内容放到一起,虽然在前面的文件中都是排序好的结果,但是到这里汇总后的结果却变成了无序的,需要进一步做归并排序。
Sort ---> Reduce
最后再将排序后的结果,按照相同的
Key
值为一组放入到Reduce
任务中。
前面的内容是对
Shuffer
整个过程进行分步解析,接下来举一个实际的例子,通过这个实际的例子讲解会加深大家对Shuffer
工作机制的理解。绘制成图如下所示:
源码分析
新建一个包partition
并复制前面做的项目WCCount
代码
然后在WCDriver
处配置一下,ReduceTask
的数量为2
。
1 | // 设置ReduceTask的数量 |
接下来在WCMapper
中的context.write(outKey, outValue);
处打上断点(为了防止在不必要的位置暂停程序运行,建议将之前打的所有断点取消),然后启动项目调试。
点击步入
进入方法内部。
继续点击步入
进入方法内部。
再继续点击步入
进入方法内部。
在这里我们可以看到写出了三个值,key
、value
和partitioner
。分别表示键、值和分区号。
继续步入
进入到partitioner.getPartition(key, value, partitions)
方法内部。
可以看到在这里使用key
的hashCode
值对ReduceTask
的数量取了一个模,其实就是得到了不同的分区号,这个地方就是对数据集进行分区的过程。
前面我们设置了
ReduceTask
的数量为2
,那么这里对2
取模实际上就是返回了0
和1
这两个值。
然后释放所有断点,让程序执行完。在output
文件夹中可以看到有2
个输出文件。
其实就是每一个ReduceTask
会生成一个输出文件。
这里设置了ReduceTask
的数量为2,这里就会对数据进行2个分区。默认的情况下是将所有的数据放入一个分区,这个也是可以通过源码来查看的。
将前面设置的ReduceTask
数量注释掉,重新调试代码。
此时由于partitions
为1
,所以返回的结果永远都是0
,表示将所有的数据放入一个分区。
自定义分区
前面我们将ReduceTask
数量设置为2时,生成了两个文件,两个文件中的内容不一致,这个分组的依据默认是通过对应key
的hashCode
值去对2取模得到的结果,这个结果没有什么规律性,只能够达到对原始数据分区的目的。那么我们能够自定义分区逻辑,让不同的文件中存储我们不同的内容,让分区的结果更有意义呢?
项目1:根据区号将前面的流量统计案例分成不同的文件
136
、137
、138
、139
开头的手机号分别放入4个文件中,其他的手机号放入一个文件中。
这个时候就需要重新定义一个自定义分区类,这个类需要继承Partitioner
抽象类,并实现其getPartition
抽象方法。
1 | package com.tipdm.mr.partition2; |
注意
Shuffer
是在Mapper
结束后执行的,所以Partitioner
的泛型为前面Mapper
阶段输出的参数类型。
定义好自定义分区类后,还需要再Driver
类中进行配置才可以生效。
设置
ReduceTask
数量为5
。1
2// 设置ReduceTask的数量为5
job.setNumReduceTasks(5);配置自定义分区类
1
2// 配置自定义的分区类
job.setPartitionerClass(MyPartition.class);
此时运行项目,在output
文件夹下就会生成5
个输出文件。
打开part-r-00000
,按照需求应该生成136
开头的手机号数据。
数据没问题,其他文件就不再一一展示。
项目2:WCCount项目中根据起始位置字母不同放入不同文件
- 将
a-p
放入一个文件,将p-z
放入另一个文件。将WordCount
中的代码复制到一个新的包partition3
下。
重新定义一个自定义分区类,这个类需要继承Partitioner
抽象类,并实现其getPartition
抽象方法。
如下所示:
1 | package com.tipdm.mr.partition3; |
同样需要在Driver
类中配置ReduceTask
数量和自定义的分区类。
WritableComparable排序
排序是MapReduce
框架中最重要的操作之一。
MapTask
和ReduceTask
均会对数据按照key
进行排序。该操作属于Hadoop
的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask
,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask
,它从每个MapTask
上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask
统一对内存和磁盘上的所有数据进行一次归并排序。
自定义排序WritableComparable
bean
对象做为key
传输,需要实现WritableComparable
接口重写compareTo
方法,就可以实现排序。
1 | package com.tipdm.mr.compare; |
案例1:对前面的流量分析统计结果,进行排序输出
- 对总流量按照升序排列。
文件内容如图所示。
FlowBean
将上一节定义的
FlowBean
复制过来。Mapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24package com.tipdm.mr.compare2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private FlowBean outKey = new FlowBean();
private Text outValue = new Text();
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {
String inKey = value.toString();
String[] all_string = inKey.split("\t");
System.out.println(all_string.toString());
outKey.setUpFlow(Long.parseLong(all_string[1]));
outKey.setDownFlow(Long.parseLong(all_string[2]));
outKey.setSumFlow(Long.parseLong(all_string[3]));
outValue.set(all_string[0]);
context.write(outKey, outValue);
}
}Reducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15package com.tipdm.mr.compare2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, key);
}
}
}Driver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29package com.tipdm.mr.compare2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, new Path("..\\ioText\\input3"));
FileOutputFormat.setOutputPath(job, new Path("..\\ioText\\output"));
job.waitForCompletion(true);
}
}注意:此处不能完全把前面的代码搬过来,需要修改
Mapper
阶段的输出数据类型。
结果展示:
此时已经可以按照总流量进行升序排列。
案例2:基于前一个需求,增加自定义分区类,分区按照省份手机号设置
在前面案例的基础上增加一个自定义分区类
MyPartition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23package com.tipdm.mr.compare3;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartition extends Partitioner<FlowBean, Text> {
public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
String string = text.toString();
if (string.startsWith("136")){
return 0;
}else if (string.startsWith("137")) {
return 1;
}else if (string.startsWith("138")) {
return 2;
}else if (string.startsWith("139")) {
return 3;
}else{
return 4;
}
}
}并在
Driver
类中配置自定义分区1
2job.setNumReduceTasks(5);
job.setPartitionerClass(MyPartition.class);
结果展示:
Combiner合并
Combiner概述
Combiner
是MR
程序中Mapper
和Reducer
之外的一种组件。Combiner
组件的父类就是Reducer
。Combiner
和Reducer
的区别在于运行的位置。Combiner
是在每一个MapTask
所在的节点运行;Reducer
是接收全局所有Mapper
的输出结果。
Combiner
的意义就是对每一个MapTask
的输出进行局部汇总,以减小网络传输量。Combiner
能够应用的前提是不能影响最终的业务逻辑,而且,Combiner
的输出kv
应该跟Reducer
的输入kv
类型要对应起来。
原理图
Combiner
作用的地点在Copy阶段
前
这里如果是将所有数据全部拷贝到ReduceTask
再进行汇总,会造成大量的网络IO
,因为ReduceTask
和MapTask
不一定是在一个机器上。
使用Combiner
后会提前在MapTask
上进行统计每个MapTask
的结果后,直接将结果传输到ReduceTask
。即改进成如下图所示方式进行网络传输。
注意:
MR
默认是没有开启Combiner
的,原始因为不是所有的任务都能够使用Combiner
,有的任务使用Combiner
会导致结果出问题。例如:求均值任务。
此时即使仍然能降低网络
IO
,但是由于结果计算错误,不能使用Combiner
。
案例演示
直接将前面的WCCount
案例代码复制过来,然后将input
中的文件内容手动复制多一点。我这里是手动复制了74万行的数据,接下来再次去执行MR
项目。
首先直接执行,不修改项目内容。
Map
阶段输出的物理化字节数为18335873
。
接下来设置Combiner
,再次运行项目。
由于
Combiner
的代码是与Reducer
代码一致,简单起见直接使用Reducer
的代码。
这个时候Map
阶段输出的物理化字节数减少到了181
,极大的减少了网络IO
。