Yarn相关笔记

简介

概述

Yarn是在MRv1基础上演化而来,MRv1有以下局限性

  • 扩展性差:JobTracker同时兼备了资源管理和作业控制,严重制约了hadoop集群扩展性
  • 可靠性低:MRv1采用Master/Slave结构,其中Master存在单点故障
  • 资源利用率低:MRv1基于槽位粗粒度资源分配模型
  • 无法支持多种计算框架: MapReduce属于磁盘的离线计算框架以及不能满足应用要求,包括内存计算、流式计算和迭代式计算框架等,而MRv1不能支持多种计算框架并存

所以MRv2讲资源管理功能抽象成一个独立的通用系统Yarn

  • 以MapReduce为核心,Yarn是可插拔替换的,比如选择Mesos替换Yarn
  • 以Yarn为核心,Mapreduce和Spark实现了Yarn资源管理系统的接口,即可打造多计算框架的Hadoop生态

MRv1

MapReduce 1.0计算框架由三部分组成, 编程模型,数据处理引擎和运行时环境

编程模型: Map和Reduce两个阶段

  • Map阶段将输入数据解析成Key/Value,迭代调用map()函数处理后,再以Key/Value形式输出到本地目录
  • Reduce阶段将Key相同的value进行规约处理,并将最终结果写到Hdfs

MRv2

MRv2具有与MRv1相同的编程模型和数据处理引擎, 唯一不同的运行时环境

MRv2的运行时环境不再是由JobTracker和TaskTracker等服务组成的,而运行于资源管理框架的Yarn计算框架的MapReduce

计算平台

  • 离线处理MapReduce
  • 在线处理Storm
  • 迭代式计算框架Spark
  • 流失计算框架S4


基本组成

从 YARN 的架构图来看,它主要由ResourceManager、NodeManager、ApplicationMaster和Container等以下几个组件构成。

ResourceManager(RM)

YARN 分层结构的本质是 ResourceManager。这个实体控制整个集群并管理应用程序向基础计算资源的分配。ResourceManager 将各个资源部分(计算、内存、带宽等)精心安排给基础 NodeManager(YARN 的每节点代理)。ResourceManager 还与 ApplicationMaster 一起分配资源,与 NodeManager 一起启动和监视它们的基础应用程序。在此上下文中,ApplicationMaster 承担了以前的 TaskTracker 的一些角色,ResourceManager 承担了 JobTracker 的角色。

总的来说,RM有以下作用

  • 1)处理客户端请求

  • 2)启动或监控ApplicationMaster

  • 3)监控NodeManager

  • 4)资源的分配与调度

ApplicationMaster(AM)

ApplicationMaster 管理在YARN内运行的每个应用程序实例。ApplicationMaster 负责协调来自 ResourceManager 的资源,并通过 NodeManager 监视容器的执行和资源使用(CPU、内存等的资源分配)。请注意,尽管目前的资源更加传统(CPU 核心、内存),但未来会带来基于手头任务的新资源类型(比如图形处理单元或专用处理设备)。从 YARN 角度讲,ApplicationMaster 是用户代码,因此存在潜在的安全问题。YARN 假设 ApplicationMaster 存在错误或者甚至是恶意的,因此将它们当作无特权的代码对待。

总的来说,AM有以下作用

  • 与 RM 调度器协商以获取资源(用 Container 表示);
  • 将得到的任务进一步分配给内部的任务;
  • 与 NM 通信以启动 / 停止任务;
  • 监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。

此外,一些其他的计算框架对应的 AM 正在开发中,比如 Open MPI、Spark 等

NodeManager(NM)

NodeManager管理YARN集群中的每个节点。NodeManager 提供针对集群中每个节点的服务,从监督对一个容器的终生管理到监视资源和跟踪节点健康。MRv1 通过插槽管理 Map 和 Reduce 任务的执行,而 NodeManager 管理抽象容器,这些容器代表着可供一个特定应用程序使用的针对每个节点的资源。

总的来说,NM有以下作用

  • 1)管理单个节点上的资源

  • 2)处理来自ResourceManager的命令

  • 3)处理来自ApplicationMaster的命令

Container

Container 是 YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。

总的来说,Container有以下作用

  • 1)对任务运行环境进行抽象,封装CPU、内存等多维度的资源以及环境变量、启动命令等任务运行相关的信息

