0%

Hadoop(4)YARN

5. YARN

实现Hadoop集群的资源共享

YARN不仅仅支持MapReduce,还支持SparkFlink等计算引擎。

YARN主要负责集群资源的管理和调度,支持主从架构,主节点最多可以有2个,从节点可以有多个

  • ResourceManager:主节点主要负责集群资源的分配和管理。

  • NodeManager:从节点主要负责当前机器资源管理。

YARN主要管理内存CPU这两种资源类型

NodeManager启动时会向ResourceManager注册,注册信息中包含该节点可分配的CPU和内存总量。

yarn.nodemanager.resource.memory-mb:单节点可分配的物理内存总量,默认是8MB*1024,即8G

yarn.nodemanager.resource.cou-vcores:单节点可分配的虚拟CPU个数,默认是8

5.1 YARN中的调度器

YARN支持三种调度器:

  1. FIFO Scheduler:先进先出(first in,first out)调度策略
  2. Capacity Scheduler:FIFO Scheduler的多队列版本(默认调度器)
  3. Fair Scheduler:多队列,多用户共享资源

案例:YARN多资源队列配置和使用

  1. 增加online【实时任务】队列和offline【离线任务】队列
  2. 向offline队列提交任务

5.2 增加队列

编辑配置文件:etc/hadoop/capacity-scheduler.xml

1
[root@master hadoop]# vim etc/hadoop/capacity-scheduler.xml

修改/增加以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<property>
<name>yarn.scheduler.capacity.root.queues</name><!--搜索改行然后修改内容-->
<value>default,online,offline</value>
<description>队列列表,多个队列之间使用逗号分割</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>70</value>
<description>default队列70%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.online.capacity</name>
<value>10</value>
<description>online队列10%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.offline.capacity</name>
<value>20</value>
<description>offline队列20%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name><!--搜索改行然后修改内容-->
<value>70</value>
<description>Default队列可使用的资源上限.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.online.maximum-capacity</name>
<value>10</value>
<description>online队列可使用的资源上限.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.offline.maximum-capacity</name>
<value>20</value>
<description>offline队列可使用的资源上限.</description>
</property>

修改好以后再同步到另外两个节点上

1
2
[root@master hadoop]# scp -rq capacity-scheduler.xml bigdata02:/data/soft/
[root@master hadoop]# scp -rq capacity-scheduler.xml bigdata03:/data/soft/

重启集群:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[root@master hadoop-3.2.0]# sbin/stop-all.sh 
Stopping namenodes on [master]
上一次登录:五 12月 24 17:37:00 CST 2021pts/0 上
Stopping datanodes
上一次登录:五 12月 24 17:40:02 CST 2021pts/0 上
Stopping secondary namenodes [master]
上一次登录:五 12月 24 17:40:04 CST 2021pts/0 上
Stopping nodemanagers
上一次登录:五 12月 24 17:40:05 CST 2021pts/0 上
[root@master hadoop-3.2.0]# sbin/start-all.sh
Starting namenodes on [master]
上一次登录:五 12月 24 17:40:15 CST 2021pts/0 上
Starting datanodes
上一次登录:五 12月 24 17:40:19 CST 2021pts/0 上
Starting secondary namenodes [master]
上一次登录:五 12月 24 17:40:21 CST 2021pts/0 上
Starting resourcemanager
上一次登录:五 12月 24 17:40:25 CST 2021pts/0 上
Starting nodemanagers
上一次登录:五 12月 24 17:40:28 CST 2021pts/0 上
[root@master hadoop-3.2.0]# jps
6224 SecondaryNameNode
2758 JobHistoryServer
6871 Jps
6507 ResourceManager
5932 NameNode

5.3 往指定的队列中增加任务

修改WordCountJob代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package tipdm.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* 指定队列名称
*/


public class WordCountJobQueue {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1, v1>, 产生<k2, v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// 输出k1, v1的值
// System.out.println("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">");
logger.info("<k1, v1> = <" + k1.get() + "," + v1.toString() + ">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words){
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2, v2);
}
}
}

/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 针对<k2,{v2...}>的数据进行累加求和
* 并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for (LongWritable v2: v2s){
// 输出k2, v2的值
// System.out.println("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">");
logger.info("<k2, v2> = <" + k2.toString() + "," + v2.get() + ">");
sum += v2.get();
}
// 组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
// 输出k3, v3的值
// System.out.println("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">");
logger.info("<k3, v3> = <" + k3.toString() + "," + v3.get() + ">");
// 把结果写出去
context.write(k3, v3);
}
}

/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args) {
try{
//指定Job需要的配置参数
Configuration conf = new Configuration();

// 解析命令行中通过-D传递过来的参数,添加到conf中
String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

//创建一个Job
Job job = Job.getInstance(conf);

//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob类的
job.setJarByClass(WordCountJobQueue.class);

//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job, new Path(remainingArgs[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));

//指定map相关代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);

//指定reduce相关代码
job.setReducerClass(MyReducer.class);
//指定k3类型
job.setOutputKeyClass(Text.class);
//指定v3类型
job.setOutputValueClass(LongWritable.class);

//提交job
job.waitForCompletion(true);

} catch (Exception e){
e.printStackTrace();
}
}

}

打包:

1
D:\IdeaProjects\dbhadoop>mvn clean package -DskipTests

上传到集群,然后在集群中运行jar。

1
[root@master hadoop-3.2.0]# hadoop jar dbhadoop-1.0-SNAPSHOT-jar-with-dependencies.jar  tipdm.mr.WordCountJobQueue -Dmapreduce.job.queuename=offline /test/hello.txt /out11

进去8088界面查看任务运行状态:

image-20211224175556445

可见任务在offline队列执行。

-------------本文结束感谢您的阅读-------------