YARN 框架概述

YARN产生和发展简史

Hadoop演进阶段

数据、程序、运算资源(内存、cpu)三者组在一起,完成了数据的计算处理过程。在单机环境下,这些都不是太大问题。为了应对海量数据的场景,Hadoop出现并提供了分而治之的分布式处理思想。通过对Hadoop版本演进的简单回顾,可以让我们知道YARN的产生和发展简史,洞悉YARN发展进程。

很多Hadoop的早期用户使用Hadoop的方式与在众多主机上运行桌面应用程序类似。

  • 在少量几个节点上手工建立一个集群;
  • 将数据载入Hadoop分布式文件系统(HDFS);
  • 通过运行MapReduce任务来运算并获得结果;
  • 然后拆掉集群。

这种方式的一部分原因是没有在Hadoop HDFS上持久存储数据的迫切需求,另一部分原因是没有共享数据和计算结果的动机。

阶段0:Ad Hoc集群

Ad Hoc应当理解为专用、特定的意思(数仓领域中常理解为即席查询)。Ad Hoc集群时代标志着Hadoop集群的起源,集群以Ad Hoc、单用户方式建立。

后来,随着私人集群的使用和Hadoop容错性的提高,持久的HDFS集群出现,并且实现了HDFS集群的共享,把常用和感兴趣的数据集载入HDFS共享集群中。当共享HDFS成为现实,还没实现共享的计算平台就成为关切对象。

不同于HDFS,为多个组织的多个用户简单设置一个共享MapReduce集群并非易事。尤其是集群下的物理资源的共享很不理想。

阶段1:HOD集群

为了解决集群条件下的多租户问题, Yahoo发展并且部署了称为“Hadoop on Demand”的平台。Hadoop On Demand (HOD)是一个能在大规模物理集群上供应虚拟Hadoop集群的系统。在已经分配的节点上, HOD会启动MapReduce和HDFS守护进程来响应用户数据和应用的请求。

img

HOD的主要特点是用户可以使用HOD来同时分配多个MapReduce集群。

HOD的缺点包括:无法支持数据本地化、资源回收效率低、无动态扩容缩容能力,多租户共享延迟高等。

阶段2:共享计算集群

共享MapReduce计算集群和与之协同工作的共享HDFS是Hadoop 1.x版本里的主要架构。

img

这种共享计算架构的主要组件如下所示:

JobTracker:一个中央守护进程,负责运行集群上的所有作业。

TaskTracker:系统里的从进程,根据JobTracker的指令来执行任务。

共享计算集群的主要弊端有JobTracker可扩展性瓶颈(JobTracker在内存中保存用户作业的数据)、JobTracker身兼多职(作业数据管理、作业状态记录、作业调度、)、可靠性和可用性欠缺(JobTracker单点故障)、计算模型的单一(不是所有问题都能MapReduce)。

并且MapReduce框架本身也经历了很多变化。但是MapReduce被绑定到了集群的管理层,证明MapReduce的变化演变是比较困难的。

阶段4:YARN集群

针对共享计算集群,JobTracker需要彻底地重写,才能解决扩展性的主要问题。但是,这种重写即使成功了,也不一定能解决平台和用户代码的耦合问题,也不能解决用户对非MapReduce编程模型的需求。如果不做重大的重新设计,集群可用性会继续被捆绑到整个系统的稳定性上。

YARN闪亮登场了,一款被设计用以解决以往架构的需求和缺陷的资源管理和调度软件。经过之前的发展,Hadoop走下了不少的弯路,甚至跳进了一些深坑。因此对于YARN的每个重大决策背后都有完整的惨痛的历史。

对YARN的需求

可扩展性:可以平滑的扩展至数万节点和并发的应用。

可维护性:保证集群软件的升级与用户应用程序完全解耦。

多租户:需要支持在同一集群中多个租户并存,同时支持多个租户间细颗粒度地共享单个节点。

位置感知:将计算移至数据所在位置。

高集群使用率:实现底层物理资源的高使用率。

安全和可审计的操作:继续以安全的、可审计的方式使用集群资源。

可靠性和可用性:具有高度可靠的用户交互、并支持高可用性

对编程模型多样化的支持:支持多样化的编程模型,需要演进为不仅仅以MapReduce为中心。

灵活的资源模型:支持各个节点的动态资源配置以及灵活的资源模型。

向后兼容:保持现有的MapReduce应用程序的向后兼容性。

YARN简介

img

