Spark on Yarn: Cluster模式Scheduler實現
背景
Spark on Yarn分yarn-cluster和yarn-client兩種模式。 
本文通過Cluster模式的TaskScheduler實現入手,梳理一遍spark on yarn的大致實現邏輯。 
前提我對兩種模式以及yarn任務的整體運行邏輯不是很清楚。
主體邏輯
cluster模式中,使用的TaskScheduler是YarnClusterScheduler。 
它繼承了默認使用的TaskSchedulerImpl類,額外在postStartHook方法里,喚醒了ApplicationMaster類的設置sparkcontext的方法。 
ApplicationMaster相當于是spark在yarn上的AM,內部的YarnRMClient類,負責向RM注冊和注銷AM,以及拿到attemptId。注冊AM之后,得到一個可以申請/釋放資源的YarnAllocationHandler類,從而可以維護container與executor之間的關系。 
下節具體介紹幾個主要類的實現邏輯。
具體實現
AM
ApplicationMaster,通過YarnRMClient來完成自己的注冊和注銷。
AM的啟動方式
/**
 * This object does not provide any special functionality. It exists so that it's easy to tell
 * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
 */
object ExecutorLauncher {
  def main(args: Array[String]) = {
    ApplicationMaster.main(args)
  }
}main里面調用AM的run方法:
  def main(args: Array[String]) = {
    SignalLogger.register(log)
    val amArgs = new ApplicationMasterArguments(args)
    SparkHadoopUtil.get.runAsSparkUser { () =>
      master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
      System.exit(master.run())
    }
  }如果AM的啟動參數里有用戶自己定義的類,則是Driver模式,即cluster模式。用戶自己定義的類里面帶了spark driver,會在單獨一個線程里啟動。這也是cluster模式與client模式的區別,用戶實現了driver vs 用戶只是提交app。
run方法里 
1. 如果不是Driver模式,執行runExecutorLauncher邏輯: 
啟動后,執行registerAM,里面new了YarnAllocator的實現,調用allocateResources,申請并執行container。同時,啟動一個reporter線程,每隔一段時間調用YarnAllocator的allocateResources方法,或匯報有太多executor fail了。 
2. 如果是Driver模式,執行runDriver邏輯: 
也是執行registerAM,但是之前需要反射執行jar包里用戶定義的driver類。 
YarnAllocator
YarnAllocator負責向yarn申請和釋放containers,維護containe、executor相關關系,有一個線程池。申請到container之后,在container里執行ExecutorRunnable。需要子類實現的是申請和釋放這兩個方法:
protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse protected def releaseContainer(container: Container): Unit
YarnAllocationHandler繼承了YarnAllocator。
- allocateContainers方法: 
 Yarn api里提供ResourceRequest這個類,里面包含了一個app向RM索要不同container的信息,包括機器名/機架名,cpu和mem 資源數,container數,優先級,locality是否放松。然后組成AllocateRequest類,代表AM向RM從集群里獲得 resource。調用ApplicationMasterProtocal的allocate(AllocateRequest),由AM**向RM發起資源請求**。
- releaseContainer方法: 
 每次把需要release的container記錄下來。在每次allocateContainers調用的時候,
 會往AllocateRequest里addAllReleases(releasedContainerList),在請求資源的時候順便把歷史資源釋放掉。
ExecutorRunnable與Yarn的關系: 
1. 向ContainerManager建立連接,讓cm來startContainer。 
2. ContainerLaunchContext包含了yarn的NodeManager啟動一個container需要的所有信息。ExecutorRunnable會構建這個container申請信息。 
可以參考這段啟動邏輯: 
def startContainer = {
    logInfo("Setting up ContainerLaunchContext")
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]
    ctx.setContainerId(container.getId())
    ctx.setResource(container.getResource())
    val localResources = prepareLocalResources
    ctx.setLocalResources(localResources)
    val env = prepareEnvironment
    ctx.setEnvironment(env)
    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
      appAttemptId, localResources)
    logInfo("Setting up executor with commands: " + commands)
    ctx.setCommands(commands)
    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
    // If external shuffle service is enabled, register with the Yarn shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
    }
    // Send the start request to the ContainerManager
    val startReq = Records.newRecord(classOf[StartContainerRequest])
    .asInstanceOf[StartContainerRequest]
    startReq.setContainerLaunchContext(ctx)
    cm.startContainer(startReq)
  }值得注意的是setServiceData方法,如果在node manager上啟動了external shuffle service。Yarn的AuxiliaryService支持在NodeManager上啟動輔助服務。spark有一個參數spark.shuffle.service.enabled來設置該服務是否被啟用,我看的1.2.0版本里貌似沒有服務的實現代碼。
Executor
此外,從ExecutorRunnableUtil的prepareCommand方法可以得知,ExecutorRunnable通過命令行啟動了CoarseGrainedExecutorBackend進程,與粗粒度的mesos模式和standalone模式一致,task最終落到CoarseGrainedExecutorBackend里面執行。
全文完:)
來自:http://blog.csdn.net/pelick/article/details/43836563