Spark启动及提交流程内部核心原理剖析

Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎,并且拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于需要迭代MapReduce的算法。接下来带大家探索一下Spark启动及提交流程的内部核心原理。

Netty

在探索Spark启动及提交流程的内部核心原理之前,我们得先简单介绍一下Spark内部的通信框架----Netty

Spark中通信框架的发展:

Spark早期版本中采用Akka作为内部通信部件。

Spark1.3中引入Netty通信框架,为了解决Shuffle的大数据传输问题使用

Spark1.6中Akka和Netty可以配置使用。Netty完全实现了Akka在Spark中的功能。

Spark2系列中,Spark抛弃Akka,使用Netty。

尚硅谷大数据培训_专业的大数据培训机构_值得信赖的大数据教程
大数据
大数据教程
大数据培训
尚硅谷大数据拼课程、论口碑更给力
尚硅谷IT培训
立即咨询
Netty通信架构解析

Netty通讯架构如下:

RpcEndpoint:RPC通信终端。Spark针对每个节点(Client/Master/Worker)都称之为一个RPC终端,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher。在Spark中,所有的终端都存在生命周期:

Constructor

onStart

receive*

onStop

RpcEnv:RPC上下文环境,每个RPC终端运行时依赖的上下文环境称为RpcEnv;在把当前Spark版本中使用的NettyRpcEnv(即每个节点都有环境上下文)

Dispatcher:消息调度(分发)器,针对于RPC终端需要发送远程消息或者从远程RPC接收到的消息,分发至对应的指令收件箱(发件箱)。如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱;一个环境一个

Inbox:指令消息收件箱。一个本地RpcEndpoint对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部ReceiverQueue中,另外Dispatcher创建时会启动一个单独线程进行轮询ReceiverQueue,进行收件箱消息消费;

RpcEndpointRef:RpcEndpointRef是对远程RpcEndpoint的一个引用。当我们需要向一个具体的RpcEndpoint发送消息时,一般我们需要获取到该RpcEndpoint的引用,然后通过该应用发送消息。

OutBox:指令消息发件箱。对于当前RpcEndpoint来说,一个目标RpcEndpoint对应一个发件箱,如果向多个目标RpcEndpoint发送信息,则有多个OutBox。当消息放入Outbox后,紧接着通过TransportClient将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行;

RpcAddress:表示远程的RpcEndpointRef的地址,Host + Port。

TransportClient:Netty通信客户端,一个OutBox对应一个TransportClient,TransportClient不断轮询OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer;(类似socket)

TransportServer:Netty通信服务端,一个RpcEndpoint对应一个TransportServer,接受远程消息后调用Dispatcher分发消息至对应收发件箱(通过本地指令);

Netty通信流程总结

在一个rpcEnv里, RpcEndpoint通过持有RpcEndpointRef,向Dispatcher发送消息,Dispatcher识别到消息是远程指令,会把消息发送到OutBox。

TransportClient不断轮询OutBox的队列,一旦OutBox队列有消息,就会将消息发往对应RpcEndpoint的TransportServer。

接收的RpcEndpoint的TransportServer会把消息发往Dispatcher,Dispatcher识别到本地指令后,会把消息给发往自身的InBox里面,这样就实现了通信。

Spark启动流程剖析

在剖析Spark启动流程中,我们主要通过StandAlone模式下的Master / Work启动流程来看Spark是怎么通讯的。

Master启动流程

我们首先从启动命令start-all.sh出发(因为他会启动master和work),一步一步查看启动的调用流程:

start-all.sh

会加载sparkhome作为变量,所以学习spark安装多种模式spark时最好不配

start-master.sh

CLASS="org.apache.spark.deploy.master.Master"

"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \

--host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \

$ORIGINAL_ARGS

run_command class "$@" --运行传过来的所有参数

${SPARK_HOME}"/bin/spark-class "$command" "$@"

java .. org.apache.spark.deploy.master.Master 最终启动这个类,启动java虚拟机