Apache Hadoop YARN (Yet Another Resource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统和调度平台,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。

可以把Hadoop YARN理解为相当于一个分布式的操作系统平台,而MapReduce等计算程序则相当于运行于操作系统之上的应用程序,YARN为这些程序提供运算所需的资源(内存、cpu等)。

YARN是一个分布式的资源管理系统,用以提高分布式的集群环境下的资源利用率,这些资源包括内存、IO、网络、磁盘等。其产生的原因是为了解决原MapReduce框架的不足。最初MapReduce的committer们还可以周期性的在已有的代码上进行修改,可是随着代码的增加以及原MapReduce框架设计的不足,在原MapReduce框架上进行修改变得越来越困难,所以MapReduce的committer们决定从架构上重新设计MapReduce,使下一代的MapReduce(MRv2/Yarn)框架具有更好的扩展性、可用性、可靠性、向后兼容性和更高的资源利用率以及能支持除了MapReduce计算框架外的更多的计算框架。

YARN与MRv1区别

由于 MRv1(第一代MapReduce)在扩展性、可靠性、资源利用率和多框架等方面存在明显不足, Apache 开始尝试对 MapReduce 进行升级改造,于是诞生了更加先进的下一代 MapReduce 计算框架 MRv2。

并且在MRv2中,将资源管理任务调度模块单独抽离出来,构建成了一个独立的通用资源管理系统 YARN,而MRv2则专注于数据的计算处理了。

img

MRv1 架构

在 Hadoop 1.0 中 MapReduce框架(MRv1,第一代MapReduce框架),和HDFS一样,MapReduce也是采用Master/Slave的架构,其架构如下图所示:

img

MapReduce 包含四个组成部分,分别为Client,JobTracker,TaskTracker,Task。

  • Client:客户端,每一个Job都会在用户端通过Client类,将应用程序以及参数配置Configuration打包成Jar文件存储在HDFS,并把路径提交到JobTracker的master服务,然后由master创建每一个Task(即MapTask和ReduceTask),将它们分发到各个TaskTracker服务中去执行。
  • JobTracker:管理主节点,JobTracker负责资源监控和作业调度。JobTracker监控所有的TaskTracker与job的健康状况,一旦发现失败,就将相应的任务转移到其它节点;同时JobTracker会跟踪任务的执行进度,资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop中,任务调度器是一个可插拔的模块,用于可以根据自己的需要设计相应的调度器。
  • TaskTracker:执行从节点,TaskTracker会周期性地通过HeartBeat将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时执行JobTracker发送过来的命令 并执行相应的操作(如启动新任务,杀死任务等)。TaskTracker使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(cpu,内存等) 。一个Task获取到一个slot之后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot分为MapSlot和ReduceSlot两种,分别提供MapTask和ReduceTask使用。TaskTracker通过slot数目(可配置参数)限定Task的并发度。
  • Task:计算任务,Task分为MapTask和Reduce Task两种,均由TaskTracker启动。HDFS以固定大小的block为基本单位存储数据,而对于MapReduce而言,其处理单位是split。split是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意的是,split的多少决定了MapTask的数目,因为每一个split只会交给一个MapTask处理。

MRv1 缺陷

在 Hadoop 1.0 中, JobTracker 由资源管理(由 TaskScheduler 模块实现)和作业控制(由

JobTracker 中多个模块共同实现)两部分组成,Hadoop 对JobTracker 赋予的功能过多而造成负载过重。

img

Hadoop YARN 是在 MRv1 基础上演化而来的,它克服了 MRv1 中的各种局限性,概括为以下几个方面:

  • 扩展性差:在 MRv1 中, JobTracker 同时兼备了资源管理和作业控制两个功能,这成为系统的一个最大瓶颈,严重制约了 Hadoop 集群扩展性。
  • 可靠性差:MRv1 采用了 master/slave 结构,其中, master 存在单点故障问题,一旦它出现故障将导致整个集群不可用。
  • 资源利用率低: MRv1 采用了基于槽位的资源分配模型,槽位是一种粗粒度的资源划分单位,通常一个任务不会用完槽位对应的资源,且其他任务也无法使用这些空闲资源。此外, Hadoop 将槽位分为 Map Slot 和 Reduce Slot 两种,且不允许它们之间共享,常常会导致一种槽位资源紧张而另外一种闲置(比如一个作业刚刚提交时,只会运行 Map Task,此时 Reduce Slot 闲置)。
  • 无法支持多种计算框架:随着互联网高速发展, MapReduce 这种基于磁盘的离线计算框架已经不能满足应用要求,从而出现了一些新的计算框架,包括内存计算框架、流式计算框架和迭代式计算框架等,而 MRv1 不能支持多种计算框架并存。

YARN 架构

为了克服以上几个缺点, Apache 开始尝试对 Hadoop 进行升级改造,进而诞生了更加先进的下一代 MapReduce 计算框架 MRv2。正是由于 MRv2 将资源管理功能抽象成了一个独立的通用系统 YARN,直接导致下一代 MapReduce 的核心从单一的计算框架 MapReduce转移为通用的资源管理系统 YARN。

img

YARN 实际上是一个弹性计算平台,它的目标已经不再局限于支持MapReduce 一种计算框架,而是朝着对多种框架进行统一管理的方向发展。

YARN 与 MRv1 区别

Hadoop2.0即第二代Hadoop,由分布式存储系统HDFS、并行计算框架MapReduce和分布式资源管理系统YARN三个系统组成,其中YARN是一个资源管理系统,负责集群资源管理和调度,MapReduce则是运行在YARN上的离线处理框架,称为MRv2(MapReduce的第二版)。

img

MRv1 主要由编程模型(由新旧 API 组成)、数据处理引擎(由 MapTask 和ReduceTask 组成)和运行时环境(由一个 JobTracker 和若干个 TaskTracker 组成)三部分组成,为了保证编程模型的向后兼容性, MRv2 重用了 MRv1 中的编程模型和数据处理引擎,但运行时环境被完全重写,具体如下。

  • 编程模型与数据处理引擎:MRv2 重用了 MRv1 中的编程模型和数据处理引擎。

    • 为了能够让用户应用程序平滑迁移到 Hadoop 2.0 中, MRv2 应尽可能保证编程接口的向后兼容性,但由于 MRv2 本身进行了改进和优化,它在向后兼容性方面存在少量问题。
    • MapReduce 应用程序编程接口有两套,分别是新 API(mapredue)和旧 API(mapred) , MRv2 可做到以下兼容性 :采用 MRv1 旧 API 编写的应用程序,可直接使用之前的 JAR 包将程序运行在 MRv2 上;但采用 MRv1 新 API 编写的应用程序则不可以,需要使用 MRv2 编程库重新编译并修改不兼容的参数和返回值。
  • 运行时环境:MRv1 的运行时环境主要由两类服务组成,分别是 JobTracker 和TaskTracker。

    • JobTracker 负责资源和任务的管理与调度, TaskTracker 负责单个节点的资源管理和任务执行。 MRv1 将资源管理和应用程序管理两部分混杂在一起,使得它在扩展性、容错性和多框架支持等方面存在明显缺陷。
    • MRv2 则通过将资源管理和应用程序管理两部分剥离开,分别由 YARN 和 ApplicationMaster 负责,其中, YARN 专管资源管理和调度,而 ApplicationMaster 则负责与具体应用程序相关的任务切分、任务调度和容错等。

YARN 集群部署

Apache Hadoop YARN 一种开源的分布式资源管理和作业调度技术,它是作为Apache Hadoop 的核心组件之一,负责将系统资源(计算、存储和网络资源)分配给运行在Hadoop集群中的各种应用程序,并对运行在各集群节点上的任务进行调度。在生产环境中,通常采用分布式模式安装部署YARN集群。

集群角色

YARN集群是一个标准的Master/Slave 结构(主从结构),其中ResourceManager(RM) 为Master, NodeManager(NM) 为 Slave。常见的是一主多从集群,也可以搭建RM的HA高可用集群。

ResourceManager作为主节点,是集群所有可用资源的唯一仲裁者,通过NodeManage管理整个集群的资源,其核心职责是调度分配资源。NodeManage负责在每台具体的机器节点上管理资源。

img

集群规划

使用VMWare创建三台虚拟机,安装CentOS7.7 64位操作系统,设置主机名和IP地址,集群服务安装规划如下所示:

img

安装YARN集群时,相关软件版本说明如下所示,其中Hadoop版本为3.1.4,在CentOS7系统下编译并支持snappy压缩。

img

环境准备

在安装部署集群环境之前,首先对集群机器进行准备工作,具体如下说明。

软件目录结构

三台机器创建软件目录相关结构,如下所示:

img

  • 目录【/export/software】:软件压缩包存储
  • 目录【/export/server】:软件安装
  • 目录【/export/data】:测试数据

关闭防火墙

关闭三台机器防火墙,命令如下所示

sudo systemctl stop firewalld
sudo systemctl disable firewalld

禁用系统SELINUX

三台机器禁用SELINUX,命令如下:

sudo vim /etc/selinux/config

将【SELINUX=enforcing】修改为【SELINUX=disabled】,重启虚拟机使之生效。

2.3.4 配置时间同步

此处采用同步阿里时间服务器,在每台机器安装ntpdate,与阿里服务同步,命令如下:

sudo yum -y install ntp ntpdate 
ntpd  ntp.aliyun.com 

或则针对每台虚拟机,在VMWare中设置虚拟机与宿主机时间同步,截图如下:

img

IP与主机名映射

在三台机器上配置IP地址与主机名称映射,命令和内容如下:

vim /etc/hosts

增加内容如下:

192.168.88.100    node1.itcast.cn    node1 
192.168.88.101    node2.itcast.cn    node2 
192.168.88.102    node3.itcast.cn    node3

2.3.6 SSH 无密钥登录

配置三台机器之间SSH无密钥登录,方便后续集群服务启动,具体命令如下:

  • 生成密钥,
ssh-keygen -t rsa

然后一直回车,默认生成密钥文件。

  • 执行命令,将公钥复制到其余3台机器上

    ssh-copy-id node1.itcast.cn
    ssh-copy-id node2.itcast.cn
    ssh-copy-id node3.itcast.cn

安装JDK8

在三台机器上安装JDK8,安装之前先卸载OPEN JDK,相关命令如下:

rpm -qa|grep java
rpm -e --nodeps java-1.8.0-openjdk-xxx

上传JDK8软件包并解压至【/export/server】目录,在【/etc/profile】配置环境变量:

export JAVA_HOME=/export/server/jdk
export PATH=:$PATH:$JAVA_HOME/bin

执行如下命令生效:

source /etc/profile

HDFS 集群部署

在部署运行YARN集群之前,先配置HDFS集群,由于YARN集群其中后,运行应用程序时,默认情况下首先将应用程序相关信息(包括jar包、配置信息等等)上传至HDFS目录,同时应用运行时和完成时日志信息存储在HDFS。

step1、解压软件包

在node1.itcast.cn机器上解压配置HDFS集群,再分发软件包到其他机器。

[root@node1 ~]# cd /export/software/
[root@node1 software]# rz
[root@node1 software]# tar -zxf hadoop-3.1.4-bin-snappy-CentOS7.tar.gz -C /export/server/

解压完成以后,创建hadoop软连接,方便后续软件版本升级和管理。

[root@node1 ~]# cd /export/server/
[root@node1 server]# ln -s hadoop-3.1.4 hadoop
[root@node1 server]# ll
total 0
lrwxrwxrwx  1 root root  12 Feb 23 21:35 hadoop -> hadoop-3.1.4
drwxr-xr-x  9 root root 149 Nov  4 17:57 hadoop-3.1.4
lrwxrwxrwx. 1 root root  12 Sep  4 17:38 jdk -> jdk1.8.0_241
drwxr-xr-x. 7 root root 245 Dec 11  2019 jdk1.8.0_241

step2、配置环境变量

在Hadoop中,bin和sbin目录下的脚本、etc/hadoop下的配置文件,有很多配置项都会使用到HADOOP_*这些环境变量。如果仅仅是配置了HADOOP_HOME,这些脚本会从HADOOP_HOME下通过追加相应的目录结构来确定COMMON、HDFS和YARN的类库路径。

HADOOP_HOME:Hadoop软件的安装路径;
HADOOP_CONF_DIR:Hadoop的配置文件路径;
HADOOP_COMMON_HOME:Hadoop公共类库的路径;
HADOOP_HDFS_HOME:Hadoop HDFS的类库路径;
HADOOP_YARN_HOME:Hadoop YARN的类库路径;
HADOOP_MAPRED_HOME:Hadoop MapReduce的类库路径;

编辑【/etc/profile】文件,命令如下:

vim /etc/profile

添加如下内容:

export HADOOP_HOME=/export/server/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_YARN_HOME=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

执行如下命令生效:

source /etc/profile

备注:三台机器都要配置环境变量,便使之生效,方便后续直接使用命令。

step3、hadoop-env.sh

在Hadoop环境变量脚本配置JDK和HADOOP安装目录,命令和内容如下。

执行命令:

[root@node1 ~]# vim /export/server/hadoop/etc/hadoop/hadoop-env.sh

修改内容如下:

export JAVA_HOME=/export/server/jdk
export HADOOP_HOME=/export/server/hadoop

由于HADOOP 3开始,执行start-dfs.sh脚本启动HDFS服务时,默认使用非root普通用户,如果使用root用户启动,需要在【hadoop-env.sh】添加如下变量

export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root 
export YARN_RESOURCEMANAGER_USER=root 
export YARN_NODEMANAGER_USER=root

step4、core-site.xml

配置Hadoop Common模块公共属性,编辑core-site.xml文件,命令和内容如下。

执行命令:

[root@node1 ~]# vim /export/server/hadoop/etc/hadoop/core-site.xml 

增加配置内容:

<property>
	<name>fs.defaultFS</name>
	<value>hdfs://node1.itcast.cn:8020</value>
</property>
<property>
	<name>hadoop.tmp.dir</name>
	<value>/export/server/hadoop/datas/tmp</value>
</property>
<property>
	<name>hadoop.http.staticuser.user</name>
	<value>root</value>
</property>

step5、hdfs-site.xml

配置HDFS分布式文件系统相关属性,具体命令和内容如下所示:

执行命令:

[root@node1 ~]# vim /export/server/hadoop/etc/hadoop/hdfs-site.xml 

增加配置内容:

<!-- 设定SNN运行主机和端口。-->
<property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>node2.itcast.cn:9868</value>
</property>

step6、workers

配置HDFS集群中从节点DataNode所运行机器,,具体命令和内容如下所示:

执行命令:

[root@node1 ~]# vim /export/server/hadoop/etc/hadoop/workers 

增加配置内容:

node1.itcast.cn
node2.itcast.cn
node3.itcast.cn

step7、分发配置

将node1.itcast.cn上配置好HDFS,分发到node2.itcast.cn和node3.itcast.cn,命令如下:

[root@node1 ~]# cd /export/server/ 
[root@node1 server]# scp -r hadoop-3.1.4 root@node2.itcast.cn:$PWD
[root@node1 server]# scp -r hadoop-3.1.4 root@node3.itcast.cn:$PWD

分发完成以后,在node2.itcast.cn和node3.itcast.cn上创建软连接:

[root@node2 ~]# cd /export/server/
[root@node2 server]# ln -s hadoop-3.1.4 hadoop

[root@node3 ~]# cd /export/server/
[root@node3 server]# ln -s hadoop-3.1.4 hadoop

step8、格式化HDFS

第一次启动HDFS文件之前,先格式HDFS文件系统,命令如下:

[root@node1 ~]# hdfs namenode -format

step9、启动HDFS集群

在node1.itcast.cn上启动HDFS集群服务:NameNode和DataNodes,命令如下:

[root@node1 ~]# hdfs --daemon start namenode
[root@node1 ~]# hdfs --workers --daemon start datanode

查看各个机器上服务是否启动,命令如下:

[root@node1 ~]# for i in `seq 1 3`; do echo node$i.itcast.cn; ssh node$i.itcast.cn `which jps`; done

查看HDFS WEB UI,地址为:http://node1.itcast.cn:9870/

img

至此HDFS集群搭建部署完成,接下来部署YARN集群,运行启动以后运行MapReduce程序。

YARN 集群部署

在HDFS集群基础之上配置YARN集群,目前不考虑RM HA高可用,服务规划如下:

img

step1、yarn-site.xml

配置YARN集群相关属性,编辑文件和添加如下所示:

执行命令:

[root@node1 ~]# vim /export/server/hadoop/etc/hadoop/yarn-site.xml 

增加配置内容:

<property>
		<name>yarn.resourcemanager.hostname</name>
		<value>node2.itcast.cn</value>
	</property>
          <!-- 容器虚拟内存与物理内存之间的比率。-->
<property>
                     <name>yarn.nodemanager.vmem-pmem-ratio</name>
                     <value>4</value>
</property>
	<property>
		<name>yarn.nodemanager.aux-services</name>
		<value>mapreduce_shuffle</value>
	</property>
	<property>
		<name>yarn.log-aggregation-enable</name>
		<value>true</value>
	</property>
	<property>
		<name>yarn.log.server.url</name>
		<value>http://node1.itcast.cn:19888/jobhistory/logs</value>
	</property>

上述属性文件中配置RM所在主机名称、日志聚集功能及运行MapReduce程序插件服务。

step2、mapred-site.xml

Hadoop YARN集群运行以后,运行MapReduce程序,所以需要配置MapReduce框架运行YARN上时相关属性,编辑文件和添加内容如下:

执行命令:

[root@node1 ~]# vim /export/server/hadoop/etc/hadoop/mapred-site.xml 

增加配置内容:

	<property>
		<name>mapreduce.framework.name</name>
        <value>yarn</value>
        </property>
	<property>
		<name>mapreduce.application.classpath</name>
<value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
	</property>

	<property>
		<name>mapreduce.jobhistory.address</name>
		<value>node1.itcast.cn:10020</value>
	</property>
	<property>
		<name>mapreduce.jobhistory.webapp.address</name>
		<value>node1.itcast.cn:19888</value>
	</property>

其中配置MapReduce运行环境YARN、历史服务器MRJobHistoryServer地址信息。此外,运行MapReduce在YARN上需要设置环境变量,在mapred-site.xml中添加如下内容:

<property>
	<name>yarn.app.mapreduce.am.env</name>
	<value>HADOOP_MAPRED_HOME=/export/server/hadoop</value>
</property>
<property>
	<name>mapreduce.map.env</name>
	<value>HADOOP_MAPRED_HOME=/export/server/hadoop</value>
</property>
<property>
	<name>mapreduce.reduce.env</name>
	<value>HADOOP_MAPRED_HOME=/export/server/hadoop</value>
</property>

step3、同步配置

将YARN配置文件【yarn-site.xml】和MapReduce配置文件【mapred-site.xml】同步到node2.itcast.cn和node3.itcast.cn机器Hadoop安装目录中,命令如下所示:

[root@node1 ~]# cd /export/server/hadoop/etc/hadoop
[root@node1 server]# scp -r yarn-site.xml root@node2.itcast.cn:$PWD
[root@node1 server]# scp -r mapred-site.xml root@node3.itcast.cn:$PWD

step4、启动YARN集群

在启动YARN集群之前,首先启动HDFS集群,再进行启动。在node2.itcast.cn上启动YARN服务组件,相关命令如下:

[root@node2 ~]# yarn --daemon start resourcemanager
[root@node2 ~]# yarn --workers --daemon start nodemanager

查看各个机器上服务是否启动,命令如下:

[root@node2 ~]# for i in `seq 1 3`; do echo node$i.itcast.cn; ssh node$i.itcast.cn `which jps`; done

查看YARN WEB UI界面,地址:http://node2.itcast.cn:8088/

img

step5、启动MR历史服务

启动MapReduce历史服务:MRJobHistoryServer,当MapReduce运行在YARN上完成以后,可以从历史服务查看MR运行状况,在node1.itcast.cn上启动,命令如下:

[root@node1 ~]# mapred --daemon start historyserver
[root@node1 ~]# jps
3091 NameNode
3909 JobHistoryServer
3735 NodeManager
3932 Jps
3245 DataNode

step6、运行词频统计

将官方提供词频统计WordCount程序运行在YARN集群上,首先准备数据,再提交运行。

  • 准备数据,命令如下:
[root@node1 ~]# vim input.data

内容如下:

hadoop hive hive spark flink
hadoop flink flink spark flink spark hdfs
hadoop flink flink spark
  • 数据文件上传HDFS:
[root@node1 ~]# hdfs dfs -mkdir -p /datas
[root@node1 ~]# hdfs dfs -put input.data /datas
  • 提交程序运行,命令如下:
[root@node1 ~]# yarn jar \
/export/server/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar \
wordcount /datas/input.data /datas/output

查看YARN WEB UI监控页面,可以看到提交运行MapReduce程序。

img

  • 查看MapReduce程序运行结果:
[root@node1 ~]#  hdfs dfs -ls /datas/output
Found 2 items
-rw-r--r--   3 root supergroup          0 2021-02-24 21:05 /datas/output/_SUCCESS
-rw-r--r--   3 root supergroup         39 2021-02-24 21:05 /datas/output/part-r-00000
[root@node1 ~]# 
[root@node1 ~]# hdfs dfs -text /datas/output/part*          
flink   6
hadoop  3
hdfs    1
hive    2
spark   4

当MapReduce程序运行在YARN上时,首先将程序jar包和相关配置信息上传到HDFS,针对上述应用存储目录如下图所示:

img

RM 重启机制

在YARN中,如果配置单点的ResourceManager或RM HA发生故障转移时,RM重启功能可使其在重新启动后仍能正常运行,且最终的RM在停机时对用户不可见。RM有两种重新启动类型:

  • 不保留工作的RM重新启动

  • 保留工作的RM重新启动

不保留工作的RM重启

在Hadoop-2.4.0版本实现,当Client提交一个application给RM时,RM会将该application的相关信息存储起来,具体存储的位置是可以在配置文件中指定的,可以存储到本地文件系统上,也可以存储到HDFS或者是Zookeeper上,此外RM也会保存application的最终状态信息(failed,killed,finished),如果是在安全环境下运行,RM还会保存相关证书文件。

当RM被关闭后,NodeManager(以下简称NM)和Client由于发现连接不上RM,会不断的向RM发送消息,以便于能及时确认RM是否已经恢复正常,当RM重新启动后,它会发送一条re-sync(重新同步)的命令给所有的NM和ApplicationMaster(以下简称AM),NM收到重新同步的命令后会杀死所有的正在运行的containers并重新向RM注册,从RM的角度来看,每台重新注册的NM跟一台新加入到集群中NM是一样的。

AM收到重新同步的命令后会自行将自己杀掉。接下来,RM会将存储的关于application的相关信息读取出来,将在RM关闭之前最终状态为正在运行中的application重新提交运行。

保留工作的RM重启

从Hadoop-2.6.0开始增强了RM重启功能,与不保留工作不同的地方在于,RM会记录下container的整个生命周期的数据,包括application运行的相关数据,资源申请状况,队列资源使用状况等数据。

当RM重启之后,会读取之前存储的关于application的运行状态的数据,同时发送re-sync的命令,与第一种方式不同的是,NM在接受到重新同步的命令后并不会杀死正在运行的containers,而是继续运行containers中的任务,同时将containers的运行状态发送给RM,之后,RM根据自己所掌握的数据重构container实例和相关的application运行状态,如此一来,就实现了在RM重启之后,紧接着RM关闭时任务的执行状态继续执行。

启用RM重启机制

在YARN用户配置文件:yarn-site.xml配置启用RM重启功能,使用Zookeeper进行转态数据存储,相关操作命令和属性内容如下。

执行命令:

[root@node1 ~]# vim /export/server/hadoop/etc/hadoop/yarn-site.xml 

增加配置内容:

<property>
	<name>hadoop.zk.address</name>
	<value>node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181</value>
</property>

<property>
	<name>yarn.resourcemanager.recovery.enabled</name>
	<value>true</value>
</property>
<property>
	<name>yarn.resourcemanager.store.class</name>
	<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>

基于Zookeeper转态管理

如果启用了RM的重启机制,升级为Active状态的RM会初始化RM内部状态和恢复先前活动RM留下的状态,这依赖于RM的重启特性。而之前提交到RM托管的作业会发起新的尝试请求。用户提交的应用可以考虑进行周期性的CheckPoint来避免任务丢失。

RM的重启机制本质上是将RM内部的状态信息写入外部存储,在RM启动时会初始化状态信息的目录,当Application运行时会将相关的状态写入对应的目录下。如果RM发生故障切换,新的Active状态的RM会通过外部存储进行恢复。RM状态存储的实现是RMStateStore抽象类,YARN对RMStateStore提供了几种实例:

img

从类继承图可以看出提供5种方式存储状态,比对如下:

状态存储方式 说明
Memory MemoryRMStateStore是基于内存的状态存储实现,使用RMState对象存储RM所有的状态。
ZooKeeper ZKRMStateStore是基于ZooKeeper的状态存储实现,支持RM的HA,只有基于ZooKeeper的状态存储支持隔离机制,能避免出现裂脑情况发生,允许有多个处于Active状态的RM同时编辑状态存储。建议在YARN的HA中使用。
FileSystem FileSystemRMStateStore支持HDFS和基于本地FS的状态存储实现。不支持隔离机制。
LevelDB LeveldbRMStateStore是基于LevelDB的状态存储实现,它比基于HDFS和ZooKeeper的状态存储库更轻巧。LevelDB支持更好的原子操作,每个状态更新的I/O操作更少,文件系统上的文件总数也少得多。不支持隔离机制。
Null NullRMStateStore是一个空实现。

以ZKRMStateStore为例,活动RM的所有状态信息存储在ZooKeeper的rmstore/ZKRMStateRoot下,主要有ReservationSystemRoot、RMAppRoot、AMRMTokenSecretManagerRoot、EpochNode、RMDTSecretManagerRoot和RMVersionNode共6个ZNode。

img

这6个ZNode的结构和作用主要涵盖了RM资源预留信息、应用信息,应用的Token信息,RM版本信息,如下:

img

各种类型Znode节点说明如下:

ZNode名称 说明
ReservationSystemRoot RM的资源预留系统,对应的实现是ReservationSystem接口的子类。
RMAppRoot Application信息,对应的实现是RMApp接口的子类。
AMRMTokenSecretManagerRoot ApplicationAttempt的Token信息,RM会将每个Token保存在本地的内存中,直到应用程序运行完成为止,并保存到ZooKeeper存储以供重新启动。对应的实现是AMRMTokenSecretManager类。
EpochNode RM的保存工作重启的时间信息。每次RM重新启动时,纪元都会增加。它用于确保ContainerId的唯一性。对应的实现是Epoch抽象类。
RMDTSecretManagerRoot 一个特定于RM的委托令牌保密管理器。保密管理器负责生成和接受每个令牌的密码。
RMVersionNode RM的版本信息。

查看类【ZKRMStateStore】文档注释,可知不同ZNode节点功能

img

YARN HA 集群

ResourceManager(RM)负责管理群集中的资源和调度应用程序(如MR、Spark等)。在Hadoop 2.4之前,YARN群集中的ResourceManager存在SPOF(Single Point of Failure,单点故障)。为了解决ResourceManager的单点问题,YARN设计了一套Active/Standby模式的ResourceManager HA(High Availability,高可用)架构。在运行期间有多个ResourceManager同时存在来增加冗余进而消除这个单点故障,并且只能有一个ResourceManager处于Active状态,其他的则处于Standby状态,当Active节点无法正常工作,其余Standby状态的几点则会通过竞争选举产生新的Active节点。

高可用 HA 架构

ResourceManager的HA通过ActiveStandby体系实现,其底层通过ZooKeeper集群来存储RM的状态信息、应用程序的状态。如果Active状态的RM遇到故障,会通过切换Standby状态的RM为Active来继续为集群提供正常服务。

img

故障转移机制支持自动故障转移和手动故障转移两种方式实现。在生产环境中,自动故障转移应用更为广泛。

  • 第一种:手动故障转移

当没有启用自动故障转移时,管理员必须手动将一个RM转换为活动状态。要从一个RM到另一个RM进行故障转移,需要先把Active状态的RM转换为Standby状态的RM,然后再将Standby状态的RM转换为Active状态的RM。这些操作可用yarn rmadmin 命令来完成。

  • 第二种:自定故障转移

RM可以选择嵌入基于Zookeeper的ActiveStandbyElector(org.apache.hadoop.ha.ActiveStandbyElector类)来实现自动故障转移,以确定哪个RM应该是Active。当Active状态的RM发生故障或无响应时,另一个RM被自动选为Active,然后接管服务。YARN的故障转移不需要像HDFS那样运行单独的ZKFC守护程序,因为ActiveStandbyElector是一个嵌入在RM中充当故障检测器和Leader选举的线程,而不是单独的ZKFC守护进程。

当有多个RM时,Clients和NMs通过读取yarn-site.xml配置找到所有ResourceManager。Clients、AM和NM会轮训所有的ResourceManager并进行连接,直到找着Active状态的RM。如果Active状态的RM也出现故障,它们就会继续查找,直到找着新的Active状态的RM。

故障转移原理

YARN这个Active/Standby模式的RM HA架构在运行期间,会有多个RM同时存在,但只能有一个RM处于Active状态,其他的RM则处于Standby状态,当Active节点无法正常提供服务,其余Standby状态的RM则会通过竞争选举产生新的Active节点。以基于ZooKeeper这个自动故障切换为例,切换的步骤如下:

  • 主备切换,RM使用基于ZooKeeper实现的ActiveStandbyElector组件来确定RM的状态是Active或Standby。
  • 创建锁节点,在ZooKeeper上会创建一个叫做ActiveStandbyElectorLock的锁节点,所有的RM在启动的时候,都会去竞争写这个临时的Lock节点,而ZooKeeper能保证只有一个RM创建成功。创建成功的RM就切换为Active状态,并将信息同步存入到ActiveBreadCrumb这个永久节点,那些没有成功的RM则切换为Standby状态。
  • 注册Watcher监听,所有Standby状态的RM都会向/yarn-leader-election/cluster1/ActiveStandbyElectorLock节点注册一个节点变更的Watcher监听,利用临时节点的特性,能够快速感知到Active状态的RM的运行情况。
  • 准备切换,当Active状态的RM出现故障(如宕机或网络中断),其在ZooKeeper上创建的Lock节点随之被删除,这时其它各个Standby状态的RM都会受到ZooKeeper服务端的Watcher事件通知,然后开始竞争写Lock子节点,创建成功的变为Active状态,其他的则是Standby状态。
  • Fencing(隔离),在分布式环境中,机器经常出现假死的情况(常见的是GC耗时过长、网络中断或CPU负载过高)而导致无法正常对外进行及时响应。如果有一个处于Active状态的RM出现假死,其他的RM刚选举出来新的Active状态的RM,这时假死的RM又恢复正常,还认为自己是Active状态,这就是分布式系统的脑裂现象,即存在多个处于Active状态的RM,可以使用隔离机制来解决此类问题。
  • YARN的Fencing机制是借助ZooKeeper数据节点的ACL权限控制来实现不同RM之间的隔离。这个地方改进的一点是,创建的根ZNode必须携带ZooKeeper的ACL信息,目的是为了独占该节点,以防止其他RM对该ZNode进行更新。借助这个机制假死之后的RM会试图去更新ZooKeeper的相关信息,但发现没有权限去更新节点数据,就把自己切换为Standby状态。

安装 Zookeeper 集群

YARN HA高可用依赖于Zookeeper集群存储状态数据和自动故障转移,所以要配置安装部署Zookeeper集群,具体步骤如下:

step1、解压Zookeeper

上传Zookeeper软件至【/export/software】目录,解压至【/export/server】,命令如下:

[root@node1 ~]# cd /export/software/
[root@node1 software]# rz
[root@node1 software]# tar -zxf zookeeper-3.4.6.tar.gz -C /export/server/

解压完成以后,创建zookeeper软连接,方便后续软件版本升级和管理。

[root@node1 ~]# cd /export/server/
[root@node1 server]# ln -s zookeeper-3.4.6 zookeeper

step2、配置zoo.cfg

进入Zookeeper配置目录【conf】中,重命名【zoo_sample.cfg】为【 zoo.cfg】,添加配置内容,命令和内容如下所示:

执行命令:

[root@node1 ~]# cd /export/server/zookeeper/conf/
[root@node1 conf]# mv zoo_sample.cfg zoo.cfg
[root@node1 conf]# vim zoo.cfg  

增加配置内容:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/export/server/zookeeper/datas
clientPort=2181
server.1=node1.itcast.cn:2888:3888
server.2=node2.itcast.cn:2888:3888
server.3=node3.itcast.cn:2888:3888

创建数据存储目录,命令如下所示:

[root@node1 ~]# mkdir -p /export/server/zookeeper/datas

step3、创建myid文件

Zookeeper搭建集群时,需要在数据存储目录中创建【myid】文件,其中写入服务进程编号,需要与【zoo.cfg】配置相匹配对应。

  • node1.itcast.cn集群上,myid文件中值为1
[root@node1 ~]# echo "1" > /export/server/zookeeper/datas/myid
  • node2.itcast.cn集群上,myid文件中值为2
[root@node1 ~]# echo "2" > /export/server/zookeeper/datas/myid
  • node3.itcast.cn集群上,myid文件中值为3
[root@node1 ~]# echo "3" > /export/server/zookeeper/datas/myid

step4、配置zkEnv.sh

默认情况下Zookerper服务启动时,日志文件【zookeeper.out】在执行命令目录中生成,可以配置日志文件放在Zookeeper安装目录中【logs】目录下,如下操作:

执行命令:

[root@node1 ~]# vim /export/server/zookeeper/bin/zkEnv.sh

增加配置内容:

ZOO_LOG_DIR=/export/server/zookeeper/logs

创建日志目录,命令如下:

[root@node1 ~]# mkdir -p /export/server/zookeeper/logs

step5、分发配置

将node1.itcast.cn上配置Zookeeper安装软件,分发到node2.itcast.cn和node3.itcast.cn中,具体命令如下所示:

[root@node1 ~]# cd /export/server/
[root@node1 server]# scp -r zookeeper-3.4.6 root@node2.itcast.cn:$PWD
[root@node1 server]# scp -r zookeeper-3.4.6 root@node3.itcast.cn:$PWD

分发完成以后,在node2.itcast.cn和node3.itcast.cn上创建软连接:

[root@node2 ~]# cd /export/server/
[root@node2 server]# ln -s zookeeper-3.4.6 zookeeper

[root@node3 ~]# cd /export/server/
[root@node3 server]# ln -s zookeeper-3.4.6 zookeeper

step6、启动集群

由于Zookeeper官方没有提供启动Zookeeper集群命令,如果想一次性启动Zookeeper服务,需要自己编写脚本启动,执行如下命令:

for i in `seq 1 3`; do echo node$i.itcast.cn; ssh node$i.itcast.cn "source /etc/profile; /export/server/zookeeper/bin/zkServer.sh start"; done

启动完成以后,执行下面命令,查看集群状态:

for i in `seq 1 3`; do echo node$i.itcast.cn; ssh node$i.itcast.cn "source /etc/profile; /export/server/zookeeper/bin/zkServer.sh status"; done

HA 配置及测试

Zookeeper集群配置安装部署启动完成以后,可以参考官方文档配置YARN HA。

文档:http://hadoop.apache.org/docs/r3.1.4/hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html

注意:先关闭YARN集群,在node2.itcast.cn集群执行命令:stop-yarn.sh即可。

YARN HA高可用部署配置,node2.itcast.cn和node3.itcast.cn为RM服务节点。

img

step1、配置yarn-site.xml

对YARN配置文件进行修改,删除某些配置,如下步骤操作。

  • 编辑文件
[root@node1 ~]# vim /export/server/hadoop/etc/hadoop/yarn-site.xml 
  • 删除内容
<property>
		<name>yarn.resourcemanager.hostname</name>
		<value>node2.itcast.cn</value>
</property>
  • 添加内容
<property>
		<name>yarn.resourcemanager.ha.enabled</name>
		<value>true</value>
	</property>
	<property>
		<name>yarn.resourcemanager.cluster-id</name>
		<value>cluster1</value>
	</property>
	<property>
		<name>yarn.resourcemanager.ha.rm-ids</name>
		<value>rm1,rm2</value>
	</property>
	<property>
		<name>yarn.resourcemanager.hostname.rm1</name>
		<value>node2.itcast.cn</value>
	</property>
	<property>
		<name>yarn.resourcemanager.hostname.rm2</name>
		<value>node3.itcast.cn</value>
	</property>
	<property>
		<name>yarn.resourcemanager.webapp.address.rm1</name>
		<value>node2.itcast.cn:8088</value>
	</property>
	<property>
		<name>yarn.resourcemanager.webapp.address.rm2</name>
		<value>node3.itcast.cn:8088</value>
	</property>
	<property>
		<name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
		<value>true</value>
	</property>
	<property>
		<name>hadoop.zk.address</name>
		<value>node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181</value>
	</property>

	<property>
		<name>yarn.resourcemanager.recovery.enabled</name>
		<value>true</value>
	</property>
	<property>
		<name>yarn.resourcemanager.store.class</name>
	<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
	</property>
  • 同步配置文件
[root@node1 ~]# cd /export/server/hadoop/etc/hadoop
[root@node1 hadoop]# scp -r yarn-site.xml root@node2.itcast.cn:$PWD
[root@node1 hadoop]# scp -r yarn-site.xml root@node3.itcast.cn:$PWD

step2、启动HA集群

当配置YARN为HA时,使用start-yarn.sh命令就会启动所有的RM,如下所示:

[root@node2 ~]# start-yarn.sh 
Starting resourcemanagers on [ node2.itcast.cn node3.itcast.cn]
Starting nodemanagers

查看各个机器服务进程,其中node2和node3启动ResourceManager服务。

img

此外可以使用YARN提供管理命令,查看RM运行状态

[root@node3 ~]# yarn rmadmin -getAllServiceState
node2.itcast.cn:8033                               standby   
node3.itcast.cn:8033                               active   
[root@node3 ~]# yarn rmadmin -getServiceState rm1
Standby

[root@node3 ~]# yarn rmadmin -getServiceState rm2
active

使用WebUI查看RM状态:

img

img

step3、验证故障切换

  • 查看HA状态

当node3.itcast.cn节点的RM为Active状态、node2.itcast.cn节点的RM为Standby状态时,访问http://node2.itcast.cn:8088会自动跳转到http://node3.itcast.cn:8088中,表示YARN HA正确配置。

  • 自动故障切换

强制杀死node3.itcast.cn节点的RM,基于ZooKeeper的ActiveStandbyElector自动故障转移策略将node2.itcast.cn节点的RM选举为Active状态,表示故障转移配置正确。

img

  • 手动故障切换

在非自动故障切换的YARN集群下进行手动故障切换可以使用命令进行故障转移切换。手动故障切换命令yarn rmadmin -failover rm1 rm2是rm1(node2.itcast.cn)故障转移到rm2(node3.itcast.cn)。

YARN 架构组件及原理

YARN(Yet Another Resource Negotiator,另一种资源协调者) 是 Hadoop 2.0 中的资源管理系统,它的基本设计思想是将 MRv1 中的 JobTracker拆分成了两个独立的服务 :一个全局的资源管理器 ResourceManager 和每个应用程序特有的ApplicationMaster。其中 ResourceManager 负责整个系统的资源管理和分配,而 ApplicationMaster负责单个应用程序的管理。

YARN 体系架构

YARN组件及功能

YARN 总体上仍然是 Master/Slave 结构(主从结构),在整个资源管理框架中, ResourceManager 为Master, NodeManager 为 Slave, ResourceManager 负责对各个 NodeManager 上的资源进行统一管理和调度。当用户提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的ApplicationMaster,它负责向 ResourceManager 申请资源,并要求 NodeManger 启动可以占用一定资源的任务。由于不同的 ApplicationMaster 被分布到不同的节点上,因此它们之间不会相互影响。

img

上图描述了 YARN 的基本组成结构, YARN 主要由 ResourceManager、 NodeManager、ApplicationMaster(图中给出了 MapReduce 和 MPI 两种计算框架的 ApplicationMaster,分别为 MR AppMstr 和 MPI AppMstr)和 Container 等几个组件构成。

进程 描述 级别
ResourceManager 使用Scheduler(ResourceScheduler类)和ApplicationManager(RMAppManager类)分配群集资源。 守护进程
ApplicationMaster 通过指示NodeManager创建或销毁Application的Container来管理Application的生命周期。一个Application只有一个ApplicationMaster进程。 用户进程
NodeManager 通过在群集节点中创建和销毁容器来管理特定节点中的作业或工作流。 守护进程
Container Container是Yarn对计算资源的抽象,它其实是一组CPU和内存资源,所有的应用都会运行在Container中。 用户进程

ResourceManager

RM(ResourceManager) 是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager, ASM)。

  • 调度器(Scheduler):根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。
    • 需要注意的是,该调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作,比如不负责监控或者跟踪应用的执行状态等,也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务,这些均交由应用程序相关的 ApplicationMaster 完成。
    • 调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念“资源容器”(Resource Container,简称 Container)表示, Container 是一个动态资源分配单位,它将内存、 CPU、磁盘、网络等资源封装在一起,从而限定每个任务使用的资源量。
    • 此外,该调度器是一个可插拔的组件,用户可根据自己的需要设计新的调度器, YARN 提供了多种直接可用的调度器,比如 FairScheduler 和 Capacity Scheduler 等。
  • 应用程序管理器(Applications Manager):负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动 ApplicationMaster、监控 ApplicationMaster 运行状态并在失败时重新启动它等。

