image-20220228204507178

Apache Hadoop YARN (Yet Another Resource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统和调度平台,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。

YARN主要由ResourceManager和NodeManager组成,ResourceManager负责资源的管理与分配,NodeManager则负责具体资源的隔离。YARN中,资源使用container进行封装。

image-20220228204538122

用户如何编写 YARN 应用并提交运行,整个开发流程是怎么样的,及源码跟踪MapReduce程序提交运行YARN集群。

YARN 应用开发流程

YARN的应用开发主要过程如下:

image-20220228204631481

用户在YARN上开发应用时,需要实现如下三个模块:

  • 模块一、Application Client:应用客户端用于将应用提交到YARN上,使应用运行在YARN上,同时,监控应用的运行状态,控制应用的运行;
  • 模块二、Application Master:AM负责整个应用的运行控制,包括向YARN注册应用、申请资源、启动容器等,应用的实际工作在容器中进行;
  • 模块三、Application Worker:应用的实际工作,并不是所有的应用都需要编写worker。NodeManager启动AM发送过来的容器,容器内部封装了该应用worker运行所需的资源和启动命令;

实现上述模块,涉及如下3个RPC协议:

  • ApplicationClientProtocol: Client-RM之间的协议,主要用于应用的提交;
  • ApplicationMasterProtocol: AM-RM之间的协议,AM通过该协议向RM注册并申请资源;
  • ContainerManagementProtocol: AM-NM之间的协议,AM通过该协议控制NM启动容器。

image-20220228204702024

上述协议的定义在hadoop-yarn-api工程中,如下图所示:

image-20220228204709525

从业务的角度看,一个应用需要分两部分进行开发,一个是接入YARN平台,实现上述3个协议,通过YARN实现对集群资源的访问和利用;另一个是业务功能的实现,这个与YARN本身没有太大关系。下面主要阐述如何将一个应用接入YARN平台。

客户端开发

客户端的主要作用是提交(部署)应用和监控应用运行两个部分,开发流程如下图所示:

image-20220228204753408

从上图可以看出,客户端的主要作用是提交(部署)应用和监控应用运行两个部分。

提交应用

提交应用涉及ApplicationClientProtocol协议中的两个方法:

  • 方法一:GetNewApplicationResponse getNewApplication(GetNewApplicationRequest request)
    • 从RM上获取全局唯一的应用ID和最大可申请的资源量(内存和虚拟CPU核数)
  • 方法二:SubmitApplicationResponse submitApplication(SubmitApplicationRequest request)
    • 在获取应用程序ID后,客户端封装应用相关的配置到ApplicationSubmissionContext中,通过submitApplication方法提交到RM上

具体步骤如下:

  • 步骤1:Client通过RPC函数ApplicationClientProtocol#getNewApplication从ResourceManager中获取唯一的Application ID。
  • 步骤2:Client通过RPC函数ApplicationClientProtocol#submitApplication【所有信息都封装在这个参数里】将ApplicationMaster提交到ResourceManager上。
  • 步骤3:RM根据ApplicationSubmissionContext上封装的内容启动AM。
  • 步骤4:客户端通过AM或RM获取应用的运行状态,并控制应用的运行过程。

监控应用运行状态

应用监控涉及ApplicationClientProtocol协议中的如下几个方法:

  • 强制杀死一个应用
    • KillApplicationResponse forceKillApplication(KillApplicationRequest request)
  • 获取应用状态,如进度等
    • GetApplicationReportResponse getApplicationReport(GetApplicationReportRequest request)
  • 获取集群度量
    • GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest request)
  • 获取符合条件的应用的状态(列表)
    • etApplicationsResponse getApplications(GetApplicationsRequest request)
  • 获取集群中各个节点的状态
    • GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
  • 获取RM中的队列信息
    • GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
  • 还包括获取用户权限列表和访问权限等方法。

客户端既可以从RM上获取应用的信息,也可以通过AM获取。通常为了减少RM的压力,使用从AM获取应用运行状态的方式。客户端与AM之间的通信使用应用内部的私有协议,与YARN无关。

AppMaster 开发

AM的主要功能是按照业务需求,从RM处申请资源,并利用这些资源完成业务逻辑。因此,AM既需要与RM通信,又需要与NM通信,涉及两个协议,分别是AM-RM协议

(ApplicationMasterProtocol)和AM-NM协议(ContainerManagementProtocol),如下图所示:

image-20220228205129138

AppMaster与ResourceManager 交互

AM-RM之间使用ApplicationMasterProtocol协议进行通信,该协议提供如下几个方法:

  • 向RM注册AM:

    • RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request)
  • 告知RM,应用已经结束

    • FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request)
  • 向RM申请/归还资源,维持心跳

    • AllocateResponse allocate(AllocateRequest request)
  • 客户端向RM提交应用后,RM会根据提交的信息,分配一定的资源来启动AM,AM启动后调用ApplicationMasterProtocol协议的registerApplicationMaster方法主动向RM注册。

    完成注册后,AM通过ApplicationMasterProtocol协议的allocate方法向RM申请运行任务的资源,获取资源后,通过ContainerManagementProtocol在NM上启动资源容器,完成任务。

    应用完成后,AM通过ApplicationMasterProtocol协议的finishApplicationMaster方法向RM汇报应用的最终状态,并注销AM。

image-20220228205241056

需要注意的是,ApplicationMasterProtocol#allocate()方法还兼顾维持AM-RM心跳的作用,因此,即便应用运行过程中有一段时间无需申请任何资源,AM都需要周期性的调用相应该方法,以避免触发RM的容错机制。具体看一下每一步所传递的信息:

1、AM向RM注册

  • AM启动后会主动调用registerApplicationMaster方法向RM注册,注册信息中包括该AM所在节点和开放的RPC服务端口,以及一个应用状态跟踪Web接口(将在RM的Web页面上显示)。
  • RM向AM返回一个对象,里面包含了应用最大可申请的单个容器容量、应用访控制列表和一个用于与客户端通信的安全令牌。

2、AM向RM申请资源

  • AM通过allocate方法向RM申请或释放资源。AM向RM发送的信息被封装在AllocateRequest里;
  • RM接受到AM的请求后,扫描其上的资源镜像,按照调度算法分配全部或部分申请的资源给AM,返回一个AllocateResponse对象;

3、AM通知RM应用已结束

  • 在应用完成后,AM通知RM应用结束的消息,同时向RM提供应用的最终状态(成功/失败等)、一些失败时的诊断信息和应用跟踪地址,RM收到通知后注销相应的AM,并将注销结果发送给AM,AM收到注销成功的消息后,退出进程。
  • AM通过调用ApplicationMasterProtocol#finishApplicationMaster方法通知RM

AppMaster与NodeManager 交互

AM通过ContainerManagementProtocol协议与NM交互,包括3个方面的功能:启动容器、查询容器状态、停止容器,分别对应协议中的三个方法:

查询容器状态、停止容器,分别对应协议中的三个方法:

  • 启动容器

    • StartContainersResponse startContainers(StartContainersRequest request)
  • 查询容器状态

    • GetContainerStatusesResponse getContainerStatuses(GetContainerStatusesRequest request)
  • 停止容器

    • StopContainersResponse stopContainers(StopContainersRequest request)

      AM-NM交互过程如图:

image-20220228205540652

  • AM在NM上启动容器
    • AM通过ContainerManagementProtocol# startContainers()方法启动一个NM上的容器,AM通过该接口向NM提供启动容器的必要配置,包括分配到的资源、安全令牌、启动容器的环境变量和命令等,这些信息都被封装在StartContainersRequest中。
    • NM收到请求后,会启动相应的容器,并返回启动成功的容器列表和失败的容器列表,同时还返回其上相应的辅助服务元数据。
  • AM查询NM上的容器运行状态
    • 在应用运行期间,AM需要实时掌握各个Container的运行状态,以便及时响应一些异常,如容器运行失败等。
    • AM通过ContainerManagementProtocol# getContainerStatuses ()方法获取各个容器的运行状态。
  • AM停止NM上的容器
    • 当一个容器运行完成后,分配给它的资源需要被回收。
    • AM通过ContainerManagementProtocol# stopContainers()方法停止NM上的容器,释放相关资源,然后通过AM-RM协议,将释放的资源上报给RM,RM完成最终的资源回收。

YARN 编程库开发应用

YARN上的应用开发分为平台接入和业务开发两个部分,其中平台接入就是实现上述三个RPC协议。直接实现上述协议的开发难度较高,需要处理很多细节和性能问题,如系统并发等。为此,YARN提供了一套应用程序编程库来简化应用的开发过程,该编程库是基于事件驱动机制的,利用了YARN内部的服务库、事件库和状态机库,分为三个部分,与上述三个协议一一对应。