-- main

-- startRpcEnvAndEndpoint // master和worker通讯需要现有通讯环境,先创建通讯环境和endpoint

  1. RpcEnv.create //创建环境

1.1-- RpeEnv return new NettyRpcEnvFactory().create(config) //启动创建环境的工厂

1.2-- val nettyEnv = new NettyRpcEnv //创建netty环境

-- dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)// 创建dispatch(一个环境只有一个

-- endpoints: ConcurrentMap[String, MessageLoop] //存储每个endpoint的消息死循环

-- endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] //根据ref找到endpoint通讯实体

-- new DedicatedMessageLoop(name, e, this)//专用消息循环(每个endpoint一个消息循环

-- private val inbox = new Inbox(name, endpoint) //一个endpoint都单独享有一个收件箱

-- receiveLoop()//每个线程都会死循环等待信息

1.3-- nettyEnv.startServer(config.bindAddress, actualPort) //启动netty服务

-- server = transportContext.createServer //创建transportServer

  1. rpcEnv.setupEndpoint(ENDPOINT_NAME, //将endpoint(master)放进环境

new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

-- dispatcher.registerRpcEndpoint(name, endpoint) //将创建的master注册

-- new Master

//启动了一个线程, 这个线程会每60s检测一次所有worker的超时情况

-- timeOutDeadWorkers()

  1. master启动完毕

Worker启动流程

Worker的启动流程还是先从start-all.sh触发,会走进start-woker.sh。

org.apache.spark.deploy.worker.Worker

val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)//创建rpcenv

val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)//与master相比,只多了这一行.

//worker会从配置找到master地址,主动去找master注册.(master有可能会是ha,所以可能找到的是master地址数组)

rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, //注册worker为endpoint

masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))

-- onStart

registerWithMaster() //向master注册自己(worker)

sendRegisterMessageToMaster(masterEndpoint) //发送注册信息

-- 注册成功后,每15秒执行一次,发送心跳

sendToMaster(Heartbeat(workerId, self))

masterRef.send(message)

尚硅谷大数据培训_专业的大数据培训机构_值得信赖的大数据教程
大数据
大数据教程
大数据培训
尚硅谷大数据拼课程、论口碑更给力
尚硅谷IT培训
立即咨询
Spark启动流程总结

A跟B通信,A拿到B的EndPointRef,通过send方法发送一个样例类进行通信。样例类携带更多信息,类似通信协议

B会有receive方法收到信息通过模式匹配进行匹配信息

Spark提交流程剖析

因为Spark可以以多种模式运行,国内多以YARN模式进行提交,所以此处以YARN的Cluster模式下的Spark提交流程进行剖析。

SparkSubmit

SparkSubmit的作用主要就是两个:

  1. 解析参数
  2. 提交参数,初始数环境,并获取"org.apache.spark.deploy.yarn.YarnClusterApplication"的对象,调用对象的start方法

org.apache.spark.deploy.SparkSubmit

main

-- submit.doSubmit(args) //执行提交

-- doSubmit

submit(appArgs, uninitLog)

-- doRunMain()

//执行主方法

runMain(args, uninitLog)

//准备提交环境

  1. val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

//"org.apache.spark.deploy.yarn.YarnClusterApplication"赋给childMainClass 重点!!

//如果是client模式,则childMainClass就是提交jar包的主类

childMainClass = YARN_CLUSTER_SUBMIT_CLASS

//反射加载childMainClass类

  1. mainClass = Utils.classForName(childMainClass)

//创建mainClass实例并且转为SparkApplication类型

//SparkApplication是YarnClusterApplication的父类

  1. val app = mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]

//最终调用YarnClusterApplication的start方法

  1. app.start(childArgs.toArray, sparkConf)

//client的构造器创建YarnClient对象,用于连接ResourceManager

new Client(new ClientArguments(args), conf, null).run()

--run

this.appId = submitApplication()//提交应用到ResourceManager,返回appid

