Spark相关笔记

基本概念

组件

简介

  • Application:表示你的应用程序

  • Driver:表示main()函数,创建SparkContext。

    创建spark的上下文
    划分RDD并生成有向无环图(DAGScheduler)
    与spark中的其他组进行协调,协调资源等等(SchedulerBackend)
    生成并发送task到executor(taskScheduler)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    driver运行在哪里(在这里只讨论yarn模式)

    yarn-cluster模式下,client将用户程序提交到到spark集群中就与spark集群断开联系了
    此时client将不会发挥其他任何作用,仅仅负责提交。
    在此模式下AM和driver是同一个东西,但官网上给的是driver运行在AM里,可以理解为AM包括了driver的功能就像Driver运行在AM里一样
    此时的AM既能够向AM申请资源并进行分配,又能完成driver划分RDD提交task等工作

    yarn-client模式下,Driver运行在客户端上,先有driver再用AM,此时driver负责RDD生成、task生成和分发,向AM申请资源等
    AM负责向RM申请资源,其他的都由driver来完成
  • Executor:某个Application运行在Worker节点上的一个进程,该进程负责运行某些task,并且负责将数据存在内存或者磁盘上。

    在Spark on Yarn模式下,其进程名称为 CoarseGrainedExecutor Backend,一个CoarseGrainedExecutor Backend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task,这样,每个CoarseGrainedExecutorBackend能并行运行Task的数据就取决于分配给它的CPU的个数。

  • Worker:集群中可以运行Application代码的节点。

    在Standalone模式中指的是通过slave文件配置的worker节点,在Spark on Yarn模式中指的就是NodeManager节点。

  • Task:在Executor进程中执行任务的工作单元,多个Task组成一个Stage

  • Job:包含多个Task组成的并行计算,是由Action行为触发的

  • Stage:每个Job会被拆分很多组Task,作为一个TaskSet,其名称为Stage

  • DAGScheduler:根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler,其划分Stage的依据是RDD之间的依赖关系

  • TaskScheduler:将TaskSet提交给Worker(集群)运行,每个Executor运行什么Task就是在此处分配的。

组件交互图

基本要点

1
2
3
4
5
6
7
一个Application会启动一个Driver

一个Driver负责跟踪管理该Application运行过程中所有的资源状态和任务状态

一个Driver会管理一组Executor

一个Executor只执行属于一个Driver的Task

橙色:提交用户Spark程序

用户提交一个Spark程序,主要的流程如下所示:

  • 1) 用户spark-submit脚本提交一个Spark程序,会创建一个ClientEndpoint对象,该对象负责与Master通信交互

  • 2) ClientEndpoint向Master发送一个RequestSubmitDriver消息,表示提交用户程序

  • 3) Master收到RequestSubmitDriver消息,向ClientEndpoint回复SubmitDriverResponse,表示用户程序已经完成注册

  • 4) ClientEndpoint向Master发送RequestDriverStatus消息,请求Driver状态

  • 5) 如果当前用户程序对应的Driver已经启动,则ClientEndpoint直接退出,完成提交用户程序

紫色:启动Driver进程

当用户提交用户Spark程序后,需要启动Driver来处理用户程序的计算逻辑,完成计算任务,这时Master协调需要启动一个Driver,具体流程如下所示:

  • 1) Maser内存中维护着用户提交计算的任务Application,每次内存结构变更都会触发调度,向Worker发送LaunchDriver请求

  • 2) Worker收到LaunchDriver消息,会启动一个DriverRunner线程去执行LaunchDriver的任务

  • 3) DriverRunner线程在Worker上启动一个新的JVM实例,该JVM实例内运行一个Driver进程,该Driver会创建SparkContext对象

红色:注册Application