要使用一个 YARN 集群,首先需要一个包含应用程序的客户的请求。ResourceManager 协商一个容器的必要资源,启动一个 ApplicationMaster 来表示已提交的应用程序。通过使用一个资源请求协议,ApplicationMaster 协商每个节点上供应用程序使用的资源容器。执行应用程序时,ApplicationMaster 监视容器直到完成。当应用程序完成时,ApplicationMaster 从 ResourceManager 注销其容器,执行周期就完成了。

通过上面的讲解,应该明确的一点是,旧的 Hadoop 架构受到了 JobTracker 的高度约束,JobTracker 负责整个集群的资源管理和作业调度。新的 YARN 架构打破了这种模型,允许一个新 ResourceManager 管理跨应用程序的资源使用,ApplicationMaster 负责管理作业的执行。这一更改消除了一处瓶颈,还改善了将 Hadoop 集群扩展到比以前大得多的配置的能力。此外,不同于传统的 MapReduce,YARN 允许使用MPI( Message Passing Interface) 等标准通信模式,同时执行各种不同的编程模型,包括图形处理、迭代式处理、机器学习和一般集群计算。


YARN的工作流程

YARN 的作业运行,主要由以下几个步骤组成

作业提交

client 调用job.waitForCompletion方法,向整个集群提交MapReduce作业 (第1步) 。 新的作业ID(应用ID)由资源管理器分配(第2步). 作业的client核实作业的输出, 计算输入的split, 将作业的资源(包括Jar包, 配置文件, split信息)拷贝给HDFS(第3步). 最后, 通过调用资源管理器的submitApplication()来提交作业(第4步).

作业初始化

当资源管理器收到submitApplciation()的请求时, 就将该请求发给调度器(scheduler), 调度器分配container, 然后资源管理器在该container内启动应用管理器进程, 由节点管理器监控(第5步).

MapReduce作业的应用管理器是一个主类为MRAppMaster的Java应用. 其通过创造一些bookkeeping对象来监控作业的进度, 得到任务的进度和完成报告(第6步). 然后其通过分布式文件系统得到由客户端计算好的输入split(第7步). 然后为每个输入split创建一个map任务, 根据mapreduce.job.reduces创建reduce任务对象.

任务分配

如果作业很小, 应用管理器会选择在其自己的JVM中运行任务。

如果不是小作业, 那么应用管理器向资源管理器请求container来运行所有的map和reduce任务(第8步). 这些请求是通过心跳来传输的, 包括每个map任务的数据位置, 比如存放输入split的主机名和机架(rack). 调度器利用这些信息来调度任务, 尽量将任务分配给存储数据的节点, 或者分配给和存放输入split的节点相同机架的节点.

任务运行

当一个任务由资源管理器的调度器分配给一个container后, 应用管理器通过联系节点管理器来启动container(第9步). 任务由一个主类为YarnChild的Java应用执行. 在运行任务之前首先本地化任务需要的资源, 比如作业配置, JAR文件, 以及分布式缓存的所有文件(第10步). 最后, 运行map或reduce任务(第11步).

YarnChild运行在一个专用的JVM中, 但是YARN不支持JVM重用.

进度和状态更新

YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。

作业完成

除了向应用管理器请求作业进度外, 客户端每5分钟都会通过调用waitForCompletion()来检查作业是否完成. 时间间隔可以通过mapreduce.client.completion.pollinterval来设置. 作业完成之后, 应用管理器和container会清理工作状态, OutputCommiter的作业清理方法也会被调用. 作业的信息会被作业历史服务器存储以备之后用户核查.


Yarn通信协议

  • JobClient(作业提交客户端)与 RM 之间的协议 — ApplicationClientProtocol : JobClient 通过该 RPC 协议提交应用程序、查询应用程序状态等。
  • Admin(管理员)与 RM 之间的通信协议— ResourceManagerAdministrationProtocol : Admin 通过该 RPC 协议更新系统配置文件,比如节点黑白名单、用户队列权限等。
  • AM 与 RM 之间的协议— ApplicationMasterProtocol :AM 通过该 RPC 协议向 RM 注册和撤销自己,并为各个任务申请资源。
  • AM 与 NM 之间的协议 — ContainerManagementProtocol :AM 通过该 RPC 要求 NM 启动或者停止 Container,获取各个 Container 的使用状态等信息。
  • NM 与 RM 之间的协议— ResourceTracker :NM 通过该 RPC 协议向 RM 注册,并 定时发送心跳信息汇报当前节点的资源使用情况和 Container 运行情况。



