Hadoop-Yarn学习

Hadoop-Yarn学习

0x01 基本概念

Yarn全名Yet Another Resource Negotiator,即资源协调/管理者,在Hadoop2中引入。

1.1 Yarn是什么

Yarn,英文全名是 Yet Another Resource Negotiator,是由雅虎开发的第二代集群资源调度器。查看论文点这里。Yarn在大数据体系中的示意图如下:
Yarn
而应用层在Application层之上,如Hive等。

和第一代不同,Yarn对各个角色进行了重新抽象。Yarn把JobTracker划分为了管理集群资源的ResourceManager(以下简称RM)和管理集群上运行任务的生命周期的AppMaster(以下简称AM)。此外,还有一个负责管理上报所在节点资源、响应处理AM的任务启停请求的角色NodeManager(以下简称NM)。

  • 基本的思路
    AM向RM申请资源(Container),RM调度分配Container后,App拆分后的task在container上运行。NM监控该节点的Container,确保App使用的资源不会超过配额。

1.2 Yarn的架构

Yarn架构
如上图所示,Yarn的架构里包括以下角色:

1.2.1 Client客户端

负责向Yarn提交App作业

1.2.2 Resource Manager-资源管理器

每个Yarn集群一个RM(还可以设一个standBy节点做HA),负责整个集群的计算资源的管理和分配,RM主要由以下两部分组成

  • 调度器(Scheduler)
    调度器根据容量、队列等限制条件,将系统中的资源(container)分配给各个正在运行的App;不负责具体应用程序相关的工作,比如监控或跟踪状态(AM负责);不负责重新启动失败任务(AM负责)。调度器是一个可拔插的组件,YARN提供了多种直接可用的调度器,比如Fair Scheduler和Capacity Scheduler等,用户还可以编写符合规范的自定义调度器。
  • 应用程序管理器(ASM)
    管理提交到RM的所有App。体现到代码里主要是
    org.apache.hadoop.yarn.server.resourcemanager.RMAppManager

1.2.3 Node Manager-节点管理器

每个Yarn集群中有多个NM,每个NM有多个Container用于Container的启动和监测(每个NM上可能有多个Container)
注:根据Yarn配置的不同,Container可能是一个Unix进程或者一个Linux cgroup实例,在受限的资源范围内(如内存、CPU等)执行特定应用程序的代码。
NM主要功能如下:

  • 启动和监视节点上的计算容器(Container)
  • 以心跳的形式向RM汇报本节点上的资源使用情况和各个Container的运行状态
  • 接收并处理来自AM的Container启动/停止等各种请求

1.2.4 App Master-应用程序管理器

协调、管理App的所有task。AM和task都运行在container中, container资源由RM调度分配, 由NM管理
AM 主要功能如下:

  • 为App向RM申请所需Container资源
  • 将得到的资源进一步分配给内部的任务
  • 与NM通信以启动/停止Container容器(请求带有这些信息:资源配额、安全性令牌(如果已启用)、启动Container的命令、进程环境、必要的二进制文件/ jar 等)、获取Container状态信息
  • 监控App作业的所有task运行状态,并在task运行失败时重新为其申请资源以重跑

1.2.5 Container-资源的抽象

Container是资源的抽象,他表示资源分配单位(CPU,内存等)。Container是一个动态资源分配单位,可以是UNIX进程或Linux cgroup,具体决定于Yarn配置(),它内部封装了内存、CPU、磁盘、网络等资源,用cgroup等方法限定每个task的资源量。Container由RM调度, 由NM管理。

注意:Container不同于MRv1中的slot,它是一个动态资源划分单位,是根据应用程序的需求动态生成的。

0x02 Yarn作业原理

2.1 概述

Yarn作业运行流程

  1. Client提交一个应用到RM,申请第一个Container来运行AM
  2. RM找到合适的NM,NM在Container容器内启动AM
  3. AM将资源申请的请求放在随NM发往RM的心跳中
  4. RM通知NM启动Container来运行应用程序

注意,需要注意的是,用户应用程序间各个组件的通信必须自己实现,Yarn没有提供通信工具。

2.2 作业本地性要求

在AM为tasks向RM申请Container时,该请求包括每个map task的数据本地化信息,特别是输入split所在的节点和机架(rack)信息。调度器会使用这些信息来进行调度决策。调度器会优先将task分配到所需数据所在节点执行,这就是所谓数据本地化策略,避免了远程访问数据的各种开销。

如果不能分配,那就会根据用户设置的RelaxLocality(本地化松弛,默认为true)来将task分配到本地化节点同机架的其他节点上。

比如申请处理HDFS数据的Container时,先考虑拥有该数据block副本的节点,如果没有再考虑这些副本所在节点同机架的其他节点,如果还是没有只能申请任意节点。

