4. MapReduce 计算扑克牌中的黑桃个数
就是我们平时打牌时用的扑克牌,现在呢,有一摞牌,我想知道这摞牌中有多少张黑桃
最直接的方式是一张一张检查并且统计出有多少张是黑桃,但是这种方式的效率比较低,如果说这一摞牌只有几十张也就无所谓了,如果这一摞拍有上千张呢?你一张一张去检查还不疯了?
这个时候我们可以使用MapReduce的计算方法
第一步:把这摞牌分配给在座的所有玩家 第二步:让每个玩家查一下自己手中的牌有多少张是黑桃,然后把这个数目汇报给你 第三步:你把所有玩家告诉你的数字加起来,得到最终的结果
之前是一张一张的串行计算,现在使用mapreduce是把数据分配给多个人,并行计算,每一个人获得一 个局部聚合的临时结果,最终再统一汇总一下。
这样就可以快速得到答案了,这其实就是MapReduce的计算思想。
下面我们再通过具体的案例分析MapReduce的计算思想
4.1 分布式计算介绍 传统的计算方式为将数据拉取到本地,在本地执行计算程序。
方法创新,从移动数据修改至移动计算程序到数据存储机器
目的:节省网络IO
第一步:对每个节点上面的数据进行局部计算
第二步:对每个节点上面计算的局部结果进行最终全局汇总
4.2 MapReduce原理剖析 MapReduce
是一种分布式计算模型,由Google
提出,主要用于搜索领域,解决海量数据的计算问题。
MapReduce
由两个阶段组成:Map
和Reduce
Map
—>第一阶段 Reduce
—>第二阶段
这是一个Hadoop
集群,一共5个节点
一个主节点,四个从节点
这里面我们只列出来了HDFS
相关的进程信息
假设我们有一个512M
的文件,这个文件会产生4个block
块,假设这4个block
块正好分别存储到了集群的4个节点上,我们的计算程序会被分发到每一个数据所在的节点,然后开始执行计算,在map
阶段,针对每一个block
块对应的数据都会产生一个map
任务(这个map
任务其实就是执行这个计算程序的),在这里也就意味着会产生4个map
任务并行执行,4个map
阶段都执行完毕以后,会执行reduce
阶段,在reduce
阶段中会对这4个map
任务的输出数据进行汇总统计,得到最终的结果。
下面看一个官方的mapreduce
原理图
左下角是一个文件,文件最下面是几个block
块,说明这个文件被切分成了这几个block
块,文件上面是一些split
,注意,咱们前面说的每个block
产生一个map
任务,其实这是不严谨的,其实严谨一点来说的话应该是一个split
产生一个map
任务。
那这里的block
和split
之间有什么关系吗? 我们来分析一下block
块是文件的物理切分,在磁盘上是真实存在的。是对文件的真正切分而split
是逻辑划分,不是对文件真正的切分,默认情况下我们可以认为一个split
的大小和一个block
的大小是一样的,所以实际上是一个split
会产生一个map task
这里面的map Task
就是咱们前面说的map
任务,看后面有一个reduce Task
,reduce
会把结果数据输出到hdfs
上,有几个reduce
任务就会产生几个文件,这里有三个reduce
任务,就产生了3个文件,咱们前面分析的案例中只有一个reduce
任务做全局汇总
注意看map
的输入 输出reduce
的输入 输出
map
的输入是k1,v1
输出是k2,v2
reduce
的输入是k2,v2
输出是k3,v3
都是键值对的形式。 在这注意一下,为什么在这是1,2,3呢? 这个主要是为了区分数据,方便理解,没有其它含义,这是我们人为定义的。
4.3 案例分析 mapreduce
主要分为两大步骤map
和reduce
,map
和reduce
在代码层面对应的就是两个类,map
对应的是mapper
类,reduce
对应的是reducer
类,下面我们就来根据一个案例具体分析一下这两个步骤
假设我们有一个文件,文件里面有两行内容 第一行是hello you
第二行是hello me
我们想统计文件中每个单词出现的总次数
4.3.1 map阶段 第一步 :框架会把输入文件(夹)划分为很多InputSplit
,这里的inputsplit
就是前面我们所说的split
【对文件进行逻辑划分产生的】,默认情况下,每个HDFS
的Block
对应一个InputSplit
。再通过RecordReader
类,把每个InputSplit
解析成一个一个的<k1,v1>
。默认情况下,每一行数据,都会被解析成一个<k1,v1>
这里的k1
是指每一行的起始偏移量,v1
代表的是那一行内容
1 2 <0, hello you> <10, hello me>
注意:map
第一次执行会产生<0,hello you>
,第二次执行会产生<10,hello me>
,并不是执行一次就 获取到这两行结果了,因为每次只会读取一行数据,我在这里只是把这两行执行的最终结果都列出来了
第二步 :框架调用Mapper
类中的map(…)
函数,map
函数的输入是<k1,v1>
,输出是<k2,v2>
。一个InputSplit
对应一个map task
。程序员需要自己覆盖Mapper
类中的map
函数,实现具体的业务逻辑。
因为我们需要统计文件中每个单词出现的总次数,所以需要先把每一行内容中的单词切开,然后记录出现 次数为1,这个逻辑就需要我们在map
函数中实现了
1 2 3 4 5 6 # 针对<0, hello you> <hello, 1> <you, 1> # 针对<10, hello me> <hello, 1> <me, 1>
第三步 :框架对map
函数输出的`<k2,v2>
进行分区。不同分区中的<k2,v2>
由不同的reduce task
处理,默认只有1个分区,所以所有的数据都在一个分区,最后只会产生一个reduce task
。
经过这个步骤之后,数据没什么变化,如果有多个分区的话,需要把这些数据根据分区规则分开,在这里 默认只有1个分区。
1 2 3 4 <hello,1> <you,1> <hello,1> <me,1>
咱们在这所说的单词计数,其实就是把每个单词出现的次数进行汇总即可,需要进行全局的汇总,不需要进行分区,所以一个redeuce
任务就可以搞定,如果你的业务逻辑比较复杂,需要进行分区,那么就会产生多个reduce
任务了,那么这个时候,map
任务输出的数据到底给哪个reduce
使用?这个就需要划分一下,要不然就乱套了。
假设有两个reduce
,map
的输出到底给哪个reduce
,如何分配,这是一个问题。
这个问题,由分区来完成。
map
输出的那些数据到底给哪个reduce
使用,这个就是分区干的事了。
第四步 :框架对每个分区中的数据,都会按照k2
进行排序、分组。分组指的是相同k2
的v2
分成一个组。先按照k2
排序
1 2 3 4 <hello,1> <hello,1> <me,1> <you,1>
然后按照k2
进行分组,把相同k2
的v2
分成一个组
1 2 3 <hello,{1,1}> <me,{1}> <you,{1}>
第五步 :在map
阶段,框架可以选择执行Combiner
过程
Combiner
可以翻译为规约,规约是什么意思呢? 在刚才的例子中,咱们最终是要在reduce
端计算单词出现的总次数的,所以其实是可以在map
端提前执行reduce
的计算逻辑,先对在map
端对单词出现的次数进行局部求和操作,这样就可以减少map
端到reduce
端数据传输的大小,这就是规约的好处,当然了,并不是所有场景都可以使用规约,针对求平均值之类的操作就不能使用规约了,否则最终计算的结果就不准确了。
Combiner
一个可选步骤,默认这个步骤是不执行的。
第六步 :框架会把map task
输出的<k2,v2>
写入到linux
的磁盘文件中
1 2 3 <hello,{1,1}> <me,{1}> <you,{1}>
至此,整个map
阶段执行结束
最后注意一点:
MapReduce
程序是由map
和reduce
这两个阶段组成的,但是reduce
阶段不是必须的,也就是说有的mapreduce
任务只有map
阶段,为什么会有这种任务呢?
是这样的,咱们前面说过,其实reduce
主要是做最终聚合的,如果我们这个需求是不需要聚合操作,直接对数据做过滤处理就行了,那也就意味着数据经过map
阶段处理完就结束了,所以如果reduce
阶段不存在的话,map
的结果是可以直接保存到HDFS
中的。
注意,如果没有reduce
阶段,其实map
阶段只需要执行到第二步就可以,第二步执行完成以后,结果就可以直接输出到HDFS
了。
针对我们这个单词计数的需求是存在reduce
阶段的,所以我们继续往下面分析。
4.3.2 reduce阶段 第一步 :框架对多个map
任务的输出,按照不同的分区,通过网络copy
到不同的reduce
节点。这个过程称作shuffle
针对我们这个需求,只有一个分区,所以把数据拷贝到reduce
端之后还是老样子
1 2 3 <hello,{1,1}> <me,{1}> <you,{1}>
第二步 :框架对reduce
端接收的相同分区的<k2,v2>
数据进行合并、排序、分组。
reduce
端接收到的是多个map
的输出,对多个map
任务中相同分区的数据进行合并 排序 分组注意,之前在map
中已经做了排序 分组,这边也做这些操作 重复吗?
不重复,因为map
端是局部的操作 reduce
端是全局的操作
之前是每个map
任务内进行排序,是有序的,但是多个map
任务之间就是无序的了。
不过针对我们这个需求只有一个map
任务一个分区,所以最终的结果还是老样子
1 2 3 <hello,{1,1}> <me,{1}> <you,{1}>
第三步 :框架调用Reducer类中的reduce方法,reduce方法的输入是,输出是。一个调用一次reduce函数。程序员需要覆盖reduce函数,实现具体的业务逻辑。
那我们在这里就需要在reduce函数中实现最终的聚合计算操作了,将相同k2的{v2}累加求和,然后再转 化为k3,v3写出去,在这里最终会调用三次reduce函数
1 2 3 <hello,2> <me,1> <you,1>
第四步 :框架把reduce
的输出结果保存到HDFS
中。
至此整个reduce
阶段结束
4.4 案例开发 接下来看这个图再重新梳理一下单词计数的执行流程
单文件WordCount分析
多文件WordCount分析
4.4.1 脚本开发 代码开发的整个过程分为3个阶段:
Map
阶段代码开发
Reduce
阶段代码开发
组装Job
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 package tipdm.mr;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountJob { public static class MyMapper extends Mapper <LongWritable, Text, Text, LongWritable>{ @Override protected void map (LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { String[] words = v1.toString().split(" " ); for (String word : words){ Text k2 = new Text (word); LongWritable v2 = new LongWritable (1L ); context.write(k2, v2); } } } public static class MyReducer extends Reducer <Text, LongWritable, Text, LongWritable>{ @Override protected void reduce (Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { long sum = 0L ; for (LongWritable v2: v2s){ sum += v2.get(); } Text k3 = k2; LongWritable v3 = new LongWritable (sum); context.write(k3, v3); } } public static void main (String[] args) { try { if (args.length!=2 ){ System.exit(100 ); } Configuration conf = new Configuration (); Job job = Job.getInstance(conf); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.waitForCompletion(true ); } catch (Exception e){ e.printStackTrace(); } } }
4.4.2 jar打包 在pom
文件中添加maven
的编译打包插件。添加到</prject>下面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 2.3.2</version > <configuration > <encoding > UTF-8</encoding > <source > 1.8</source > <target > 1.8</target > <showWarnings > true</showWarnings > </configuration > </plugin > <plugin > <artifactId > maven-assembly-plugin</artifactId > <configuration > <descriptorRefs > <descriptorRef > jar-with-dependencies</descriptorRef > </descriptorRefs > <archive > <manifest > <mainClass > </mainClass > </manifest > </archive > </configuration > <executions > <execution > <id > make-assembly</id > <phase > package</phase > <goals > <goal > single</goal > </goals > </execution > </executions > </plugin > </plugins > </build >
注意了,这些添加完以后还有一个地方需要修改,需要在pom
中的hadoop-client
和log4j
依赖中增加scope
属性,值为provided
,表示只在编译的时候使用这个依赖,在执行以及打包的时候都不使用,因为hadoop-client
和log4j
依赖在集群中都是有的,所以在打jar
包的时候就不需要打进去了,如果我们使用到了集群中没有的第三方依赖包就不需要增加这个provided
属性了,不增加provided
就可以把对应的第三方依赖打进jar
包里面了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-client</artifactId > <version > 3.2.0</version > <scope > provided</scope > </dependency > <dependency > <groupId > org.slf4j</groupId > <artifactId > slf4j-api</artifactId > <version > 1.7.10</version > <scope > provided</scope > </dependency > <dependency > <groupId > org.slf4j</groupId > <artifactId > slf4j-log4j12</artifactId > <version > 1.7.10</version > <scope > provided</scope > </dependency >
打包:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 D:\IdeaProjects\db_hadoop>mvn clean package -DskipTests [INFO ] Scanning for projects... [INFO ] [INFO ] ---------------------------------------------------------------------- [INFO ] Building db_hadoop 1.0 -SNAPSHOT [INFO ] ---------------------------------------------------------------------- .............................. [INFO ] --- maven-jar-plugin :2.3 .2 :jar (default-jar ) @ db_hadoop --- [INFO ] Building jar: D:\IdeaProjects\db_hadoop\target\db_hadoop-1 .0 -SNAPSHOT . [INFO ] [INFO ] --- maven-assembly-plugin :2.2 -beta-5 :single (make-assembly ) @ db_hadoo [INFO ] Building jar: D:\IdeaProjects\db_hadoop\target\db_hadoop-1 .0 -SNAPSHOT- [INFO ] ---------------------------------------------------------------------- [INFO ] ---------------------------------------------------------------------- [INFO ] Total time: 5.410 s [INFO ] Finished at: Wed Apr 22 11 :00 :45 CST 2020 [INFO ] Final Memory: 24 M/375 M [INFO ] ----------------------------------------------------------------------
命令执行成功之后,就可以到target
目录下获取对应的jar
包了,需要使用jar-with-dependencies
结尾的 那个jar
包。
1 D:\IdeaProjects\db_hadoop\target\db_hadoop-1 .0 -SNAPSHOT-jar-with-dependencies .jar
把这个jar
包上传到集群的任意一台机器上面或者是hadoop
客户端机器上都可以,只要这台机器可以和集群进行交互即可。
注意,这个jar
包不能使用java -jar
的方式执行,需要使用集群特有的执行方式。
4.4.3 在集群中执行jar包 首先,把jar
包上传到集群
确认是否存在
1 2 3 4 5 6 7 8 9 10 11 12 13 14 [root@master hadoop-3.2.0]# ll total 196 drwxr-xr-x. 2 1001 1002 203 Jan 8 2019 bin -rw-r--r--. 1 root root 5716 Apr 22 11:00 db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar drwxr-xr-x. 3 1001 1002 20 Jan 8 2019 etc drwxr-xr-x. 2 1001 1002 106 Jan 8 2019 include drwxr-xr-x. 3 1001 1002 20 Jan 8 2019 lib drwxr-xr-x. 4 1001 1002 4096 Jan 8 2019 libexec -rw-rw-r--. 1 1001 1002 150569 Oct 19 2018 LICENSE.txt -rw-rw-r--. 1 1001 1002 22125 Oct 19 2018 NOTICE.txt -rw-rw-r--. 1 1001 1002 1361 Oct 19 2018 README.txt -rw-r--r--. 1 root root 1361 Apr 19 12:29 README.txt.bak drwxr-xr-x. 3 1001 1002 4096 Apr 18 23:13 sbin drwxr-xr-x. 4 1001 1002 31 Jan 8 2019 share
在向集群中正式提交任务jar
包之前需要先把测试数据准备好
在本地创建一个hello.txt
文件,内容是
1 2 3 [root@master hadoop-3.2.0]# vi hello.txt hello you hello me
单词中间用空格隔开,因为我们在MapReduce
代码中是使用空格进行切割单词的。
然后把hello.txt
上传到hdfs
的test
目录下
1 2 3 4 5 [root@master hadoop-3.2.0]# hdfs dfs -mkdir /test [root@master hadoop-3.2.0]# hdfs dfs -put hello.txt /test [root@master hadoop-3.2.0]# hdfs dfs -ls /test Found 1 items -rw-r--r-- 2 root supergroup 19 2020-04-22 11:16 /test/hello.txt
接下来就可以向集群提交MapReduce
任务了
具体的命令是这样的
1 hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar tipdm.mr.WordCountJob /test/hello.txt /out
hadoop
:表示使用hadoop
脚本提交任务,其实在这里使用yarn
脚本也是可以的,从hadoop2
开始支持使用yarn
,不过也兼容hadoop1
,也继续支持使用hadoop
脚本,所以在这里使用哪个都可以,具体就看你个人的喜好了,我是习惯于使用hadoop
脚本
jar
:表示执行jar
包
db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar
:指定具体的jar
包路径信息
tipdm.mr.WordCountJob
:指定要执行的mapreduce
代码的全路径
/test/hello.txt
:指定mapreduce
接收到的第一个参数,代表的是输入路径,这里的输入路径可以直接指定hello.txt
的路径,也可以直接指定它的父目录,因为它的父目录里面也没有其它无关的文件,如果指定目录的话就意味着hdfs
会读取这个目录下所有的文件,所以后期如果我们需要处理一批文件,那就可以把他们放到同一个目录里面,直接指定目录即可。
/out
:指定mapreduce
接收到的第二个参数,代表的是输出目录,这里的输出目录必须是不存在的,MapReduce
程序在执行之前会检测这个输出目录,如果存在会报错,因为它每次执行任务都需要一个新的输出目录来存储结果数据
任务提交到集群上面之后,可以在shell窗口中看到如下日志信息,最终map
执行到100%,reduce
执行到100%,说明任务执行成功了。
1 2 3 2020-04-22 15:12:59,887 INFO mapreduce.Job: map 0% reduce 0% 2020-04-22 15:13:08,050 INFO mapreduce.Job: map 100% reduce 0% 2020-04-22 15:13:16,261 INFO mapreduce.Job: map 100% reduce 100%
当然了,也可以到web
界面中查看任务执行情况。
访问 http://master:8088
那我们来查看一下任务输出的结果,
1 2 3 4 [root@master hadoop-3.2.0]# hdfs dfs -ls /out Found 2 items -rw-r--r-- 2 root supergroup 0 2020-04-22 15:13 /out/_SUCCESS -rw-r--r-- 2 root supergroup 19 2020-04-22 15:13 /out/part-r-00000
在out
输出目录中,_SUCCESS
是一个标记文件,有这个文件表示这个任务执行成功了。
part-r-00000
是具体的数据文件,如果有多个reduce
任务会产生多个这种文件,多个文件的话会按照从0
开始编号,00001
,00002
等等…
还有一点需要注意的,part
后面的r
表示这个结果文件是reduce
步骤产生的,如果一个mapreduce
只有map
阶段没有reduce
阶段,那么产生的结果文件是part-m-00000
这样的。
查看数据文件part-r-00000
1 2 3 4 [root@master hadoop-3.2.0]# hdfs dfs -cat /out/part-r-00000 hello 2 me 1 you 1
4.4.4 增加日志输出 4.4.4.1 使用System.out.println()进行日志输出 首先 ,在map
和reduce
类中增加输出语句
map
类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected void map (LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { System.out.println("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">" ); String[] words = v1.toString().split(" " ); for (String word : words){ Text k2 = new Text (word); LongWritable v2 = new LongWritable (1L ); context.write(k2, v2); } }
reduce
类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 protected void reduce (Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { long sum = 0L ; for (LongWritable v2: v2s){ System.out.println("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">" ); sum += v2.get(); } Text k3 = k2; LongWritable v3 = new LongWritable (sum); System.out.println("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">" ); context.write(k3, v3); }
然后 ,进入yarn
资源管理页面master:8088
点击History
发现无法访问
需要开启YARN
的日志聚合功能,把散落在`NodeManager
节点上的日志统一收集管理,方便查看日志。
修改yarn-site.xml
文件配置,增加yarn.log-aggregation-enable
和yarn.log.server.url
这两个参数
1 2 3 4 5 6 7 8 <property > <name > yarn.log-aggregation-enable</name > <value > true</value > </property > <property > <name > yarn.log.server.url</name > <value > http://master:19888/jobhistory/logs/</value > </property >
修改后重启集群生效。
1 2 [root@master hadoop-3.2.0]# sbin/stop-all.sh [root@master hadoop-3.2.0]# vim etc/hadoop/yarn-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. See accompanying LICENSE file. --> <configuration > <property > <name > yarn.nodemanager.aux-services</name > <value > mapreduce_shuffle</value > </property > <property > <name > yarn.nodemanager.env-whitelist</name > <value > JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value > </property > <property > <name > yarn.resourcemanager.hostname</name > <value > master</value > </property > <property > <name > yarn.log-aggregation-enable</name > <value > true</value > </property > <property > <name > yarn.log.server.url</name > <value > http://master:19888/jobhistory/logs/</value > </property > </configuration >
同步到其他两个节点:
1 2 [root@master hadoop-3.2.0]# scp -rq etc/hadoop/yarn-site.xml slave1:/data/soft/hadoop-3.2.0/etc/hadoop/ [root@master hadoop-3.2.0]# scp -rq etc/hadoop/yarn-site.xml slave2:/data/soft/hadoop-3.2.0/etc/hadoop/
重新启动hadoop集群
1 [root@master hadoop-3.2.0]# sbin/start-all.sh
启动historyserver进程,需要在集群的所有节点上都启动这个进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 [root@master hadoop-3.2.0]# bin/mapred --daemon start historyserver [root@master hadoop-3.2.0]# jps 13430 NameNode 14006 ResourceManager 14650 JobHistoryServer 13723 SecondaryNameNode 14717 Jps [root@slave1 hadoop-3.2.0]# bin/mapred --daemon start historyserver [root@slave1 hadoop-3.2.0]# jps 4722 JobHistoryServer 4789 Jps 4168 NodeManager 4045 DataNode [root@slave2 hadoop-3.2.0]# bin/mapred --daemon start historyserver [root@slave2 hadoop-3.2.0]# jps 5217 JobHistoryServer 4803 NodeManager 5284 Jps 4681 DataNode
重新提交mapreduce任务
1 [root@master hadoop-3.2.0]# hadoop jar dbhadoop-1.0-SNAPSHOT-jar-with-dependencies.jar tipdm.mr.WordCountJob /test/hello.txt /out2
再次进去yarn的8088界面,即可进入history
点击map中的Successful
再点击logs
该处即可看到我们控制台输入的内容。
4.4.4.2 使用Logger进行输出 修改map代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public static class MyMapper extends Mapper <LongWritable, Text, Text, LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); @Override protected void map (LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { logger.info("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">" ); String[] words = v1.toString().split(" " ); for (String word : words){ Text k2 = new Text (word); LongWritable v2 = new LongWritable (1L ); context.write(k2, v2); } } }
修改reduce代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public static class MyReducer extends Reducer <Text, LongWritable, Text, LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); @Override protected void reduce (Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { long sum = 0L ; for (LongWritable v2: v2s){ logger.info("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">" ); sum += v2.get(); } Text k3 = k2; LongWritable v3 = new LongWritable (sum); logger.info("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">" ); context.write(k3, v3); } }
重新打包:
1 mvn clean package -DskipTests
上传到hadoop集群重新编译
1 [root@master hadoop-3.2.0]# hadoop jar dbhadoop-1.0-SNAPSHOT-jar-with-dependencies.jar tipdm.mr.WordCountJob /test/hello.txt /out1
再次进入logs
可以看到日志信息再下方输出
4.4.4.3 使用命令查看日志 1 [root@master hadoop-3.2.0]# yarn logs -applicationId application_1639643066888_0001
可重定向到文件中,在本地查看日志信息。
4.5 停止集群中的任务 1 2 3 hadoop job -kill job_1639643066888_0002 # 或者 yarn application -kill application_1639643066888_0002
4.6 MapReduce程序扩展 咱们前面说过MapReduce任务是由map阶段和reduce阶段组成的
但是我们也说过,reduce阶段不是必须的,那也就意味着MapReduce程序可以只包含map阶段。
什么场景下会只需要map阶段呢?
当数据只需要进行普通的过滤、解析等操作,不需要进行聚合,这个时候就不需要使用reduce阶段了。
在代码层面该如何设置呢?
很简单,在组装Job的时候设置reduce的task数目为0就可以了。并且Reduce代码也不需要写了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 package tipdm.mr;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;public class WordCountJobnoReduce { public static class MyMapper extends Mapper <LongWritable, Text, Text, LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); @Override protected void map (LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { logger.info("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">" ); String[] words = v1.toString().split(" " ); for (String word : words){ Text k2 = new Text (word); LongWritable v2 = new LongWritable (1L ); context.write(k2, v2); } } } public static void main (String[] args) { try { if (args.length!=2 ){ System.exit(100 ); } Configuration conf = new Configuration (); Job job = Job.getInstance(conf); job.setJarByClass(WordCountJobnoReduce.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setNumReduceTasks(0 ); job.waitForCompletion(true ); } catch (Exception e){ e.printStackTrace(); } } }
重新打包后上传到集群运行:
1 [root@master hadoop-3.2.0]# hadoop jar dbhadoop-1.0-SNAPSHOT-jar-with-dependencies.jar tipdm.mr.WordCountJobnoReduce /test/hello.txt /out3
此时可见reduce阶段0%就结束了
1 2 3 4 5 [root@master hadoop-3.2.0]# hdfs dfs -cat /out3/part-m-00000 hello 1 you 1 hello 1 me 1
可见结果只有reduce部分的结果。
4.7 Shuffer执行过程级源码分析输入输出 4.7.1 Shuffer过程详解 咱们前面简单说过,shuffer是一个网络拷贝的过程,是指通过网络把数据从map端拷贝到reduce端的过 程,下面我们来详细分析一下这个过程。
看这张图
接下来我们来根据这张图分析一下shuffle
的一些细节信息,首先看map
阶段,最左边有一个inputsplit
,最终会产生一个map
任务,map
任务在执行的时候会把k1,v1
转化为k2,v2
,这些数据会先临时存储到一个内存缓冲区中,这个内存缓冲区的大小默认是100M
(io.sort.mb
属性),当达到内存缓冲区大小的80%
(io.sort.spill.percent
)也就是80M
的时候,会把内存中的数据溢写到本地磁盘中(mapred.local.dir
),一直到map
把所有的数据都计算完,最后会把内存缓冲区中的数据一次性全部刷新到本地磁盘文件中,在这个图里面表示产生了3个临时文件,每个临时文件中有3个分区,这是由于map阶段中对数据做了分区,所以数据在存储的时候,在每个临时文件中也划分为了3块,最后需要对这些临时文件进行合并,合并为一个大文件,因为一个map
任务最终只会产生一个文件,这个合并之后的文件也是有3个分区的,这3个分区的数据会被shuffle
线程分别拷贝到三个不同的reduce
节点,图里面只显示了一个reduce
节点,下面还有两个没有显示。不同map
任务中的相同分区的数据会在同一个reduce
节点进行合并,合并以后会执行reduce
的功能,最终产生结果数据。
在这里shuffle
其实是横跨map
端和reduce
端的,它主要是负责把map
端产生的数据通过网络拷贝到reduce
阶段进行统一聚合计算。
4.7.2 Hadoop中序列化机制 咱们前面在开发MapReduce
程序的时候使用到了LongWritable
和Text
这些数据类型,这些数据类型对应的是Java
中的Long
和String
,那MapReduce
为什么不直接使用Java
中的这些数据类型呢?那肯定是嫌弃Java
中的这些数据类型使用起来不爽,那具体不爽在什么地方呢?
这个其实就涉及到序列化这个知识点了,下面我们来分析一下,来看这张图,
我们的map阶段在读取数据的是需要从hdfs中读取的,这里面需要经过磁盘IO和网络IO,不过正常情况下map任务会执行本地计算,也就是map任务会被分发到数据所在的节点进行计算,这个时候,网络io几乎就没有了,就剩下了磁盘io,再往后面看,map阶段执行完了以后,数据会被写入到本地磁盘文件,这个时候也需要经过磁盘io,后面的shuffle拷贝数据其实也需要先经过磁盘io把数据从本地磁盘读出来再通过网络发送到reduce节点,再写入reduce节点的本地磁盘,然后reduce阶段在执行的时候会经过磁盘io读取本地文件中的数据,计算完成以后还会经过磁盘io和网络io把数据写入到hdfs中。经过我们刚才的分析,其实在这里面占得比重最高的是磁盘io,所以说影响mapreduce任务执行效率的主要原因就是磁盘io ,如果想要提高任务执行效率,就需要从这方面着手分析。当程序在向磁盘中写数据以及从磁盘中读取数据的时候会对数据进行序列化和反序列化,磁盘io这些步骤我们省略不了,但是我们可以从序列化和反序列化这一块来着手做一些优化。
首先我们分析一下序列化和反序列化,看这个图,当我们想把内存中的数据写入到文件中的时候,会对数据序列化,然后再写入,这个序列化其实就是把内存中的对象信息转成二进制的形式,方便存储到文件中,默认java中的序列化会把对象及其父类、超类的整个继承体系信息都保存下来,这样存储的信息太大了,就会导致写入文件的信息过大,这样写入是会额外消耗性能的。
反序列化也是一样,reduce端想把文件中的对象信息加载到内存中,如果文件很大,在加载的时候也会额外消耗很多性能,所以如果我们把对象存储的信息尽量精简,那么就可以提高数据写入和读取消耗的性能。
基于此,hadoop官方实现了自己的序列化和反序列化机制,没有使用java中的序列化机制,所以hadoop中的数据类型没有沿用java中的数据类型,而是自己单独设计了一些writable的实现了,例如、longwritable、text等 那我们来看一下Hadoop中提供的常用的基本数据类型的序列化类。
在这需要注意一下
Text等价于java.lang.String的Writable,针对UTF-8序列
NullWritable是单例,获取实例使用NullWritable.get()
那下面我们来总结一下hadoop自己实现的序列化有什么特点:
紧凑: 高效使用存储空间
快速: 读写数据的额外开销小
可扩展: 可透明地读取老格式的数据
互操作: 支持多语言的交互
对应的我们也对java中序列化的不足之后做了一个总结:
不精简,附加信息多,不太适合随机访问
存储空间大,递归地输出类的超类描述直到不再有超类
前面我们分析了Java中的序列化和Hadoop中的序列化,其实最主要的区别就是针对相同的数据,Java中的序列化会占用较大的存储空间,而Hadoop中的序列化可以节省很多存储空间,这样在海量数据计算的场景下,可以减少数据传输的大小,极大的提高计算效率,下面我们就来具体实战分析一下。
先创建一个Java的序列化代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package tipdm.mr;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.*;public class JavaSerialize { public static void main (String[] args) throws IOException { StudentJava studentJava = new StudentJava (); studentJava.setId(1L ); studentJava.setName("Hadoop" ); FileOutputStream fos = new FileOutputStream ("D:\\student_java.txt" ); ObjectOutputStream oos = new ObjectOutputStream (fos); oos.writeObject(studentJava); oos.close(); fos.close(); } } class StudentJava implements Serializable { private static final long serialVersionUID = 1L ; private Long id; private String name; public static long getSerialVersionUID () { return serialVersionUID; } public Long getId () { return id; } public void setId (Long id) { this .id = id; } public String getName () { return name; } public void setName (String name) { this .name = name; } }
接下来再开发Hadoop的序列化代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package tipdm.mr;import org.apache.hadoop.io.Writable;import java.io.*;public class HadoopSerialize { public static void main (String[] args) throws IOException { StudentWritable studentWritable = new StudentWritable (); studentWritable.setId(1L ); studentWritable.setName("Hadoop" ); FileOutputStream fos = new FileOutputStream ("D:\\sudent_hadoop.txt" ); ObjectOutputStream oos = new ObjectOutputStream (fos); studentWritable.write(oos); oos.close(); fos.close(); } } class StudentWritable implements Writable { private Long id; private String name; public Long getId () { return id; } public void setId (Long id) { this .id = id; } public String getName () { return name; } public void setName (String name) { this .name = name; } @Override public void write (DataOutput out) throws IOException { out.writeLong(this .id); out.writeUTF(this .name); } @Override public void readFields (DataInput in) throws IOException { this .id = in.readLong(); this .name = in.readUTF(); } }
执行这两个代码,最终会在D盘下产生两个文件,查看这两个文件的大小,最终发现Java序列化的文件大小是Hadoop序列化文件大小的10倍左右。
源码分析。。。。
4.8 MapReduce性能优化 4.8.1 小文件问题 Hadoop的HDFS和Mapreduce框架是针对大数据文件来设计的,在小文件的处理上不但效率低下,而且十分消耗内存资源。
解决该问题通常是找一个容器将小文件打包起来。
HDFS提供了两种类型的容器,SequenceFile和MapFile
SequenceFile 是Hadoop提供的一种二进制文件,这种二进制文件直接将对序列化到文件中。
一般对小文件可以使用这种文件合并,即将文件名作为key,文件内容作为value序列化到大文件中。
注意:SequeceFile需要一个合并文件的过程,文件较大,且合并后的文件将不方便查看,必须通过遍历查看每一个小文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 package tipdm.mr;import org.apache.commons.io.FileUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.FileUtil;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.Text;import java.io.File;import java.io.IOException;import java.sql.SQLOutput;public class SmallFileSeq { public static void main (String[] args) throws Exception { write("D:\\smallFile" , "/seqFile" ); read("/seqFile" ); } public static void write (String inputDir, String outputFile) throws Exception{ Configuration conf = new Configuration (); conf.set("fs.defaultFS" , "hdfs://master:9000" ); FileSystem fileSystem = FileSystem.get(conf); fileSystem.delete(new Path (outputFile), true ); SequenceFile.Writer.Option[] opts = new SequenceFile .Writer.Option[]{ SequenceFile.Writer.file(new Path (outputFile)), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class) }; SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts); File InputDirPath = new File (inputDir); if (InputDirPath.isDirectory()){ File[] files = InputDirPath.listFiles(); for (File file : files) { String content = FileUtils.readFileToString(file, "UTF-8" ); String filename = file.getName(); Text key = new Text (filename); Text value = new Text (content); writer.append(key, value); } } writer.close(); } private static void read (String inputFile) throws IOException { Configuration conf = new Configuration (); conf.set("fs.defaultFS" , "hdfs://master:9000" ); SequenceFile.Reader reader = new SequenceFile .Reader(conf, SequenceFile.Reader.file(new Path (inputFile))); Text key = new Text (); Text value = new Text (); while (reader.next(key, value)){ System.out.println("文件名:" + key.toString() + "," ); System.out.println("文件内容:" + value.toString() + "," ); } reader.close(); } }
MapFile 是排序后的SequenceFile,MapFile由两部分组成,分别是index和data
index作为小文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的偏移位置。
在MapFile被访问的时候,索引文件回被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 package tipdm.mr;import org.apache.commons.io.FileUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.MapFile;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.Text;import java.io.File;import java.io.IOException;public class SmallFileMap { public static void main (String[] args) throws Exception { write("D:\\smallFile" , "/mapFile" ); read("/mapFile" ); } private static void write (String inputDir, String outputDir) throws Exception{ Configuration conf = new Configuration (); conf.set("fs.defaultFS" , "hdfs://master:9000" ); FileSystem fileSystem = FileSystem.get(conf); fileSystem.delete(new Path (outputDir), true ); SequenceFile.Writer.Option[] opts = new SequenceFile .Writer.Option[]{ MapFile.Writer.keyClass(Text.class), MapFile.Writer.valueClass(Text.class) }; MapFile.Writer writer = new MapFile .Writer(conf, new Path (outputDir), opts); File InputDirPath = new File (inputDir); if (InputDirPath.isDirectory()){ File[] files = InputDirPath.listFiles(); for (File file : files) { String content = FileUtils.readFileToString(file, "UTF-8" ); String filename = file.getName(); Text key = new Text (filename); Text value = new Text (content); writer.append(key, value); } } writer.close(); } private static void read (String inputDir) throws IOException { Configuration conf = new Configuration (); conf.set("fs.defaultFS" , "hdfs://master:9000" ); MapFile.Reader reader = new MapFile .Reader(new Path (inputDir), conf); Text key = new Text (); Text value = new Text (); while (reader.next(key, value)){ System.out.println("文件名:" + key.toString() + "," ); System.out.println("文件内容:" + value.toString() + "," ); System.out.println("======================" ); } reader.close(); } }
案例:使用SequenceFile实现小文件存储和计算 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 package tipdm.mr;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter;import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;public class WordCountJobSeq { public static class MyMapper extends Mapper <Text, Text, Text, LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); @Override protected void map (Text k1, Text v1, Context context) throws IOException, InterruptedException { System.out.println("<k1, v1> = <" + k1.toString() + "," + v1.toString() + ">" ); String[] words = v1.toString().split(" " ); for (String word : words){ Text k2 = new Text (word); LongWritable v2 = new LongWritable (1L ); context.write(k2, v2); } } } public static class MyReducer extends Reducer <Text, LongWritable, Text, LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); @Override protected void reduce (Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { long sum = 0L ; for (LongWritable v2: v2s){ logger.info("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">" ); sum += v2.get(); } Text k3 = k2; LongWritable v3 = new LongWritable (sum); logger.info("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">" ); context.write(k3, v3); } } public static void main (String[] args) { try { if (args.length!=2 ){ System.exit(100 ); } Configuration conf = new Configuration (); Job job = Job.getInstance(conf); job.setJarByClass(WordCountJobSeq.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setInputFormatClass(SequenceFileInputFormat.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.waitForCompletion(true ); } catch (Exception e){ e.printStackTrace(); } } }
4.8.2 数据倾斜问题 MapReduce程序执行时,Reduce节点大部分执行完毕,但是有一个或者几个Reduce节点运行很慢,导致整个程序处理实际变得很长,具体表现为:Reduce阶段一直卡着不动:
解决方法:
增加Reduce任务个数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 package tipdm.mr;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;public class WordCountSkew { public static class MyMapper extends Mapper <LongWritable, Text, Text, LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); @Override protected void map (LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { logger.info("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">" ); String[] words = v1.toString().split(" " ); for (String word : words){ Text k2 = new Text (word); LongWritable v2 = new LongWritable (1L ); context.write(k2, v2); } } } public static class MyReducer extends Reducer <Text, LongWritable, Text, LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); @Override protected void reduce (Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { long sum = 0L ; for (LongWritable v2: v2s){ logger.info("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">" ); sum += v2.get(); if (sum%200 == 0 ){ Thread.sleep(1 ); } } Text k3 = k2; LongWritable v3 = new LongWritable (sum); logger.info("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">" ); context.write(k3, v3); } } public static void main (String[] args) { try { if (args.length!=3 ){ System.exit(100 ); } Configuration conf = new Configuration (); Job job = Job.getInstance(conf); job.setJarByClass(WordCountSkew.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setNumReduceTasks(Integer.parseInt(args[2 ])); job.waitForCompletion(true ); } catch (Exception e){ e.printStackTrace(); } } }
把倾斜的数据打散
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 package tipdm.mr;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.util.Random;public class WordCountSkewRandKey { public static class MyMapper extends Mapper <LongWritable, Text, Text, LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); @Override protected void map (LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { Random random = new Random (); logger.info("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">" ); String[] words = v1.toString().split(" " ); for (String word : words){ if ("5" .equals(word)){ word = "5_" + random.nextInt(10 ); } Text k2 = new Text (word); LongWritable v2 = new LongWritable (1L ); context.write(k2, v2); } } } public static class MyReducer extends Reducer <Text, LongWritable, Text, LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); @Override protected void reduce (Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { long sum = 0L ; for (LongWritable v2: v2s){ logger.info("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">" ); sum += v2.get(); if (sum%200 == 0 ){ Thread.sleep(1 ); } } Text k3 = k2; LongWritable v3 = new LongWritable (sum); logger.info("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">" ); context.write(k3, v3); } } public static void main (String[] args) { try { if (args.length!=3 ){ System.exit(100 ); } Configuration conf = new Configuration (); Job job = Job.getInstance(conf); job.setJarByClass(WordCountSkewRandKey.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setNumReduceTasks(Integer.parseInt(args[2 ])); job.waitForCompletion(true ); } catch (Exception e){ e.printStackTrace(); } } }