Spark Application的調度算法

NorineXGII 8年前發布 | 11K 次閱讀 Spark 分布式/云計算/大數據

來自: http://www.cnblogs.com/francisYoung/p/5205420.html

要想明白spark application調度機制,需要回答一下幾個問題:

1.誰來調度?

2.為誰調度?

3.調度什么?

3.何時調度?

4.調度算法

前四個問題可以用如下一句話里來回答:每當 集群資源發生變化 時, active master 進程 為 所有已注冊的并且沒有調度完畢的application 調度 Worker節點上的Executor 進程。

"active master" , spark集群可能有多個master,但是只有一個active master 參與調度,standby master不參與調度。

集群資源發生變化是什么意思呢?這里的集群資源指的主要是cores的變化,注冊/移除Executor進程使得集群的freeCores變多/變少,添加/移除Worker節點使得集群的freeCores變多/變少... ...,所有導致集群資源發生變化的操作,都會調用schedule()重新為application和driver進行資源調度。

spark提供了兩種資源調度算法:spreadOut和非spreadOut。spreadOut算法會盡可能的將一個application 所需要的Executor進程分布在多個worker幾點上,從而提高并行度,非spreadOut與之相反,他會把一個worker節點的freeCores都耗盡了才會去下一個worker節點分配。

為了詳細說明這兩種算法,我們先來以一個具體的例子來介紹,最后再介紹源碼。

基本概念

每一個application至少包含以下基本屬性:

coresPerExecutor:每一個Executor進程的core個數

memoryPerExecutor:每一個Executor進程的memory大小

maxCores: 這個application最多需要的core個數。

每一個worker至少包含以下基本屬性:

freeCores:worker 節點當前可用的core個數

memoryFree:worker節點當前可用的memory大小。

假設一個待注冊的application如下:

coresPerExecutor:2

memoryPerExecutor:512M

maxCores: 12

這表示這個application 最多需要12個core,每一個Executor進行都要2個core,512M內存。

假設某一時刻spark集群有如下幾個worker節點,他們按照coresFree降序排列:

Worker1:coresFree=10  memoryFree=10G

Worker2:coresFree=7   memoryFree=1G

Worker3:coresFree=3   memoryFree=2G

Worker4:coresFree=2   memoryFree=215M

Worker5:coresFree=1   memoryFree=1G

其中worker5不滿足application的要求:worker5.coresFree < application.coresPerExecutor

worker4也不滿足application的要求:worker4.memoryFree < application.memoryPerExecutor

因此最終滿足調度要求的worker節點只有前三個,我們將這三個節點記作usableWorkers。

spreadOut算法

先介紹spreadOut算法吧。上面已經說了,滿足條件的worker只有前三個:

Worker1:coresFree=10  memoryFree=10G

Worker2:coresFree=7   memoryFree=1G

Worker3:coresFree=3   memoryFree=2G

第一次調度之后,worker列表如下:

Worker1:coresFree=8  memoryFree=9.5G  assignedExecutors=1  assignedCores=2

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:1,totalCores=2

可以發現,worker1的coresFree和memoryFree都變小了而worker2,worker3并沒有發生改變,這是因為我們在worker1上面分配了一個Executor進程(這個Executor進程占用兩個2core,512M memory)而沒有在workre2和worker3上分配。

接下來,開始去worker2上分配:

Worker1:coresFree=8  memoryFree=9.5G      assignedExecutors=1  assignedCores=2

Worker2:coresFree=5   memoryFree=512M    assignedExecutors=1  assignedCores=2

Worker3:coresFree=3   memoryFree=2G        assignedExecutors=0  assignedCores=0

totalExecutors:2,totalCores=4

此時已經分配了2個Executor進程,4個core。

接下來去worker3上分配:

Worker1:coresFree=8  memoryFree=9.5G      assignedExecutors=1  assignedCores=2

Worker2:coresFree=5   memoryFree=512M    assignedExecutors=1  assignedCores=2

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:3,totalCores=6

接下來再去worker1分配,然后worker2...  ...以round-robin方式分配,由于worker3.coresFree<application.coresPerExecutor,不會在他上面分配資源了:

Worker1:coresFree=6  memoryFree=9.0G      assignedExecutors=2  assignedCores=4

Worker2:coresFree=5   memoryFree=512M    assignedExecutors=1  assignedCores=2

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:4,totalCores=8

Worker1:coresFree=6  memoryFree=9.0G      assignedExecutors=2  assignedCores=4

Worker2:coresFree=3   memoryFree=0M       assignedExecutors=2  assignedCores=4

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:5,totalCores=10

此時worker2也不滿足要求了:worker2.memoryFree<application.memoryPerExecutor

因此,下一次分配就去worker1上了:

Worker1:coresFree=4  memoryFree=8.5G      assignedExecutors=3  assignedCores=6

Worker2:coresFree=3   memoryFree=0M        assignedExecutors=2  assignedCores=4

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:6,totalCores=12

ok,由于已經分配了12個core,達到了application的要求,所以不在為這個application調度了。