2.3 申请资源方式

  • 在最开始就申请所有需要的资源
    典型的就是Spark,申请固定的vcores, 内存,启动固定数量的Exector
  • 根据变化的需求来动态申请资源
    典型的是MapReduce,最开始申请map task容器,此时还没有申请reduce task容器。
  • 失败恢复
    当有task失败时,会申请额外的容器来重跑失败的task。

2.4 MR On Yarn

2.4.1 MR作业运行流程

先说明下,每次提交Application的作业就是job,一般包括多个任务(task)。一个MapReduce job 包括多个map task和 reduce task。

我们这里引用《Hadoop权威指南》中一张十分经典的的MR Job运行在Yarn中的流程图:
MapReduce Job在Yarn内运行流程

下面说下详细步骤:

  1. 客户端进程启动job

  2. 向RM发出请求,获取一个代表此job的全局唯一appID

  3. Client 检查job的输出说明,计算输入分片数,并将job资源(包括job的jar、Configuration、分片信息等)复制到HDFS内,以便执行task时拉取。

  4. Client向ResourceManager提交job。此提交请求中包含一个ApplicationSubmissionContext,他包括了RM为该App启动AM的所有信息:

    • App ID
    • App User
    • App Name
    • App 优先级
    • ContainerLaunchContext,他包括NM启动Container所需信息,包括 containerId、Container索要的资源量、user、安全token、启动container的本地资源依赖(如binary jar等)、运行环境变量、运行命令等
    • maxAppAttempts AM尝试的最大重试次数
    • attemptFailuresValidityInterval 故障计数时间间隔
  5. 该Job的AppMaster启动,分为两个步骤:
    5a. RM首先为AppMaster在某个NM上分配、启动一个Container
    5b. NM收到RM命令, 使用分配的Container来启动AppMaster

  6. AM的主类MRAppMaster做一些Job的初始化工作,如监控作业进度等。

  7. 通过HDFS得到由客户端计算好的输入split,然后为每个输入split创建一个map task, 再根据mapreduce.job.reduces创建指定数量的reducer task.
    然后AM决定如何运行构成整个job的tasks。如果job很小, AM根据用户配置可以选择在本节点的JVM中运行该job, 这种job称作是uber job。

  8. AM为tasks向RM申请Container
    如果该job不是uber类型,那么AM机会向RM请求container来运行所有的map和reduce任务。 (注:每个任务对应一个container,且只能在该container上运行)。

    该请求包括每个map task的数据本地化信息,特别是输入split所在的节点和机架(rack)信息。调度器会使用这些信息来进行调度决策。调度器会优先将task分配到所需数据所在节点执行,这就是所谓数据本地化策略,避免了远程访问数据的各种开销。如果不能分配,那就会根据用户设置的RelaxLocality(本地化松弛,默认为true)来将task分配到本地化节点同机架的其他节点上。

    请求还包括了task的内存需求, 默认情况下map和reduce任务的内存需求都是1024MB。 可以通过mapreduce.map.memory.mbmapreduce.reduce.memory.mb配置。

    Hadoop2中分配内存的方式和Hadoop1中不一样, Hadoop1中每个tasktracker有固定数量的slot, slot是在集群配置是设置的, 每个任务运行在一个slot中, 每个slot都有最大内存限制, 这也是整个集群固定配置的。这种方式很不灵活。可能的问题如小内存需求任务占用slot导致内存利用率低或是大内存需求任务占有slot但无足够内存导致失败。
    而在YARN中, 资源划分的粒度更细。App的内存需求可以介于最小内存和最大内存之间, 但必须是最小内存的整数倍,分配更灵活。

  9. Container资源成功给一个task后,分两个步骤来启动该task:
    a. AM向NM发送启动指定container的RPC请求StartContainersRequest;
    b. NM通过收到的请求上下文中的信息带参数执行YarnChild类的main方法。

  10. 资源本地化-去HDFS拉取task所需资源
    在运行task之前,先去HDFS拉取task需要的所有文件到本地, 比如作业配置、JAR文件等所有来自HDFS的文件(具体需求在第九步提交的StartContainersRequest中),这一步称为数据资源本地化。

  11. 启动Map或者ReduceTask

    注意:YarnChild运行在一个专用的JVM中, 但是YARN默认不支持JVM重用,因此每一个任务都是运行在一个新的JVM中。但是可以用uber job配置

  12. 应用程序运行完成后,AM向RM注销并关闭自己。

2.4.2 MRJob进度和状态更新

MapReduce Job 进度和状态更新

YARN中的task将其执行进度和状态上报给AM(使用TaskUmbilicalProtocol协议), AM每隔三秒将所有task上报的状态信息合并为该Job的整体状态视图。