Dirver启动以后,它会创建SparkContext对象,初始化计算过程中必需的基本组件,并向Master注册Application,流程描述如下:

  • 1) 创建SparkEnv对象,创建并管理一些数基本组件

  • 2) 创建TaskScheduler,负责Task调度

  • 3) 创建StandaloneSchedulerBackend,负责与ClusterManager进行资源协商

  • 4) 创建DriverEndpoint,其它组件可以与Driver进行通信

  • 5) 在StandaloneSchedulerBackend内部创建一个StandaloneAppClient,负责处理与Master的通信交互

  • 6) StandaloneAppClient创建一个ClientEndpoint,实际负责与Master通信

  • 7) ClientEndpoint向Master发送RegisterApplication消息,注册Application

  • 8) Master收到RegisterApplication请求后,回复ClientEndpoint一个RegisteredApplication消息,表示已经注册成功

蓝色:启动Executor进程

  • 1) Master向Worker发送LaunchExecutor消息,请求启动Executor;同时Master会向Driver发送ExecutorAdded消息,表示Master已经新增了一个Executor(此时还未启动)

  • 2) Worker收到LaunchExecutor消息,会启动一个ExecutorRunner线程去执行LaunchExecutor的任务

  • 3) Worker向Master发送ExecutorStageChanged消息,通知Executor状态已发生变化

  • 4) Master向Driver发送ExecutorUpdated消息,此时Executor已经启动

粉色:启动Task执行

  • 1) StandaloneSchedulerBackend启动一个DriverEndpoint

  • 2) DriverEndpoint启动后,会周期性地检查Driver维护的Executor的状态,如果有空闲的Executor便会调度任务执行

  • 3) DriverEndpoint向TaskScheduler发送Resource Offer请求

  • 4) 如果有可用资源启动Task,则DriverEndpoint向Executor发送LaunchTask请求

  • 5) Executor进程内部的CoarseGrainedExecutorBackend调用内部的Executor线程的launchTask方法启动Task

  • 6) Executor线程内部维护一个线程池,创建一个TaskRunner线程并提交到线程池执行

绿色:Task运行完成

  • 1) Executor进程内部的Executor线程通知CoarseGrainedExecutorBackend,Task运行完成

  • 2) CoarseGrainedExecutorBackend向DriverEndpoint发送StatusUpdated消息,通知Driver运行的Task状态发生变更

  • 3) StandaloneSchedulerBackend调用TaskScheduler的updateStatus方法更新Task状态

  • 4) StandaloneSchedulerBackend继续调用TaskScheduler的resourceOffers方法,调度其他任务运行


基本概念

1
2
3
4
5
6
res = sc.wholeTextFiles("hdfs://192.168.4.220:8020/datas", minPartitions=40) \
.map(map_extract) \
.flatMap(lambda x:x) \
.map(lambda x:(x[0], int(x[1].split(',')[2]))) \
.reduceByKey(operator.add) \
.collect()

整个逻辑实际上就用了sparkContext的一个函数,rdd的 3 个 transformation 和 1 个action

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1,transformation是得到一个新的RDD,方式很多,比如从数据源生成一个新的RDD,从RDD生成一个新的RDD
2,action是得到一个值,或者一个结果(直接将RDDcache到内存中)
所有的transformation都是采用的懒策略,就是如果只是将transformation提交是不会执行计算的,计算只有在action被提交的时候才被触发。


transformation操作:

map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集
filter(func): 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD
flatMap(func):和map差不多,但是flatMap生成的是多个结果
mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition
mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index
sample(withReplacement,faction,seed):抽样
union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合
distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element
groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist
reduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数
sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型
join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数
cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数
cartesian(otherDataset):笛卡尔积就是m*n,大家懂的

 
action操作:

reduce(func):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的
collect():一般在filter或者足够小的结果的时候,再用collect封装返回一个数组
count():返回的是dataset中的element的个数
first():返回的是dataset中的第一个元素
take(n):返回前n个elements,这个士driverprogram返回的
takeSample(withReplacement,num,seed):抽样返回一个dataset中的num个元素,随机种子seed
saveAsTextFile(path):把dataset写到一个textfile中,或hdfs,或者hdfs支持的文件系统中,spark把每条记录都转为行记录,然后写file中
saveAsSequenceFile(path):只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统
countByKey():返回的是key对应的个数的一个map,作用于一个RDD
foreach(func):对dataset中的每个元素都使用func

Job

就是由一个 rdd 的 action 触发的动作,可以简单的理解为,当你需要执行一个 rdd 的 action 的时候,会生成一个 job。