YARN 基础库

在YARN基础库中分为服务库、事件库和状态机库,具体说明如下。

服务库

YARN中普遍采用基于服务的对象管理模型,将一些生命周期较长的对应服务化,YARN提供一套抽象的接口对服务进行了统一描述,该服务具有如下特点:

  • 具有标准状态,所有服务都具有4个状态,NOTINITED、INITED、STARTED、STOPPED;
  • 状态驱动,服务状态变化将触发一些动作,使其转变成另一种状态;
  • 服务嵌套,一个服务可以由其他服务组合嵌套而来;

image-20220228205748860

事件库

YARN中大量采用了基于事件驱动的并发模型,该模型由事件、异步调度器和事件处理器三个模块组成。处理请求被抽象为事件,放入异步调度器的事件队列中,调度线程从事件队列中取出事件分发给不同的事件处理器,事件处理器处理事件,产生新的事件放入事件队列,如此循环,直到处理完成(完成事件)。

image-20220228205808683

状态机库

YARN中使用{转换前状态、转换后状态、事件、回调函数}四元组来表示一个状态变换,一个或多个事件的到来,触发绑定在对象上状态转移函数,使对象的状态发生变化。状态机使得事件处理变得简单可控。

image-20220228205907879

总的来说,YARN中的服务由一个或多个含有有限状态机的事件处理系统组成,总体框架如下。

image-20220228205917939

YARN 编程库

YARN 应用客户端库

YARN的Client-RM编程库位于org.apache.hadoop.yarn.client.YarnClient(Hadoop-yarn-api项目),该库实现了通用的ApplicationClientProtocol协议,提供了重试机制。用户利用该库可以快速开发YARN应用的客户端程序,而不需要关心RPC等底层接口。

image-20220228205951983

用户开发自己的应用客户端时,只要设置好ApplicationSubmissionContext对象,调用YarnClient的相关接口,即可实现应用的提交。

AM-RM 编程库

AM-RM编程库主要简化了AM向RM申请资源过程的开发。YARN提供了两套AM-RM编程库,分别为阻塞式和非阻塞式模式。

image-20220228210010435

其中,AMRMClient是阻塞式的,实现了ApplicationMasterProtocol协议,用户调用该类的相应接口,可实现AM与RM的通信。而AMRMClientAsync是AMRMClient的非阻塞式封装,所有响应通过回调函数的形式返回给用户,用户实现自己的AM时,只需要实现AMRMClientAsync的CallbackHandler即可。

NM 编程库

NM编程库对AM和RM与NM之间的交互进行了封装,同样有阻塞式和非阻塞式两种封装(AM与NM和RM与NM的交互逻辑相似)。

image-20220228210032477

同样的,对于异步编程库NMClientAsync,用户只需要在自己的AM上实现相应的回调函数,就可以控制NM上Container的启动/停止和状态监控了。

总得来说,YARN是一个资源管理平台,并不涉及业务逻辑,具体的业务逻辑需要用户自己去实现。YARN的核心作用就是分配资源、保证资源隔离。

YARN 源码解析

为了更全面和更加有意义的分析Hadoop Yarn源码,从一个作业提交到到作业生命周期结束的角度来分析Yarn的源码,将会屏蔽MapReduce部分过程,因为现在的重点是研究Yarn的源码。

YARN 应用运行流程

首先回顾,提交MapReduce程序到YARN集群运行机制,如下图所示:

image-20220228210119615

执行流程步骤如下:

  • 第1步、 MapReduce 程序提交到客户端所在的节点,使用yarn jar命令提交运行;

    [root@node1 ~]# yarn jar \
    /export/server/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar \
    wordcount /datas/input.data /datas/output
  • 第2步、Client客户端向ResourceManager申请运行一个Application

    image-20220228210208241

  • 第3步、当RM接收到请求后,生成应用Application资源提交路径hdfs://…./.staging以及application_id,并返回客户端Client;

    image-20220228210231549

  • 第4步、Client客户端提交job运行所需资源到资源提交路径

    image-20220228210253238

如下图所示,运行MapReduce时,上传Job资源至HDFS目录