如果开启了verbose,Client每秒(默认值为1秒,可以通过
mapreduce.client.progressmonitor.pollinterval设置)向AM请求查询任务实时的执行情况,方便用户查看。

在YARN中, 可以通过UI看到所有的App运行情况。具体来说,RM的WebUI 展示运行中的App以及对应的AM。AM展示管理的tasks进度等细节信息。

2.4.3 MRJob的完成

在Job生命周期内,Client可从AM查询Job进度, 还会每5秒(未开启verbose时的默认值,可通过mapreduce.client.completion.pollinterval设置)检查是否完成。

Job完成之后, AM和container会清理自己的工作状态, OutputCommiter的作业清理方法也会被调用。Job的信息会被Job历史服务器存储以备之后用户核查。

2.4.4 MRJob失败处理

Yarn的失败包括task失败、AM失败、NM失败甚至是RM失败,下面详细介绍下这几种情况。

2.4.4.1 task运行失败

task程序运行时异常和JVM进程突然退出会上报到AM,此次task尝试失败。

还有一种情况是task挂起。task运行过程中会定期调用TaskUmbilicalProtocol协议中ping方法联系AM,如果程序挂起导致AM一直收不到ping(通过mapreduce.task.timeout设置),就会判定该task超时,AM将该次task尝试标记为失败。如果将上述配置设为0 ,代表task不会被标记为失败,导致其资源无法释放,可能会出大问题,不推荐。

task尝试失败后,AM尝试重新执行该任务,同时要避免在失败节点上重跑。重试次数的配置如下:
map task:mapreduce.map.maxattempts
reduce task:mapreduce.reduce.maxattempts
task尝试失败次数超过这个阈值,就会认为这个task失败,不再进行重试。

最后说下怎么判定整个job的失败。如果一个MapReduce job中超过mapreduce.map.failures.maxpercent的map task 或者mapreduce.reduce.failures.maxpercent reduce task运行失败,就判定该job失败。

2.4.4.2 AM失败

AM会定时向RM发送心跳。如果AM故障(如硬件故障或网络错误等),RM可感知到并在新的container中运行一个新的AM实例。在默认情况下,只要AM运行失败一次,然后重试一次又失败就被认为是AM失败了不再重试。可以通过mapreduce.am.max-attempts来设置该值。

Yarn有一个全局参数yarn.resourcemanager.am.max-attempts,默认值为2,可以控制所在yarn上运行的AM的重试次数,也就是说上面讲的每个任务的重试次数不能超过这个全局参数。

MapReduce的AM恢复后,可以通过job历史来恢复故障App所运行tasks的状态,使其不需要重跑。可以通过设置yarn.app.mapreduce.am.job.recovery.enable为false,关闭此功能。

Client会向AM轮询job进度报告,当AM运行失败时,客户端需要重新定位新的AM实例。在job初始化的时候,client就通过请求RM得到并缓存了AM地址,这样做的好处是Client每次向AM查询时变得更快。在AM运行失败时,Client会因为请求超时而重新向RM请求最新的AM地址,这个过程对用户完全透明。

2.4.4.3 NM失败

NM也会定期向RM发送心跳。如果RM在10分钟内(默认值,可通过yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms设置)发现未收到任何来自某个NM节点的心跳,RM会将该节点从节点池中移除并通知该节点。

在该失败的NM上运行的所有App或AM就按照上文所述机制恢复。

注意:由于map task输出结果驻留在失败NM上的原因,所以在这些节点上运行成功的map task(所属job还未完成)还是需要重新调度运行,以免造成NM节点被踢掉后reduce任务无法拉取所需数据

最后要注意的是,如果累积的App运行失败次数过多,那么所在NM节点可能会被AM放入自己管理的黑名单(不管NM是否失败过)。对于MapReduce任务,默认累积超过3个task失败就会将此NM拉黑,任务会尽量分配到其他节点。该阈值的配置是mapreduce.job.maxtaskfailures.per.tracker

注意:当前版本hadoop中,黑名单由应用程序的AM管理,对RM透明,也就是说就算老的App把某个NM节点拉黑,但是新的App的任务依然有可能分配到这些故障节点

2.4.4.4 RM失败

RM作为Yarn的大脑,如果失败后果十分严重,如果没有配置StandBy RM的话会存在单点故障风险,会导致所有运行的job全部失败且无法恢复。可以用共享存储解决这个问题,源码中该接口为RMStateStore,利用ZooKeeper(ZKRMStateStore)或HDFS(FileSystemRMStateStore)做了RM HA那么active RM挂掉的情况下会自动切换到standBy RM,Client无明显感知。