1
2
3
首先job的划分是遇到action操作时,被发现后经过sparkcontext的runjob方法来到DAGscheduler
这个类中它会通过依赖关系划分出stage,一个stage是一个taskset
里面的每个task对应着rdd的一个分区。task可以理解为并行的分片。

Stage

是一个 job 的组成单位,就是说,一个 job 会被切分成 1 个或 1 个以上的 stage,然后各个 stage 会按照执行顺序依次执行。

一个job会被拆为多组task,每组任务称为一个stage,以shuffle进行划分

Task

即 stage 下的一个任务执行单元,一般来说,一个 rdd 有多少个 partition,就会有多少个 task,因为每一个 task 只是处理一个 partition 上的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
如果在spark-default.conf中没有显示设置的话。
会按照不同模式给不同的默认分区数。(spark.default.parallelism)
对于local[N]模式,因为开辟了N个线程,所以有N个core,也就默认分区为N。
如果单用local那么只会开起一个分区。

如果是伪分布模式,local-cluster[x,y,z]
那么默认分区是x*y,x代表的是运行的executor数量,y是每个executor有多少个core。

如果是yarn或者是standalone模式。
是用的函数max(x*y,2)前者的含义和伪分布一样,后者表示如果x*y<2,分区就取2。
在程序读取创建RDD的时候,一般会用textFile,这个函数可以读取本地或者是hdfs的文件。

分区数为:
rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)
rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)

Partition

一个rdd分成几个partition,则有几个task,task被分配到节点中

每个节点的executor有几个core,则有几个task可以被并行执行

最大并行度即为节点数*core(虚拟核,并不是每个节点的cpu物理核,但一般虚拟核<=物理核)

1
假设:有5个节点,每个节点的executor有2个core;有1万条数据组成一个rdd,分成10个partition,则有10个task。则每个节点分配到两个task并行执行。

宽窄依赖

在DAG调度中需要对计算的过程划分Stage,划分的依据就是RDD之间的依赖关系。

RDD之间的依赖关系分为两种:宽依赖(wide dependency/shuffle dependency)和窄依赖(narrow dependency)

窄依赖

窄依赖就是指父RDD的每个分区只被一个子RDD分区使用,子RDD分区通常只对应常数个父RDD分区

宽依赖

宽依赖就是指父RDD的每个分区都有可能被多个子RDD分区使用,子RDD分区通常对应父RDD所有分区

窄依赖的函数

map, filter, union, join(父RDD是hash-partitioned ), mapPartitions, mapValues

宽依赖的函数

groupByKey, join(父RDD不是hash-partitioned ), partitionBy


运行流程

1
2
3
4
5
6
7
8
9
(1)构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;

(2)资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;

(3)SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task

(4)Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor。

(5)Task在Executor上运行,运行完毕释放所有资源。

DAGScheduler

Job=多个stage,Stage=多个同种task, Task分为ShuffleMapTask和ResultTask,Dependency分为ShuffleDependency和NarrowDependency

职责

面向stage的切分,切分依据为宽依赖

维护waiting jobs和active jobs,维护waiting stages、active stages和failed stages,以及与jobs的映射关系