ApplicationMaster

用户提交的每个应用程序均包含一个 AM,主要功能包括:

  • 与 RM 调度器协商以获取资源(用 Container 表示);
  • 将得到的任务进一步分配给内部的任务;
  • 与 NM 通信以启动 / 停止任务;
  • 监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。

当前YARN自带了两个AM实现,一个是用于演示AM编写方法的例程序distributedshell,它可以申请一定数目的 Container 以并行运行一个 Shell 命令或者 Shell 脚本 ;另一个是运行 MapReduce 应用程序的 AM—MRAppMaster,此外,一些其他的计算框架对应的 AM 正在开发中,比如 Spark、Flink 等。

NodeManager

NM(NodeManager) 是每个节点上的资源和任务管理器,一方面,它会定时地向 RM 汇报本节点上的资源使用情况和各个 Container 的运行状态;另一方面,它接收并处理来自 AM 的 Container启动 / 停止等各种请求。

Container

Container 是 YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当 AM 向 RM 申请资源时, RM 为 AM 返回的资源便是用 Container表示的。 YARN 会为每个任务分配一个 Container,且该任务只能使用该 Container 中描述的资源。需要注意的是, Container 不同于 MRv1 中的 slot,它是一个动态资源划分单位,是根据应用程序的需求动态生成的。当下, YARN 仅支持 CPU 和内存两种资源,且使用了轻量级资源隔离机制 Cgroups 进行资源隔离 。

