Spark进阶(三)

客户端模式和集群模式的区别

这里咱们要区分一下什么是客户端模式(client mode),什么是集群模式(cluster mode)。

  咱们知道,当在YARN上运行Spark做业时,每一个Spark executor做为一个YARN容器(container)运行。Spark可使得多个Tasks在同一个容器(container)里面运行。 yarn-cluster和yarn-client模式的区别其实就是Application Master进程的区别,在yarn-cluster模式下,driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督做业的运行情况。当用户提交了做业以后,就能够关掉Client,做业会继续在YARN上运行。然而yarn-cluster模式不适合运行交互类型的做业。 在yarn-client模式下,Application Master仅仅向YARN请求executor,client会和请求的container通讯来调度他们工做,也就是说Client不能离开。下面的图形象表示了二者的区别。
  这里写图片描述
  这里写图片描述web

1.Spark on YARN集群模式分析app

1.1客户端操做svg

一、根据yarnConf来初始化yarnClient,并启动yarnClient;
二、建立客户端Application,并获取Application的ID,进一步判断集群中的资源是否知足executor和ApplicationMaster申请的资源,若是不知足则抛出IllegalArgumentException;
三、设置资源、环境变量:其中包括了设置Application的Staging目录、准备本地资源(jar文件、log4j.properties)、设置Application其中的环境变量、建立Container启动的Context等;
四、设置Application提交的Context,包括设置应用的名字、队列、AM的申请的Container、标记该做业的类型为Spark;
五、申请Memory,并最终经过yarnClient.submitApplication向ResourceManager提交该Application。
  看成业提交到YARN上以后,客户端就没事了,甚至在终端关掉那个进程也没事,由于整个做业运行在YARN集群上进行,运行的结果将会保存到HDFS或者日志中。
  
1.2提交到YARN集群,YARN操做函数

一、运行ApplicationMaster的run方法;
二、设置好相关的环境变量。
三、建立amClient,并启动;
四、在Spark UI启动以前设置Spark UI的AmIpFilter;
五、在startUserClass函数专门启动了一个线程(名称为Driver的线程)来启动用户提交的Application,也就是启动了Driver。在Driver中将会初始化SparkContext;
六、等待SparkContext初始化完成,最多等待spark.yarn.applicationMaster.waitTries次数(默认为10),若是等待了的次数超过了配置的,程序将会退出;不然用SparkContext初始化yarnAllocator;
七、当SparkContext、Driver初始化完成的时候,经过amClient向ResourceManager注册ApplicationMaster;
八、分配并启动Executeors。在启动Executeors以前,先要经过yarnAllocator获取到numExecutors个Container,而后在Container中启动Executeors。 若是在启动Executeors的过程当中失败的次数达到了maxNumExecutorFailures的次数,maxNumExecutorFailures的计算规则以下:ui

// Default to numExecutors * 2, with minimum of 3
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))

那么这个Application将失败,将Application Status标明为FAILED,并将关闭SparkContext。其实,启动Executeors是经过ExecutorRunnable实现的,而ExecutorRunnable内部是启动CoarseGrainedExecutorBackend的。spa

九、最后,Task将在CoarseGrainedExecutorBackend里面运行,而后运行情况会经过Akka通知CoarseGrainedScheduler,直到做业运行完成。线程

2.Spark on YARN客户端模式分析日志

  和yarn-cluster模式同样,整个程序也是经过spark-submit脚本提交的。可是yarn-client做业程序的运行不须要经过Client类来封装启动,而是直接经过反射机制调用做业的main函数。下面是流程。code

一、经过SparkSubmit类的launch的函数直接调用做业的main函数(经过反射机制实现),若是是集群模式就会调用Client的main函数。
二、而应用程序的main函数必定都有个SparkContent,并对其进行初始化;
三、在SparkContent初始化中将会依次作以下的事情:设置相关的配置、注册MapOutputTracker、BlockManagerMaster、BlockManager,建立taskScheduler和dagScheduler;
四、初始化完taskScheduler后,将建立dagScheduler,而后经过taskScheduler.start()启动taskScheduler,而在taskScheduler启动的过程当中也会调用SchedulerBackend的start方法。 在SchedulerBackend启动的过程当中将会初始化一些参数,封装在ClientArguments中,并将封装好的ClientArguments传进Client类中,并client.runApp()方法获取Application ID。
五、client.runApp里面的作的和上章客户端进行操做那节相似,不一样的是在里面启动是ExecutorLauncher(yarn-cluster模式启动的是ApplicationMaster)。
六、在ExecutorLauncher里面会初始化并启动amClient,而后向ApplicationMaster注册该Application。注册完以后将会等待driver的启动,当driver启动完以后,会建立一个MonitorActor对象用于和CoarseGrainedSchedulerBackend进行通讯(只有事件AddWebUIFilter他们之间才通讯,Task的运行情况不是经过它和CoarseGrainedSchedulerBackend通讯的)。 而后就是设置addAmIpFilter,看成业完成的时候,ExecutorLauncher将经过amClient设置Application的状态为FinalApplicationStatus.SUCCEEDED。
七、分配Executors,这里面的分配逻辑和yarn-cluster里面相似。
八、最后,Task将在CoarseGrainedExecutorBackend里面运行,而后运行情况会经过Akka通知CoarseGrainedScheduler,直到做业运行完成。
九、在做业运行的时候,YarnClientSchedulerBackend会每隔1秒经过client获取到做业的运行情况,并打印出相应的运行信息,当Application的状态是FINISHED、FAILED和KILLED中的一种,那么程序将退出等待。
十、最后有个线程会再次确认Application的状态,当Application的状态是FINISHED、FAILED和KILLED中的一种,程序就运行完成,并中止SparkContext。整个过程就结束了。server