主要职能

  • 1、接收提交Job的主入口,submitJob(rdd, …)或runJob(rdd, …)。在SparkContext里会调用这两个方法。

    • 生成一个Stage并提交,接着判断Stage是否有父Stage未完成,若有,提交并等待父Stage,以此类推。结果是:DAGScheduler里增加了一些waiting stage和一个running stage。
    • running stage提交后,分析stage里Task的类型,生成一个Task描述,即TaskSet。
    • 调用TaskScheduler.submitTask(taskSet, …)方法,把Task描述提交给TaskScheduler。TaskScheduler依据资源量和触发分配条件,会为这个TaskSet分配资源并触发执行。
    • DAGScheduler提交job后,异步返回JobWaiter对象,能够返回job运行状态,能够cancel job,执行成功后会处理并返回结果
  • 2、处理TaskCompletionEvent

    • 如果task执行成功,对应的stage里减去这个task,做一些计数工作:
      • 如果task是ResultTask,计数器Accumulator加一,在job里为该task置true,job finish总数加一。加完后如果finish数目与partition数目相等,说明这个stage完成了,标记stage完成,从running stages里减去这个stage,做一些stage移除的清理工作
      • 如果task是ShuffleMapTask,计数器Accumulator加一,在stage里加上一个output location,里面是一个MapStatus类。MapStatus是ShuffleMapTask执行完成的返回,包含location信息和block size(可以选择压缩或未压缩)。同时检查该stage完成,向MapOutputTracker注册本stage里的shuffleId和location信息。然后检查stage的output location里是否存在空,若存在空,说明一些task失败了,整个stage重新提交;否则,继续从waiting stages里提交下一个需要做的stage
    • 如果task是重提交,对应的stage里增加这个task
    • 如果task是fetch失败,马上标记对应的stage完成,从running stages里减去。如果不允许retry,abort整个stage;否则,重新提交整个stage。另外,把这个fetch相关的location和map任务信息,从stage里剔除,从MapOutputTracker注销掉。最后,如果这次fetch的blockManagerId对象不为空,做一次ExecutorLost处理,下次shuffle会换在另一个executor上去执行。
    • 其他task状态会由TaskScheduler处理,如Exception, TaskResultLost, commitDenied等。
  • 3、其他与job相关的操作还包括:cancel job, cancel stage, resubmit failed stage等

TaskScheduler

维护task和executor对应关系,executor和物理资源对应关系,在排队的task和正在跑的task。

内部维护一个任务队列,根据FIFO或Fair策略,调度任务。

TaskScheduler本身是个接口,spark里只实现了一个TaskSchedulerImpl,理论上任务调度可以定制。

主要功能:

  • 1、submitTasks(taskSet),接收DAGScheduler提交来的tasks

    • 为tasks创建一个TaskSetManager,添加到任务队列里。TaskSetManager跟踪每个task的执行状况,维护了task的许多具体信息。
    • 触发一次资源的索要。
      • 首先,TaskScheduler对照手头的可用资源和Task队列,进行executor分配(考虑优先级、本地化等策略),符合条件的executor会被分配给TaskSetManager。
      • 然后,得到的Task描述交给SchedulerBackend,调用launchTask(tasks),触发executor上task的执行。task描述被序列化后发给executor,executor提取task信息,调用task的run()方法执行计算。
  • 2、cancelTasks(stageId),取消一个stage的tasks

    • 调用SchedulerBackend的killTask(taskId, executorId, …)方法。taskId和executorId在TaskScheduler里一直维护着。
  • 3、resourceOffer(offers: Seq[Workers]),这是非常重要的一个方法,调用者是SchedulerBacnend,用途是底层资源SchedulerBackend把空余的workers资源交给TaskScheduler,让其根据调度策略为排队的任务分配合理的cpu和内存资源,然后把任务描述列表传回给SchedulerBackend

    • 从worker offers里,搜集executor和host的对应关系、active executors、机架信息等等
    • worker offers资源列表进行随机洗牌,任务队列里的任务列表依据调度策略进行一次排序
    • 遍历每个taskSet,按照进程本地化、worker本地化、机器本地化、机架本地化的优先级顺序,为每个taskSet提供可用的cpu核数,看是否满足
      • 默认一个task需要一个cpu,设置参数为”spark.task.cpus=1”
      • 为taskSet分配资源,校验是否满足的逻辑,最终在TaskSetManager的resourceOffer(execId, host, maxLocality)方法里
      • 满足的话,会生成最终的任务描述,并且调用DAGScheduler的taskStarted(task, info)方法,通知DAGScheduler,这时候每次会触发DAGScheduler做一次submitMissingStage的尝试,即stage的tasks都分配到了资源的话,马上会被提交执行
  • 4、statusUpdate(taskId, taskState, data),另一个非常重要的方法,调用者是SchedulerBacnend,用途是SchedulerBacnend会将task执行的状态汇报给TaskScheduler做一些决定

    • 若TaskLost,找到该task对应的executor,从active executor里移除,避免这个executor被分配到其他task继续失败下去。
    • task finish包括四种状态:finished, killed, failed, lost。只有finished是成功执行完成了。其他三种是失败。
    • task成功执行完,调用TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data),否则调用TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)。TaskResultGetter内部维护了一个线程池,负责异步fetch task执行结果并反序列化。默认开四个线程做这件事,可配参数”spark.resultGetter.threads”=4。