img

YARN支持各种数据处理引擎对HDFS中的数据进行批处理、交互式和流处理。在不同的场景中使用不同的框架,常见的包括MapReduce、Spark、Storm和Flink等Application。这种架构可以更好、更优雅地进行扩展。因此从一开始就内置了高可用性、安全性和多租户支持更多用户在大型集群上使用,新架构还将提高创新性,敏捷性和硬件利用率。

img

此外,YARN提供以下功能:

  • 多租户:可以使用多个开放源代码和专有数据访问引擎来批量、交互式和实时访问同一数据集。多租户数据处理可提高企业在Hadoop投资上的回报。
  • Docker容器化:可以使用Docker容器化来并行运行同一应用程序的多个版本。
  • 集群利用率:可以动态分配群集资源以提高资源利用率。
  • 多种资源类型:可以使用多种资源类型,例如CPU和内存。
  • 可扩展性:提高了数据中心的处理能力。YARN的ResourceManager仅专注于调度,并在集群扩展到管理数PB数据的数千个节点时保持同步。
  • 兼容性:Hadoop 1.x的MapReduce应用程序可在YARN上运行,而不会破坏现有流程。YARN与Hadoop的先前稳定版本保持API兼容性。

YARN 通信协议

RPC 协议是连接各个组件的“大动脉”,了解不同组件之间的 RPC 协议有助于更深入地理解学习 YARN 框架。在 YARN 中,任何两个需相互通信的组件之间仅有一个 RPC 协议,而对于任何一个 RPC 协议,通信双方有一端是 Client,另一端为 Server,且 Client 总是主动连接 Server 的,因此, YARN 实际上采用的是拉式(pull-based)通信模型。