Yarn基本架构

应用设计之法

用户编写客户端和ApplicationMaster两个组件
其中客户端负责向ResourceManager提交ApplicationMaster并查询应用程序的运行状态
ApplicationMaster负责向ResourceManager神器资源(以Container形式表示),并与NodeManager通信以启动各个Container
ApplicationMaster还得监控各个人物运行状态,并在失败时为其重新申请资源




Client设计

设计出三个RPC协议

  • ApplicationClientProtocol:用于Client与ResourceManager之间
  • ApplicationMasterProtocol:用于ApplicationMaster与ResourceManager之间
  • ContainerManagementProtocol:用于ApplicationMaster与NodeManager之间

Client与RM

Client与ResourceManager通信

  • Client通过RPC函数ApplicationClientProtocol#getNewApplication从ResourceManager中获取唯一的applicationId
  • Client通过RPC函数applicationClientProtocol#submitApplication将ApplicationMaster提交到ResourceManager上

之后就通过以下RPC相互交流任务情况





ApplicationMaster设计

ApplicationMaster(AM),ResourceManager(RM),NodeManager(NM)

AM与RM通信

ApplicationMaster与ResourceManager通信流程

  • ApplicationMaster通过RPC函数ApplicationMasterProtocol#registerApplicationMaster向ResourceManager注册

  • ApplicaitonMaster通过RPC函数ApplicationMasterProtocol#allocate向ResourceManager申请资源(以Container形式标识)

  • ApplicationMaster通过RPC函数ApplicationMasterProtocol#finishApplicationMaster告诉ResourceManager应用程序执行完毕,并退出

ApplicationMaster将重复步骤2,不断为应用程序申请资源,直到资源得到满足或整个应用程序运行完成

ApplicationMaster与ResourceManager交互部分做成通用的编程库

两者交互的核心逻辑均有AMRMClientImpl和AMRMClientAsync实现

AMRMClientImpl是阻塞式实现即每个函数执行完成之后才返回

AMRMClientAsync则是基于AMRMClientImpl的非阻塞式实现,ApplicationMaster触发任何一个操作后,AMRMClientAsync讲它封装成事件放入事件队列后便返回,而事件的处理是由一个专门的线程池完成,从而达到了异步非阻塞

用户自己实现的ApplicationMaster则需要实现回调类AMRMClientAsync.CallbackHandler




AM与NM通信