TaskResultGetter取task result的逻辑

  • 1、对于success task,如果taskResult里的数据是直接结果数据,直接把data反序列出来得到结果;如果不是,会调用blockManager.getRemoteBytes(blockId)从远程获取。如果远程取回的数据是空的,那么会调用TaskScheduler.handleFailedTask,告诉它这个任务是完成了的但是数据是丢失的。否则,取到数据之后会通知BlockManagerMaster移除这个block信息,调用TaskScheduler.handleSuccessfulTask,告诉它这个任务是执行成功的,并且把result data传回去。

  • 2、对于failed task,从data里解析出fail的理由,调用TaskScheduler.handleFailedTask,告诉它这个任务失败了,理由是什么。

SchedulerBackend

在TaskScheduler下层,用于对接不同的资源管理系统,SchedulerBackend是个接口,需要实现的主要方法如下

1
2
3
4
5
def start(): Unit
def stop(): Unit
def reviveOffers(): Unit // 重要方法:SchedulerBackend把自己手头上的可用资源交给TaskScheduler,TaskScheduler根据调度策略分配给排队的任务吗,返回一批可执行的任务描述,SchedulerBackend负责launchTask,即最终把task塞到了executor模型上,executor里的线程池会执行task的run()
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException

粗粒度:进程常驻的模式,典型代表是standalone模式,mesos粗粒度模式,yarn

细粒度:mesos细粒度模式

这里讨论粗粒度模式,更好理解:CoarseGrainedSchedulerBackend。

维护executor相关信息(包括executor的地址、通信端口、host、总核数,剩余核数)

手头上executor有多少被注册使用了,有多少剩余,总共还有多少核是空的等等。

主要职能

  • 1、Driver端主要通过actor监听和处理下面这些事件:

    • RegisterExecutor(executorId, hostPort, cores, logUrls)。这是executor添加的来源,通常worker拉起、重启会触发executor的注册。CoarseGrainedSchedulerBackend把这些executor维护起来,更新内部的资源信息,比如总核数增加。最后调用一次makeOffer(),即把手头资源丢给TaskScheduler去分配一次,返回任务描述回来,把任务launch起来。这个makeOffer()的调用会出现在任何与资源变化相关的事件中,下面会看到。
    • StatusUpdate(executorId, taskId, state, data)。task的状态回调。首先,调用TaskScheduler.statusUpdate上报上去。然后,判断这个task是否执行结束了,结束了的话把executor上的freeCore加回去,调用一次makeOffer()。
    • ReviveOffers。这个事件就是别人直接向SchedulerBackend请求资源,直接调用makeOffer()。
    • KillTask(taskId, executorId, interruptThread)。这个killTask的事件,会被发送给executor的actor,executor会处理KillTask这个事件。
    • StopExecutors。通知每一个executor,处理StopExecutor事件。
    • RemoveExecutor(executorId, reason)。从维护信息中,那这堆executor涉及的资源数减掉,然后调用TaskScheduler.executorLost()方法,通知上层我这边有一批资源不能用了,你处理下吧。TaskScheduler会继续把executorLost的事件上报给DAGScheduler,原因是DAGScheduler关心shuffle任务的output location。DAGScheduler会告诉BlockManager这个executor不可用了,移走它,然后把所有的stage的shuffleOutput信息都遍历一遍,移走这个executor,并且把更新后的shuffleOutput信息注册到MapOutputTracker上,最后清理下本地的CachedLocationsMap。
  • 2、reviveOffers()方法的实现。直接调用了makeOffers()方法,得到一批可执行的任务描述,调用launchTasks。

  • 3、launchTasks(tasks: Seq[Seq[TaskDescription]])方法。

    • 遍历每个task描述,序列化成二进制,然后发送给每个对应的executor这个任务信息
      • 如果这个二进制信息太大,超过了9.2M(默认的akkaFrameSize 10M 减去 默认 为akka留空的200K),会出错,abort整个taskSet,并打印提醒增大akka frame size
      • 如果二进制数据大小可接受,发送给executor的actor,处理LaunchTask(serializedTask)事件。