img

如上图所示,箭头指向的组件是 RPC Server,而箭头尾部的组件是 RPC Client, YARN 主要由以下几个 RPC 协议组成 :

  • JobClient(作业提交客户端 )与 RM 之间的协议 —ApplicationClientProtocol :JobClient 通过该 RPC 协议提交应用程序、查询应用程序状态等。
  • Admin(管理员)与 RM 之间的通信协议—ResourceManagerAdministrationProtocol:Admin 通过该 RPC 协议更新系统配置文件,比如节点黑白名单、用户队列权限等。
  • AM 与 RM 之间的协议—ApplicationMasterProtocol : AM 通过该 RPC 协议向 RM注册和撤销自己,并为各个任务申请资源。
  • AM 与 NM 之 间 的 协 议 —ContainerManagementProtocol : AM 通 过 该 RPC 要 求NM 启动或者停止Container,获取各个 Container 的使用状态等信息。
  • NM 与 RM 之间的协议—ResourceTracker: NM 通过该 RPC 协议向 RM 注册,并定时发送心跳信息汇报当前节点的资源使用情况和 Container 运行情况。

为了提高 Hadoop 的向后兼容性和不同版本之间的兼容性, YARN 中的序列化框架采用了 Google 开源的 Protocol Buffers。

YARN 工作流程