-- submitApplication

val containerContext = createContainerLaunchContext(newAppResponse)//

//确定applicationMaster是谁

//如果yarn模式:就是applicationMaster

//如果是client模式:就是executorLauncher

-- amClass = org.apache.spark.deploy.yarn.ApplicationMaster

val appContext = createApplicationSubmissionContext(newApp, containerContext)

yarnClient.submitApplication(appContext)

//提交应用到ResourceManager,让resourcemanager启动container(applicationMaster)

此时第一个进程spark submit已经完成(可以直接把这个进程kill掉也没问题)

ApplicationMaster

此时到am进程启动,而这个进程主要的作用如下:

  1. 封装ApplicationMaster的参数
  2. 根据参数,创建ApplicationMaster对象
  3. 执行ApplicationMaster的run方法,在run方法中,最后调用到runDriver方法

//解析完各种参数,new 一个applicationMaster

master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

//执行applicationMaster的run方法

master.run

if (isClusterMode) { //集群模式

runDriver()//集群模式就运行driver

} else { // client 模式

runExecutorLauncher()

}

//am启动第一件事就是跑driver,启动应用程序

runDriver()

  1. userClassThread = startUserApplication() //启动应用程序,也就是执行提交的jar包中的主函数

//加载参数穿过来的用户类 即提交时指定的--class

1.1 val mainMethod = userClassLoader.loadClass(args.userClass)

.getMethod("main", classOf[Array[String]])

1.2 new Thread ... //创建一个线程,线程名就叫driver,并返回这个线程

2.userThread.start()//执行这个driver线程的run方法,线程执行的就是我们提交应用程序类的主函数

//等待sc初始化,初始化完后才继续往下执行

  1. val sc: SparkContext = ThreadUtils.awaitResult(sparkContextPromise.future,

Duration(totalWaitTime, TimeUnit.MILLISECONDS))

//cs初始化完后向ResourceManager注册applicationMaster

//注册的本质就是向rm申请资源运行executor进程

  1. registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

-- client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)

-- amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)

//am向rm注册成功后,创建分配器

  1. -- createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)

-- allocator.allocateResources()//分配资源

-- val allocatedContainers = allocateResponse.getAllocatedContainers()//通过分配器响应 获取 分配到的容器列表

allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

handleAllocatedContainers(allocatedContainers.asScala)//资源列表大于0,就处理分配到的资源

runAllocatedContainers(containersToUse)//运行分配后的资源,封装指令

ExecutorRunnable.run//启动executor

-- startContainer()

-- val commands = prepareCommand()//封装指令

-- bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend

-- nmClient.startContainer(container.get, ctx)//启动容器,也就是启动executor

!!AM启动完毕

AM只有两个子线程,一个主线程,一个子线程(driver)

子线程driver执行用户类(用户传过来的jar包main方法)

主线程

1.主要是注册am(向rm请求分配资源) , rm返回容器给am

  1. am拿到返回容器列表,让nm在容器上执行java命令

-- bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend这个命令

3.最终在nm上启动executor进程

CoarseGrainedExecutorBackend

执行一次- bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend这个命令 就会执行一个新的进程,则是属于并行执行的感觉,和之前执行的内容是分开的。类似我们在Windows中开了一个微信和qq程序一样,各自执行,互不影响。因为这就是我们平时说的executor进程

  1. commands=/bin/java/org.apache.spark.executor.CoarseGrainedExecutorBackend,

执行这个指令,那么是调用这个类的main方法。

  1. main方法中:

// 1. 首先是对一些参数进行封装

// 2. 执行run方法

-- run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)

// 1.通过driver的uri和Driver进行关联

--driver = fetcher.setupEndpointRefByURI(driverUrl)

// 2.通过通信环境创建了一个终端,名字为executor,

在底层:Executor启动后会注册通信,并收到信息onStart,收到消息后,会执行通信对象CoarseGrainedExecutorBackend