非spreadOUt算法

那么非spraadOut算法呢?他是逮到一個worker如果不把他的資源耗盡了是不會放手的:

Worker1:coresFree=8  memoryFree=9.5G  assignedExecutors=1  assignedCores=2

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:1,totalCores=2

Worker1:coresFree=6  memoryFree=9.0G  assignedExecutors=2  assignedCores=4

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:2,totalCores=4

Worker1:coresFree=4  memoryFree=8.5    assignedExecutors=3  assignedCores=6

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:3,totalCores=6

Worker1:coresFree=2  memoryFree=8.0G  assignedExecutors=4  assignedCores=8

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:4,totalCores=8

Worker1:coresFree=0  memoryFree=7.5G  assignedExecutors=5  assignedCores=10

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:5,totalCores=10

可以發現,worker1的coresfree已經耗盡了,好可憐。由于application需要12個core,而這里才分配了10個,所以還要繼續往下分配:

Worker1:coresFree=0  memoryFree=7.5G      assignedExecutors=5  assignedCores=10

Worker2:coresFree=5   memoryFree=512G    assignedExecutors=1  assignedCores=2

Worker3:coresFree=3   memoryFree=2G        assignedExecutors=0  assignedCores=0

totalExecutors:6,totalCores=12

ok,最終分配來12個core,滿足了application的要求。

對比:

spreadOut算法中,是以round-robin方式,輪詢的在worker節點上分配Executor進程,即以如下序列分配:worker1,worker2... ... worker n,worker1... ....

非spreadOut算法中,逮者一個worker就不放手,直到滿足一下條件之一:

worker.freeCores<application.coresPerExecutor 或者  worker.memoryFree<application.memoryPerExecutor 。

在上面兩個例子中,雖然最終都分配了6個Executor進程和12個core,但是spreadOut方式下,6個Executor進程分散在不同的worker節點上,充分利用了spark集群的worker節點,而非spreadOut方式下,只在worker1和worker2上分配了Executor進程,并沒有充分利用spark worker節點。

小插曲,spreadOut + oneExecutorPerWorker 算法

spark還有一個叫做”oneExecutorPerWorker“機制,即一個worker上啟動一個Executor進程,下面只是簡單的說一下得了:

Worker1:coresFree=8  memoryFree=9.5G  assignedExecutors=1  assignedCores=2

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:1,totalCores=2

Worker1:coresFree=8  memoryFree=9.5G      assignedExecutors=1  assignedCores=2

Worker2:coresFree=5   memoryFree=512M    assignedExecutors=1  assignedCores=2

Worker3:coresFree=3   memoryFree=2G        assignedExecutors=0  assignedCores=0

totalExecutors:2,totalCores=4

Worker1:coresFree=8  memoryFree=9.5G      assignedExecutors=1  assignedCores=2

Worker2:coresFree=5   memoryFree=512M    assignedExecutors=1  assignedCores=2

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:3,totalCores=6

Worker1:coresFree=6  memoryFree=9.0G      assignedExecutors=1  assignedCores=4

Worker2:coresFree=3   memoryFree=512M     assignedExecutors=1  assignedCores=2

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:3,totalCores=8

Worker1:coresFree=6  memoryFree=9.0G      assignedExecutors=1  assignedCores=4

Worker2:coresFree=2   memoryFree=0   M     assignedExecutors=1  assignedCores=4

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:3,totalCores=10

Worker1:coresFree=4  memoryFree=9.5G      assignedExecutors=1  assignedCores=6

Worker2:coresFree=2   memoryFree=0   M     assignedExecutors=1  assignedCores=4

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:3,totalCores=12

和spreadOut+非oneExecutorPerWorker對比發現,唯一的不同就是Executor進程的數量,一個是6,一個是3。

(

這里在額外擴展一下,假設application的maxCores=14,而不是12,那么接著上面那個worker列表來:

Worker1:coresFree=4  memoryFree=9.5G      assignedExecutors=1  assignedCores=6

Worker2:coresFree=0   memoryFree=0   M     assignedExecutors=1  assignedCores=6

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:3,totalCores=12

雖然worker2.memoryFree=0,但是仍然可以繼續在他上面分配core,因為onExecutorPerWorker機制不檢查內存的限制。

)

接下來看看源碼是怎么實現的:

了解了上面寫的,在閱讀源碼就很輕易了,這里簡單說一下。

org.apache.spark.deploy.master.Master收到application發送的RegisterApplication(description, driver)消息后,開始執行注冊邏輯:

    case RegisterApplication(description, driver) => {
      // TODO Prevent repeated registrations from some driver
      //standby master不調度

      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        //注冊app,即將其加入到waitingApps中
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        //將app加入持久化引擎,主要是為了故障恢復
        persistenceEngine.addApplication(app)
        //向driver發送RegisteredApplication消息表明master已經注冊了這個app
        driver.send(RegisteredApplication(app.id, self))
        //為waitingApps中的app調度資源
        schedule()
      }
    }