运行在Hadoop YARN 上的应用程序主要分为两类 :短应用程序和长应用程序。

  • 短应用程序:指一定时间内(可能是秒级、分钟级或小时级,尽管天级别或者更长时间的也存在,但非常少)可运行完成并正常退出的应用程序,比如 MapReduce 作业、 Spark 作业等;
  • 长应用程序:指不出意外,永不终止运行的应用程序,通常是一些服务,比如 Storm Service(主要包括 Nimbus 和 Supervisor 两类服务), Flink(包括 JobManager和 TaskManager两类服务) 等,而它们本身作为一个框架提供了编程接口供用户使用。

尽管这两类应用程序作用不同,一类直接运行数据处理程序,一类用于部署服务(服务之上再运行数据处理程序),但运行在 YARN 上的流程是相同的。

当用户向 YARN 中提交一个应用程序后, YARN 将分两个阶段运行该应用程序 :第一个阶段是启动 ApplicationMaster ;第二个阶段是由 ApplicationMaster 创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成。

img

如上图所示, YARN 的工作流程分为以下几个步骤:

  • 第1步、用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster 的命令、用户程序等。
  • 第2步、ResourceManager 为该应用程序分配第一个Container,并与对应的 NodeManager通信,要求它在这个 Container 中启动应用程序的 ApplicationMaster。
  • 第3步、ApplicationMaster 首先向 ResourceManager 注册,这样用户可以直接通过ResourceManage 查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤 4~7。
  • 第4步、ApplicationMaster 采用轮询的方式通过 RPC 协议向 ResourceManager 申请和领取资源。
  • 第5步、一旦 ApplicationMaster 申请到资源后,便与对应的 NodeManager 通信,要求它启动任务。
  • 第6步、NodeManager 为任务设置好运行环境(包括环境变量、 JAR 包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
  • 第7步、各个任务通过某个 RPC 协议向 ApplicationMaster 汇报自己的状态和进度,以让 ApplicationMaster 随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过 RPC 向 ApplicationMaster 查询应用程序的当前运行状态。
  • 第8步、应用程序运行完成后,ApplicationMaster 向 ResourceManager 注销并关闭自己。

YARN 正是一个资源管理系统,它的出现弱化了计算框架之争,引入 YARN 这一层后,各种计算框架可各自发挥自己的优势,并由 YARN 进行统一管理,进而运行在一个大集群上。截至本书出版时,各种开源系统都在开发 YARN 版本,包括 MapReduce、 Spark、 Storm、 Flink等。

img