RM HA时可以把运行中的App(不包括AM管理的任务信息)信息存储在高可用的Zookeeper或HDFS中备份,以备failover时备节点恢复关键状态信息。NM的信息未保存,因为是心跳调度机制,当备RM节点恢复服务收到NM心跳时可以快速重构NM状态信息。

RM备节点接管后,会从上面的状态存储区中读取信息进行恢复,为所有App重启AM。

上述RM HA过程是由FailoverController自动处理,他在默认情况下运行在RM内,使用zookeeper的leader选举机制来确保同一时刻只有一个主RM。

2.5 UberJob

2.5.1 JVM重用

首先,简单回顾一下Hadoop 1.x中的JVM重用功能:用户可以通过更改配置,来指定TaskTracker在同一个JVM里面最多可以累积执行的Task的数量(默认是1)。这样的好处是减少JVM启动、退出的次数,从而达到提高任务执行效率的目的。 配置的方法也很简单:通过设置mapred-site.xml里面参数mapred.job.reuse.jvm.num.tasks的值。该值默认是1,意味着TaskTracker会为每一个Map任务或Reduce任务都启动一个JVM,当任务执行完后再退出该JVM。依次类推,如果该值设置为3,TaskTracker则会在同一个JVM里面最多依次执行3个Task,然后才会退出该JVM。

在 Yarn(Hadoop MapReduce v2)里面,不再有参数mapred.job.reuse.jvm.num.tasks,但它也有类似JVM Reuse的功能——uber。据Arun的说法,启用该功能能够让一些任务的执行效率提高2到3倍(“we’ve observed 2x-3x speedup for some jobs”)。不过,由于Yarn的结构已经大不同于MapReduce v1中JobTracker/TaskTracker的结构,因此uber的原理和配置都和之前的JVM重用机制大不相同。

2.5.2 uber的原理

Yarn的默认配置会禁用uber组件,即不允许JVM重用。我们先看看在这种情况下,Yarn是如何执行一个MapReduce job的。首先,Resource Manager里的Application Manager会为每一个application(比如一个用户提交的MapReduce Job)在NodeManager里面申请一个container,然后在该container里面启动一个Application Master。container在Yarn中是分配资源的容器(内存、cpu、硬盘等),它启动时便会相应启动一个JVM。此时,Application Master便陆续为application包含的每一个task(一个Map task或Reduce task)向Resource Manager申请一个container。等每得到一个container后,便要求该container所属的NodeManager将此container启动,然后就在这个container里面执行相应的task。等这个task执行完后,这个container便会被NodeManager收回,而container所拥有的JVM也相应地被退出。在这种情况下,可以看出每一个JVM仅会执行一Task, JVM并未被重用。

用户可以通过启用uber组件来允许JVM重用——即在同一个container里面依次执行多个task。在yarn-site.xml文件中,改变一下几个参数的配置即可启用uber的方法:

参数 默认值 描述
mapreduce.job.ubertask.enable (false) 是否启用user功能。如果启用了该功能,则会将一个“小的application”的所有子task在同一个JVM里面执行,达到JVM重用的目的。这个JVM便是负责该application的ApplicationMaster所用的JVM(运行在其container里)。那具体什么样的application算是“小的application"呢?下面几个参数便是用来定义何谓一个“小的application"
mapreduce.job.ubertask.maxmaps 9 map任务数的阀值,如果一个application包含的map数小于该值的定义,那么该application就会被认为是一个小的application
mapreduce.job.ubertask.maxreduces 1 reduce任务数的阀值,如果一个application包含的reduce数小于该值的定义,那么该application就会被认为是一个小的application。不过目前Yarn不支持该值大于1的情况“CURRENTLY THE CODE CANNOT SUPPORT MORE THAN ONE REDUCE”
mapreduce.job.ubertask.maxbytes application的输入大小的阀值。默认为dfs.block.size的值。当实际的输入大小部超过该值的设定,便会认为该application为一个小的application。

最后,我们来看当uber功能被启用的时候,Yarn是如何执行一个application的:

  1. Resource Manager里的Application Manager会为每一个application在NodeManager里面申请一个container,然后在该container里面启动一个Application Master。
  2. containe启动时便会相应启动一个JVM。
  3. 此时,如果uber功能被启用,并且该application被认为是一个“小的application”,那么Application Master便会将该application包含的每一个task依次在这个container里的JVM里顺序执行,直到所有task被执行完(“WIth ‘uber’ mode enabled, you’ll run everything within the container of the AM itself”)。这样Application Master便不用再为每一个task向Resource Manager去申请一个单独的container,最终达到了 JVM重用(资源重用)的目的。

0x03 Yarn调度

0x04 Yarn调优

Hadoop-Yarn-调优

0xFF 参考文档

  • Hadoop 2.6.0-cdh5.8.2 源码
  • Hadoop权威指南