一分鐘吃透 Spark 之 TaskScheduler

smallstone 7年前發布 | 24K 次閱讀 Spark 分布式/云計算/大數據

DagScheduler 和 TaskScheduler 的任務交接

spark 調度器分為兩個部分, 一個是 DagScheduler, 一個是 TaskScheduler, DagScheduler 主要是用來把一個 Job 根據寬依賴劃分為多個Stage(階段),

對于劃分出來的每個 stage 都抽象為一個  TaskSet任務集 交給  TaskScheduler 來進行進一步的調度運行,

我們來看一張圖, 來理清里面的概念, 我們用戶編程使用的 RDD, 每個 RDD都有一個分區數,  這個分區數目創建 RDD 的時候有一個初始值,運行過程中,根據配置的 parallelism 參數 和 shuffle 過程中顯示指定的分區數目 來調整個數

我們可以看到, 一個 task 對應一個 stage 里面一個分區數據的處理任務,  task 又分為 ShuffleMapTask 和 ResultTask , 區分任務是中間階段的任務 還是最后一個階段的任務。

而一個 stage 里面 所有分區的任務集合 就被包裝為一個  TaskSet 交給了 TaskScheduler,

TaskScheduler 調度方式

TaskScheduler 會為每個 TaskSet 創建一個 TaskScheduler, 作為一個調度單位, 放在任務池子里面,

調度池分為兩種, 一種使用 FIFO調度方式 , 還有一種使用 Fair調度方式

FIFO調度方式

這種方式 rootPool 下面直接就是 TaskSetManager , 沒有子 Pool,

根據  FIFOSchedulingAlgorithm 算法排序, 這種方式排序方式很簡單, 直接就是先進先出隊列的排序方式對多個  TaskSetManager 進行排隊,

Fair調度方式

這種方式 rootPool 是根 pool, 下一級是 用戶定義的 Pool,  這一層是為了給不同的用戶定義不同的優先級用的,

用戶 Pool 下面才是 TaskSetManager

FairSchedulingAlgorithm 算法,排序方式是由兩個因子控制,

  • weight: 控制資源池相對其他資源池,可以分配到資源的比例。默認所有資源池的weight都是1。如果你將某個資源池的weight設為2,那么該資源池中的資源將是其他池子的2倍。如果將weight設得很高,如1000,可以實現資源池之間的調度優先級 – 也就是說,weight=1000的資源池總能立即啟動其對應的作業。

  • minShare:除了整體weight之外,每個資源池還能指定一個最小資源分配值(CPU個數),管理員可能會需要這個設置。公平調度器總是會嘗試優先滿足所有活躍(active)資源池的最小資源分配值,然后再根據各個池子的weight來分配剩下的資源。因此,minShare屬性能夠確保每個資源池都能至少獲得一定量的集群資源。minShare的默認值是0。

排序也是遞歸的, 因為 rootPool 下面有多個 用戶自己的 Pool, 要先根據 FairSchedulingAlgorithm 算法對多個 用戶的Pool 排序, 然后對一個 Pool 中的多個 TaskSetManager 也使用 FairSchedulingAlgorithm 算法排序。

任務實際運行的觸發方式

觸發方式有兩種,

  • 一堆任務  從 DagScheduler 到 TaskSetManager 提交過來了, 這個時候可能有 大量 executor 的cpu 都閑著呢, 所以要 喚醒他們過來領任務去執行,  這種我們叫做喚醒方式,

  • 一個正在執行的任務跑完了,  executor 報告任務已經執行完的時候, 這個時候 這個 cpu 又閑著了,看看有沒有 任務去領一下,  這種我們叫做干完了手里的活接著干方式

  • 有可能增加了新的 Executor  ,這個 executor 來注冊了, 相當于產生了新的勞動力, 肯定也要去領活干, 我們叫新勞動力領活干方式,

對于 喚醒方式 就是  TaskScheduler 把  TaskScheduler, 作為一個調度單位, 放在任務池子里面后, 然后 調用 reviveOffers 來喚醒, 里面是調用 makeOffers() 方法,

對于 干完了手里的活接著干方式,  就是 接受到了 StatusUpdate 消息,  會去調用一下  makeOffers(executorId)

對于  新勞動力領活干方式 就是接受到  RegisterExecutor 消息后調用一下  makeOffers()

我們來看下  makeOffers() 方法,

 val workOffers = IndexedSeq(
          new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
        launchTasks(scheduler.resourceOffers(workOffers))

把執行單位包裝為 一個 WorkerOffer ,然后調用  taskSchedule 的 resourceOffers方法, 這個方法的注釋如下

Called by cluster manager to offer resources on slaves. We respond by asking our active task

sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so

that tasks are balanced across the cluster.

這個是  resourceOffers 函數的注釋,  就是從把任務池子 中的任務排隊, 然后取出最前面的任務, 來和 執行單位相結合, 這里需要注意的是, 會把任務盡可能均勻的分配到每個node 上,  一個任務和一個執行單位的結合包裝為一個 TaskDescription,  然后把任務發送到執行單位上去執行,

上圖中我們可以看到, 只要有空閑的 executor 就會提供資源給 task, 首先要把 workerOffer shuffle  打亂一下, 免得過分蹂躪個別的 executor。

 

來自:http://mp.weixin.qq.com/s/UaIxXjTFdSauknbj_Ndj1w

 

 本文由用戶 smallstone 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!