image-20220228210307550

  • 第5步、当Client客户端资源提交完毕,申请运行mrAppMaster。

    image-20220228210319185

  • 第6步、当ResourceManager接收请求以后,将用户的请求初始化成一个Task,将其放到队列中(Apache Hadoop YARN默认使用CapacityScheduler容器调度器,将Task任务放入某个队列Queue中,等待后续执行)。

    image-20220228210344068

  • 第7步、当NodeManager中有资源时,ResourceManager向其发送指令,此时NodeManager领取到Task任务,准备启动容器运行AppMaster。

  • 第8步、NodeManager创建容器Contanier,容器中包含相关资源(比如CPU、内存Memory等),在其中运行MRAppMaster。

    image-20220228210416541

  • 第9步、MRAppMaster启动以后,依据资源提交路径,下载job资源到本地(也就是MRAppMaster所运行NodeManager节点临时目录中)。

  • 第10步、MRAppMaster获取Job运行资源信息以后,计算此MapReduce任务运行所需要的MapTask任务个数,再向ResourceManager申请资源,创建MapTask容器Contanier。
  • 第11步、ResourceManager接收到MRAppMaster请求后,将这些Task任务同样放到队列Queue中,当NodeManager中有资源时,ResourceManager依然向NodeManager发送指令,领取到Task任务,创建容器Contanier。
  • 第12步、当NodeManager中容器Contanier创建完成以后,MRAppMaster将运行MapTask任务的程序脚本发送给Contanier容器,最后在容器Contanier中启动YarnChild进程运行MapTask任务,处理数据。
  • 第13步、当MapTask任务运行完成以后,MRAppMaster再向ResourceManager申请资源,在NodeManager中创建ReduceTask任务运行的容器,启动YarnChild进程运行ReduceTask任务,拉取MapTask处理数据,进行聚合操作。
  • 第14步、当MapReduce应用程序运行完成以后,向ResourceManager注销自己,释放资源,至此整个应用运行完成。

第一阶段:Client提交应用至YARN

以入门程序:WordCount 作业为例,执行程序main方法,核心代码:

image-20220228210529749

当运行程序,执行到MAIN方法中如下代码时:

image-20220228210538017

如果提交运行YARN集群,则最终调用【YARNRunner#submitJob】方法:

image-20220228210558410

向YARN提交Job时,主要流程示意图如下所示:

image-20220228210606893

第一步、JobSubmitter(Job 提交)

MapReduce程序最后执行【job.waitForCompletion】方法时,表示应用提交执行等待完成,提交应用时,方法调用链如下所示:

waitForCompletion -> submit -> submitter.submitJobInternal -> submitJob
  • Job#waitForCompletion 方法

image-20220228210703726

  • Job#submit方法

image-20220228210723583

  • JobSubmmiter#submitJobInternal方法

image-20220228210741193

第二步、createApplicationSubmissionContext(创建应用上下文)

submitClient为ClientProtocol实例对象,有2个实现子类:本地模式运行和YARN集群运行:

image-20220228210814135

  • submitClient#submitJob 调用,选择YARN集群运行

image-20220228210837286

  • YARNRunner#submitJob 方法

image-20220228210853725

  • YARNRunner#createApplicationSubmissionContext方法

image-20220228210909823

构建MR AppMaster运行环境,主要包括:

1)、设置本地资源:Job 配置文件、Job Jar包及提交运行工作目录等

image-20220228210918343

2)、设置容器启动上下文:启动AppMaster进程java命令和运行日志存储等

image-20220228210927597

3)、应用提交上下文设置:比如设置应用ID和运行队列Queueu等

image-20220228210935800

第三步、RMAppManager#submitApplication(提交应用)

在YARNRunner#submitJob方法中,应用提交上下文构建完成后,进行应用提交。

image-20220228211000950

  • ResourceMgrDelegate#submitApplication 方法

image-20220228211118921

  • YARNClientImpl#submitApplication方法

image-20220228211138031

rmClient是一个ApplicationClientProtocol类对象,这是一个RPC的接口协议,对应的实现类ApplicationClientProtocolPBClientImpl。

  • 客户端 ApplicationClientProtocolPBClientImpl#submitApplication方法

    此时ApplicationClientProtocolPBServiceImpl类属于客户端Client实现类,所在包为:org.apache.hadoop.yarn.api.impl.pb.client。

image-20220228211242110