上面的注釋已經寫的很清楚了... ...

  /**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) { return }
    // Drivers take strict precedence over executors
    //為了避免每次schedule,總是在相同的worker上分配資源,所有這里打亂worker順序。

    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
    //下面這個for循環是為driver調度資源,因為這里只將application的調度,所以driver的調度不說了。

    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
      for (driver <- waitingDrivers) {
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
        }
      }
    }

    //為application調度資源
    startExecutorsOnWorkers()
  }
  /**
   * Schedule and launch executors on workers
   */
  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    // 為waitingApps中的app調度資源,app.coresLeft是app還有多少core沒有分配

    for (app <- waitingApps if app.coresLeft > 0) {
      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
      // Filter out workers that don't have enough resources to launch an executor
      // 篩選出狀態為ALIVE并且這個worker剩余內存,剩余core都大于等于app的要求,然后按照coresFree降序排列

      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse
      //在usableWorkers上為app分配Executor
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      // 在worker上啟動Executor進程

      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
      }
    }
  }

這個方法做了如下事情:

1.篩選出可用的worker,即usableWorkers,如果一個worker滿足以下所有條件,那么這個worker就被添加到usableWorkers中:

Alive

worker.memoryFree >= app.desc.memoryPerExecutorMB

worker.coresFree >= coresPerExecutor

2.assignedCores是一個數組,assignedCores[i]里面存儲了需要在usableWorkers[i]上分配的core個數,譬如如果assingedCores[1]=2,那么就需要在usableWorkers[1]上分配2個core。

  /**
   * Schedule executors to be launched on the workers.
   * Returns an array containing number of cores assigned to each worker.
   *
   * There are two modes of launching executors. The first attempts to spread out an application's
   * executors on as many workers as possible, while the second does the opposite (i.e. launch them
   * on as few workers as possible). The former is usually better for data locality purposes and is
   * the default.
   *
   * The number of cores assigned to each executor is configurable. When this is explicitly set,
   * multiple executors from the same application may be launched on the same worker if the worker
   * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
   * worker by default, in which case only one executor may be launched on each worker.
   *
   * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
   * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
   * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
   * allocated at a time, 12 cores from each worker would be assigned to each executor.
   * Since 12 < 16, no executors would launch [SPARK-8881].
   */
  private def scheduleExecutorsOnWorkers(
      app: ApplicationInfo,
      usableWorkers: Array[WorkerInfo],
      spreadOutApps: Boolean): Array[Int] = {
    val coresPerExecutor = app.desc.coresPerExecutor
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    val numUsable = usableWorkers.length
    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
    val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
    var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

    /** Return whether the specified worker can launch an executor for this app. */
    //是否可以在一個worker上分配Executor
    def canLaunchExecutor(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

      // If we allow multiple executors per worker, then we can always launch new executors.
      // Otherwise, if there is already an executor on this worker, just give it more cores.
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        //在不里,需要檢查worker的空閑core和內存是否夠用
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
        keepScheduling && enoughCores && enoughMemory && underLimit
      } else {
        // We're adding cores to an existing executor, so no need
        // to check memory and executor limits
        //尤其需要注意的是,oneExecutorPerWorker機制下,不檢測內存限制,很重要。
        keepScheduling && enoughCores
      }
    }

    // Keep launching executors until no more workers can accommodate any
    // more executors, or if we have reached this application's limits
    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor(pos)) {
          //要分配的cores
          coresToAssign -= minCoresPerExecutor
          //已分配的cores
          assignedCores(pos) += minCoresPerExecutor

          // If we are launching one executor per worker, then every iteration assigns 1 core
          // to the executor. Otherwise, every iteration assigns cores to a new executor.
          //一個worker只啟動一個Executor
          if (oneExecutorPerWorker) {
            assignedExecutors(pos) = 1
          } else {
            assignedExecutors(pos) += 1
          }

          // Spreading out an application means spreading out its executors across as
          // many workers as possible. If we are not spreading out, then we should keep
          // scheduling executors on this worker until we use all of its resources.
          // Otherwise, just move on to the next worker.
          //如果沒有開啟spreadOUt算法,就一直在一個worker上分配,直到不能再分配為止。

          if (spreadOutApps) {
            keepScheduling = false
          }
        }
      }
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
    }
    assignedCores
  }
  /**
   * Allocate a worker's resources to one or more executors.
   * @param app the info of the application which the executors belong to
   * @param assignedCores number of cores on this worker for this application
   * @param coresPerExecutor number of cores per executor
   * @param worker the worker info
   */
  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    //計算要創建多少個Executor進程,默認值是1.

    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      val exec = app.addExecutor(worker, coresToAssign)
      //真正的啟動Executor進程了。
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }

由于本人接觸spark時間不長,如有錯誤或者任何意見可以在留言或者發送郵件到franciswbs@163.com,讓我們一起交流。

作者:FrancisWang

郵箱:franciswbs@163.com

出處:http://www.cnblogs.com/francisYoung/

本文地址:http://www.cnblogs.com/francisYoung/

本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。

 本文由用戶 NorineXGII 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!