ApplicationMaster与NodeManager通信流程

  • ApplicationMaster将申请到的资源二次分配给内容的任务,并通过RPC函数ContainerManagementProtocol#startContainer与对应的NodeManager通信以启动Container

  • 为了实时掌握各个Container运行状态,ApplicationMaster可通过RPC函数ContainerManagementProtocol#getContainerStatus向NodeManager询问Container运行状态,一旦发现某个Container运行失败,ApplicationMaster可尝试重新为对应的任务申请资源

  • 一旦一个Container运行完成后,ApplicationMaster可通过RPC函数ContainerManagementProtocol#stopContainer释放Container

    1
    2
    3
    4
    注意,Yarn是一个资源管理系统,不仅负责分配资源,还负责回收资源
    当一个Container运行完成后,它会主动确认Container是否将对应的资源释放了
    也就是说任何一个Container运行结束后
    ApplicationMaster必须调用RPC函数(ContainerManagerProtocol#stopContainer)释放Container

ApplicationMaster与NodeManager交互的核心逻辑均有NMClientImpl和NMClientAsync实现

其中NMClientImpl是阻塞式实现,即每个函数执行完成之后返回

NMClientAsync则是基于NMClientImpl的非阻塞式实现,ApplicationMaster触发任何一个操作后,NMClientAsync将封装成事件放入事件队列后便返回

自己实现的ApplicationMaster则需要实现回调类NMClientAsync.CallbackHandler




Yarn自带的AMs(除MRAM之外)

DistributedShell 使用案例

1
2
3
4
bin/hadoop jar \
share/hadoop/yarn/hadoop-yarn-application-distributeshell-*.jar \
org.apache.hadoop.yarn.applications.distributeshell.client
[COMMAND_OPTIONS]

Unmanaged AM

在Yarn中,一个ApplicationMaster需要占用一个Container, 该Container可能位于任意的NodeManager上

给ApplicationMaster调试带来很大麻烦,所以引入Unmanaged AM,这种AM运行在客户端

不再由ResourceManager启动和销毁,用户只需在提交应用程序设置参数,Yarn便运行将ApplicationMaster运行在客户端的一个单独进程中

1
2
3
4
bin/hadoop jar \
share/hadoop/yarn/hadoop-yarn-applications-unmanaged-am-launcher-*.jar \
org.apache.hadoop.yarn.applications.unmanagedamlauncher.unmanagedAMLauncher
[COMMAND_OPTIONS]




Resource Manager

Resource Manager内部架构

Resource Manager相关的PRC协议

Resource Manager事件交互图


ApplicationMaster管理

主要由三个服务构成,分别是ApplicationMasterLauncher,AMLiveLinessMonitor和ApplicationMasterService

共同管理应用程序的ApplicationMaster的生存周期

组件功能

  • ApplicationMasterLauncher

既是服务也是事件处理器, 处理AMLauncherEvent类型的事件,分别请求启动一个ApplicationMaster的”LAUNCH”事件和请求清理一个ApplicationMaster的”CLEANUP”事件

  • AMLiveLinessMonitor

该服务周期性遍历所有应用程序的ApplicationMaster,如果一个ApplicationMaster在一定时间内未汇报心跳信息,则认为离线

它上面所有正在运行的Container被置为运行失败

如果AM运行失败,则由RM重新为它申请资源,以便能够重新分配到另一个节点上

  • ApplicationMasterService

主要负责处理ApplicationMaster的请求,请求包括:注册、心跳、清理三种

其中注册是ApplicationMaster启动时发生的行为,包含AM所在节点,RPC端口号和trackingURL等信息

心跳是周期性行为,包含请求资源的类型描述符,待释放的Container列表等

清理是应用程序运行结束时发送的行为,ApplicationMaster向RM发送清理应用程序请求,以回收资源和清理各种内存空间

整个步骤体系

  • 用户向Yarn RM提交应用程序,RM收到提交请求后,先向资源调度器申请用以启动AM的资源,待申请资源后,再由ApplicationMasterLauncher与对应的NodeManager通信,从而启动应用程序的ApplicationMaster

  • AM启动完成后,ApplicationMasterLauncher会通过事件的形式,将刚刚启动的AM注册到AMLiveLinessMonitor,以启动心跳监控

  • AM启动后,先向ApplicationMasterService注册,并将自己所在的host、端口号等信息汇报给它

  • AM运行过程中,周期性地向ApplicationMasterService汇报心跳(“心跳”信息中包含想要申请的资源描述)

  • ApplicationMasterService每次收到AM的心跳信息后,将通知AMLiveLinessMonitor更新该应用程序最近汇报的心跳时间

  • 当应用程序运行完成后,ApplicationMaster向ApplicationMasterService发送请求,注销自己

  • ApplicationMasterService收到注销请求后,标注应用程序运行状态为完成,同时通知AMLiveLinessMonitor移除对它的心跳监控




NodeManager管理

NodeManager管理部分分三个服务构成,NMLiveLinessMonitor,NodesListManager,ResourceTrackerService

共同管理nodeManager的生存周期




Resource Manager状态机

RMContainer 状态机

资源调度器

ResourceManager中的一个插拔式服务组件,负责整个集群资源的管理和分配

MRv1仅采用了简单的FIFO调度机制分配任务

Yarn资源调度直接从MRv1基础上修改而来,提供了三种可用资源调度

  • FIFO
  • Capacity Scheduler
  • Facebook的Fair Scheduler

调度器背景

初期Hadoop仅提供了非常简单的调度机制,FIFO先来先服务,在该调度机制下,所有作业呗统一提交到一个队列中,Hadoop按照提交顺序依次来运行这些作业

现在随着用户量越来越大,不同用户提交的应用程序往往具有不同的服务质量要求(QOS)

  • 批处理,这类耗时长,如数据挖掘和机器学习
  • 交互式作业,希望及时返回结果,如SQL查询(hive)等
  • 生产性作业,有一定量的资源保障,如统计值计算、垃圾数据分析等

作为事件处理器

YARN资源管理器实际上是事件处理器,需要处理来自外部6种Scheduler-EventType类型的时间

NodeManager

附录