的onStart方法,点击CoarseGrainedExecutorBackend

--env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

// 1.获取driver的引用

-- driver = Some(ref)

// 2.ExecutorBackend向driver发送消息,注册executor的消息,也称之为反向注册

--ref.askBoolean)

// 3.在driver端会接收到这个消息,因为在driver端,有一个上下文的对象,sparkcontext,在这个类有一个属性:

private var _schedulerBackend: SchedulerBackend = _,点击SchedulerBackend,是一个trait,找到

实现类:CoarseGrainedSchedulerBackend,在这个类中,有一个方法:receiveAndReply():

// executor的引用,在driver端,发送消息给到ExecutorBackend,注册executor成功

--executorRef.send(RegisteredExecutor)

// ExecutorBackend类中有一个recive方法,用来接收driver返回的executor注册成功的消息,executor是一个计算对象,在这个对象里面有一个线程池,每一个线程来处理一个从driver端发送过来的任务

--executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

整体提交流程图如下图所示:

YARN ClusterClient模式异同

到此,YARN模式下的Cluster提交流程结束,而Client模式的提交与Cluster模式提交的不同如下:

SparkSubmit类中:

childMainClass = args.mainClass//用户类 即提交时的 --class

//而在cluster模式则是org.apache.spark.deploy.yarn.YarnClusterApplication

开始执行用户的main函数:其实在执行 drvier

driver在sparkSubmit进程的主线程中运行

//而cluster的driver则是ApplicationMaster中的子线程,而AM一定在某一个NM上,所以叫cluster模式

//driver在客户端上运行,所以叫client模式

SparkContext类中//driver中会创建SparkContext

client.submitApplication()

amClass = org.apache.spark.deploy.yarn.ExecutorLauncher

开始启动 AM, 表明上是ExecutorLauncher, 本质还是ApplicationMaster

runExecutorLauncher()


不同:

  1. driver的位置不一样

cluset: 在ApplicationMaster进程中, 是它的一个子线程

client: 在SparkSubmit的进程中, 而且是在他的主线程中执行的.

  1. AM的名字不一样

cluster: ApplicationMaster

client: ExecutorLauntcher

Spark提交流程总结

用大白话解释提交流程源码就是:

执行suspark-submit后会有三个进程

SparkSubmit

ApplicationMaster

YarnCoarseGrainedExecutorBackend:粗粒度执行器后端, 也就是Executor

找个客户端执行sparksubmit

执行SparkSubmit类里的main方法,准备环境,解析一堆参数

获取一个childMainClass的值并且反射创建这个类

如果是cluster模式他的值就是YarnClusterApplication

如果是client模式他的值就是提交jar包的主类

通过反射创建childMainClass得到YarnClusterApplication并且强转为SparkApplication

调用SparkApplication的start方法

创建一个client去连接rm,并且获取到rm返回appId

封装一个指令让rm找一台nm启动一个am

这个指令如果是cluster那么启动的类就是applicationMaster,如果是client就是启动executorLauncher

此时SparkSubmit工作完成,如果是cluster模式,那么直接把这个进程kill掉也没事

ApplicationMaster

启动一个ApplicationMaster进程后,解析各种参数后封装一个ApplicationMaster对象

封装好的ApplicationMaster对象会开启一个线程运行用户类(提交的jar包)的main函数,这个线程就是driver线程,

在am进程主方法,会等待获取SparkContext,等到获取后就会向rm注册自己并申请资源,rm返回容器列表(这里申请资源细节比较多)

am拿到容器列表,就会在nm启动executor进程

YarnCoarseGrainedExecutorBackend进程启动成功后

启动后第一件事向driver线程反向注册

注册成功后,executor进程会创建executor计算对象

计算对象里有一个线程池,每一个线程来处理一个driver端发过来的任务

关键词:大数据培训

作者:江湖大侠原文地址:https://segmentfault.com/a/1190000041594210

%s 个评论

要回复文章请先登录注册