Executor

Executor是spark里的进程模型,可以套用到不同的资源管理系统上,与SchedulerBackend配合使用。

内部有个线程池,有个running tasks map,有个actor,接收上面提到的由SchedulerBackend发来的事件。

事件处理

  • launchTask。根据task描述,生成一个TaskRunner线程,丢尽running tasks map里,用线程池执行这个TaskRunner
  • killTask。从running tasks map里拿出线程对象,调它的kill方法。




不同集群中的运行架构

Standalone运行过程

运行过程

  • 1、我们提交一个任务,任务就叫Application
  • 2、初始化程序的入口SparkContext,
    • 2.1 初始化DAG Scheduler
    • 2.2 初始化Task Scheduler
  • 3、Task Scheduler向master去进行注册并申请资源(CPU Core和Memory)
  • 4、Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上获取资源,然后启动StandaloneExecutorBackend;顺便初始化好了一个线程池
  • 5、StandaloneExecutorBackend向Driver(SparkContext)注册,这样Driver就知道哪些Executor为他进行服务了。
      到这个时候其实我们的初始化过程基本完成了,我们开始执行transformation的代码,但是代码并不会真正的运行,直到我们遇到一个action操作。生产一个job任务,进行stage的划分
  • 6、SparkContext将Applicaiton代码发送给StandaloneExecutorBackend;并且SparkContext解析Applicaiton代码,构建DAG图,并提交给DAG Scheduler分解成Stage(当碰到Action操作 时,就会催生Job;每个Job中含有1个或多个Stage,Stage一般在获取外部数据和shuffle之前产生)。
  • 7、将Stage(或者称为TaskSet)提交给Task Scheduler。Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行;
  • 8、对task进行序列化,并根据task的分配算法,分配task
  • 9、对接收过来的task进行反序列化,把task封装成一个线程
  • 10、开始执行Task,并向SparkContext报告,直至Task完成。
  • 11、资源注销
1
2
3
4
5
6
7
8
9
Standalone模式是Spark实现的资源调度框架,其主要的节点有Client节点、Master节点和Worker节点。

其中Driver既可以运行在Master节点上中,也可以运行在本地Client端。

当用spark-shell交互式工具提交Spark的Job时,Driver在Master节点上运行;

当使用spark-submit工具提交Job或者在Eclips、IDEA等

开发平台上使用`new SparkConf().setMaster(“spark://master:7077”)`方式运行Spark任务时,Driver是运行在本地Client端上的。


YARN运行过程

YARN-Client

Yarn-Client模式中,Driver在客户端本地运行,这种模式可以使得Spark Application和客户端进行交互,因为Driver在客户端

所以可以通过webUI访问Driver的状态,默认是http://hadoop1:4040访问,而YARN通过http://hadoop1:8088访问。

YARN-client的工作流程分为以下几个步骤:

  • 1.Spark Yarn Client向YARN的ResourceManager申请启动Application Master。同时在SparkContent初始化中将创建DAGScheduler和TASKScheduler等,由于我们选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend;

  • 2.ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派;

  • 3.Client中的SparkContext初始化完毕后,与ApplicationMaster建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container);

  • 4.一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;

  • 5.Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;

  • 6.应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己。


YARN-Cluster

在YARN-Cluster模式中,当用户向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是把Spark的Driver作为一个ApplicationMaster在YARN集群中先启动;第二个阶段是由ApplicationMaster创建应用程序,然后为它向ResourceManager申请资源,并启动Executor来运行Task,同时监控它的整个运行过程,直到运行完成。

YARN-cluster的工作流程分为以下几个步骤:

  • 1.Spark Yarn Client向YARN中提交应用程序,包括ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等;

  • 2.ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster进行SparkContext等的初始化;

  • 3.ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;

  • 4.一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。这一点和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler进行任务的调度,其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增加了对Executor的等待逻辑等;

  • 5.ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;

  • 6.应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。



资料附录