RPC Client端proxy.submitApplication()对应的RPC Server端的方法数为: ApplicationClientProtocolPBServiceImpl#submitApplication(),是对称的关系,都实现了ApplicationClientProtocalPB接口。

  • 服务端 ApplicationClientProtocolPBServiceImpl#submitApplication 方法

    此时ApplicationClientProtocolPBServiceImpl类属于服务端Service实现类,所在包为:org.apache.hadoop.yarn.api.impl.pb.service。

image-20220228211312867

ApplicationClientProtocolPBServiceImpl类对象由ClientRMService构建,属于RM端的服务类,专门用于服务Client,包括Client的作业提交,作业查询等服务。

  • lientRMService#submitApplication方法

    img

    由ClientRMService.submitApplication()直接把作业交给RMAppManager类的对象 rmAppManager进行提交,这也是作业最终上岸了,接下来就是RM的事情。

作业提交调用层次

客户端Client进行作业提交时,分为Client端和Service服务端2个层次:

  • 客户端层次流程图

image-20220228211425137

  • 服务端层次流程图

image-20220228211436768

第二阶段:YARN启动AppMaster

MapReduce作业提交已经到达ResourceManager端,并且交给RMAppManager进行继续运转,将此应用当做任务提交到队列Queue中,开始执行MRAppMaster任务,流程图如下。

image-20220228211524852

从MRAppMaster类中main查看,启动MRAppMaster进程流程步骤。

  • MRAppMaster#main 方法

image-20220228211546575

  • MRAppMaster#initAndStartAppMaster 方法

image-20220228211620856

第一步、AppMaster 初始化

ResourceManager在启动AppMaster之前,先对AppMaster服务进行初始化操作。

  • 初始化AbstractServie#init方法

image-20220228211645239

  • 启动 MRAppMaster#serviceInit 方法

image-20220228211704754

当MRAppMaster初始化完成以后,开始启动MRAppMaster进程服务。

第二步、AppMaster 启动

  • 启动 MRAppMaster#start 方法

image-20220228211736929

  • 启动 MRAppMaster#serviceStart 方法

image-20220228211757937

  • MRAppMaster#startJobs方法

image-20220228211814959

从分发器Dispatcher(实例为AysncDispatcher异步分发器)中获取事件处理器EventHandler,处理器的实例对象GenericEventHandler(通用事件处理器),调用handle方法启动Job作业执行。

  • GenericEventHandler#handle 方法

image-20220228211949104

最后将Job作业存放到队列Queue中,对于Apache Hadoop YARN来说,默认使用Capacity Scheduler容量调度器的default队列中,等待调度执行。

第三阶段:调度执行应用进程

任何一个应用提交运行至YARN集群,首先为应用启动AppMaster,当启动完成以后,为每个应用启动应用进程,调度任务Task执行,其中不同应用对应的应用进程不一样。

image-20220228212010034

针对MapReduce应用提交运行YARN上来说,当MRAppMaster启动以后,计算整个Job的MapTask和ReduceTask数量,然后向ResourceManager申请资源,运行Task任务。无论运行MapTask还是ReduceTask,都是YarnChild中执行Task,运行流程图:

image-20220228212026426

查看YarnChild类中 main 方法,核心源码:

image-20220228212033192

Task类2个实现子类:MapTask和ReduceTask,查看其中run方法,如何执行任务。

image-20220228212041258

第一步、MapTask 任务执行

查看MapTask 任务中run方法,主要判断是否是MapReduce New API编写程序,如果是的话直接调用:runNewMapper方法,运行MapTask任务。

image-20220228212112281

  • MapTask#runNewMapper 方法

image-20220228212129430

  • Mapper#run 方法

image-20220228212142530

第二步、ReduceTask 任务执行

在MapReduce计算引擎中,先运行MapTask处理每个Split分片数据,当完成以后告知MRAppMaster主节点,接着通知所有ReduceTask到MapTask输出目录拉取所属自己文件数据。

接下来,查看ReduceTask类中run方法,核心执行流程。

image-20220228212207282

  • ReduceTask#runNewReducer 方法

image-20220228212231035

  • Reducer#run 方法

image-20220228212245577

当ReduceTask运行完成后,将数据输出到外部存储引擎(比如HDFS文件系统),告知MRAppMaster。MRAppMaster等到所有ReduceTask任务运行完成后,向ResourceManager发送信息,要求ResourceManager注销自己,释放资源,以便其他应用运行使用,至此一个MapReduce应用程序运行YARN集群完成。