Spark on Yarn: Cluster模式Scheduler實現
背景
Spark on Yarn分yarn-cluster和yarn-client兩種模式。
本文通過Cluster模式的TaskScheduler實現入手,梳理一遍spark on yarn的大致實現邏輯。
前提我對兩種模式以及yarn任務的整體運行邏輯不是很清楚。
主體邏輯
cluster模式中,使用的TaskScheduler是YarnClusterScheduler。
它繼承了默認使用的TaskSchedulerImpl類,額外在postStartHook方法里,喚醒了ApplicationMaster類的設置sparkcontext的方法。
ApplicationMaster相當于是spark在yarn上的AM,內部的YarnRMClient類,負責向RM注冊和注銷AM,以及拿到attemptId。注冊AM之后,得到一個可以申請/釋放資源的YarnAllocationHandler類,從而可以維護container與executor之間的關系。
下節具體介紹幾個主要類的實現邏輯。
具體實現
AM
ApplicationMaster,通過YarnRMClient來完成自己的注冊和注銷。
AM的啟動方式
/** * This object does not provide any special functionality. It exists so that it's easy to tell * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps. */ object ExecutorLauncher { def main(args: Array[String]) = { ApplicationMaster.main(args) } }
main里面調用AM的run方法:
def main(args: Array[String]) = { SignalLogger.register(log) val amArgs = new ApplicationMasterArguments(args) SparkHadoopUtil.get.runAsSparkUser { () => master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs)) System.exit(master.run()) } }
如果AM的啟動參數里有用戶自己定義的類,則是Driver模式,即cluster模式。用戶自己定義的類里面帶了spark driver,會在單獨一個線程里啟動。這也是cluster模式與client模式的區別,用戶實現了driver vs 用戶只是提交app。
run方法里
1. 如果不是Driver模式,執行runExecutorLauncher邏輯:
啟動后,執行registerAM,里面new了YarnAllocator的實現,調用allocateResources,申請并執行container。同時,啟動一個reporter線程,每隔一段時間調用YarnAllocator的allocateResources方法,或匯報有太多executor fail了。
2. 如果是Driver模式,執行runDriver邏輯:
也是執行registerAM,但是之前需要反射執行jar包里用戶定義的driver類。
YarnAllocator
YarnAllocator負責向yarn申請和釋放containers,維護containe、executor相關關系,有一個線程池。申請到container之后,在container里執行ExecutorRunnable。需要子類實現的是申請和釋放這兩個方法:
protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse protected def releaseContainer(container: Container): Unit
YarnAllocationHandler繼承了YarnAllocator。
- allocateContainers方法:
Yarn api里提供ResourceRequest這個類,里面包含了一個app向RM索要不同container的信息,包括機器名/機架名,cpu和mem 資源數,container數,優先級,locality是否放松。然后組成AllocateRequest類,代表AM向RM從集群里獲得 resource。調用ApplicationMasterProtocal的allocate(AllocateRequest),由AM**向RM發起資源請求**。 - releaseContainer方法:
每次把需要release的container記錄下來。在每次allocateContainers調用的時候,
會往AllocateRequest里addAllReleases(releasedContainerList),在請求資源的時候順便把歷史資源釋放掉。
ExecutorRunnable與Yarn的關系:
1. 向ContainerManager建立連接,讓cm來startContainer。
2. ContainerLaunchContext包含了yarn的NodeManager啟動一個container需要的所有信息。ExecutorRunnable會構建這個container申請信息。
可以參考這段啟動邏輯:
def startContainer = { logInfo("Setting up ContainerLaunchContext") val ctx = Records.newRecord(classOf[ContainerLaunchContext]) .asInstanceOf[ContainerLaunchContext] ctx.setContainerId(container.getId()) ctx.setResource(container.getResource()) val localResources = prepareLocalResources ctx.setLocalResources(localResources) val env = prepareEnvironment ctx.setEnvironment(env) ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName()) val credentials = UserGroupInformation.getCurrentUser().getCredentials() val dob = new DataOutputBuffer() credentials.writeTokenStorageToStream(dob) ctx.setContainerTokens(ByteBuffer.wrap(dob.getData())) val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores, appAttemptId, localResources) logInfo("Setting up executor with commands: " + commands) ctx.setCommands(commands) ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)) // If external shuffle service is enabled, register with the Yarn shuffle service already // started on the NodeManager and, if authentication is enabled, provide it with our secret // key for fetching shuffle files later if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) { val secretString = securityMgr.getSecretKey() val secretBytes = if (secretString != null) { // This conversion must match how the YarnShuffleService decodes our secret JavaUtils.stringToBytes(secretString) } else { // Authentication is not enabled, so just provide dummy metadata ByteBuffer.allocate(0) } ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes)) } // Send the start request to the ContainerManager val startReq = Records.newRecord(classOf[StartContainerRequest]) .asInstanceOf[StartContainerRequest] startReq.setContainerLaunchContext(ctx) cm.startContainer(startReq) }
值得注意的是setServiceData方法,如果在node manager上啟動了external shuffle service。Yarn的AuxiliaryService支持在NodeManager上啟動輔助服務。spark有一個參數spark.shuffle.service.enabled來設置該服務是否被啟用,我看的1.2.0版本里貌似沒有服務的實現代碼。
Executor
此外,從ExecutorRunnableUtil的prepareCommand方法可以得知,ExecutorRunnable通過命令行啟動了CoarseGrainedExecutorBackend進程,與粗粒度的mesos模式和standalone模式一致,task最終落到CoarseGrainedExecutorBackend里面執行。
全文完:)
來自:http://blog.csdn.net/pelick/article/details/43836563