Java線程池框架核心代碼分析

qzbr7800 7年前發布 | 15K 次閱讀 線程池 Java Java開發

前言

多線程編程中,為每個任務分配一個線程是不現實的,線程創建的開銷和資源消耗都是很高的。線程池應運而生,成為我們管理線程的利器。Java 通過 Executor 接口,提供了一種標準的方法將任務的提交過程和執行過程解耦開來,并用 Runnable 表示任務。

下面,我們來分析一下 Java 線程池框架的實現 ThreadPoolExecutor 。

下面的分析基于JDK1.7

生命周期

ThreadPoolExecutor 中,使用 CAPACITY 的高3位來表示運行狀態,分別是:

  1. RUNNING:接收新任務,并且處理任務隊列中的任務
  2. SHUTDOWN:不接收新任務,但是處理任務隊列的任務
  3. STOP:不接收新任務,不出來任務隊列,同時中斷所有進行中的任務
  4. TIDYING:所有任務已經被終止,工作線程數量為 0,到達該狀態會執行 terminated()
  5. TERMINATED: terminated() 執行完畢

狀態轉換圖

ThreadPoolExecutor 中用原子類來表示狀態位

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

線程池模型

核心參數

  • corePoolSize :最小存活的工作線程數量(如果設置 allowCoreThreadTimeOut ,那么該值為 0)
  • maximumPoolSize :最大的線程數量,受限于 CAPACITY
  • keepAliveTime :對應線程的存活時間,時間單位由TimeUnit指定
  • workQueue :工作隊列,存儲待執行的任務
  • RejectExecutionHandler :拒絕策略,線程池滿后會觸發

線程池的最大容量: CAPACITY 中的前三位用作標志位,也就是說工作線程的最大容量為 (2^29)-1

四種模型

  • CachedThreadPool :一個可緩存的線程池,如果線程池的當前規模超過了處理需求時,那么將回收空閑的線程,當需求增加時,則可以添加新的線程,線程池的規模不存在任何的限制。
  • FixedThreadPool :一個固定大小的線程池,提交一個任務時就創建一個線程,直到達到線程池的最大數量,這時線程池的大小將不再變化。
  • SingleThreadPool :一個單線程的線程池,它只有一個工作線程來執行任務,可以確保按照任務在隊列中的順序來串行執行,如果這個線程異常結束將創建一個新的線程來執行任務。
  • ScheduledThreadPool :一個固定大小的線程池,并且以延遲或者定時的方式來執行任務,類似于Timer。

執行任務 execute

核心邏輯:

  1. 當前線程數量 <  corePoolSize ,直接開啟新的核心線程執行任務 addWorker(command, true)
  2. 當前線程數量 >=  corePoolSize ,且任務加入工作隊列成功
    1. 檢查線程池當前狀態是否處于 RUNNING
    2. 如果否,則拒絕該任務
    3. 如果是,判斷當前線程數量是否為 0,如果為 0,就增加一個工作線程。
    </li>
  3. 開啟普通線程執行任務 addWorker(command, false) ,開啟失敗就拒絕該任務
  4. </ol>

    從上面的分析可以總結出線程池運行的四個階段:

    1. poolSize < corePoolSize  且隊列為空,此時會新建線程來處理提交的任務
    2. poolSize == corePoolSize ,此時提交的任務進入工作隊列,工作線程從隊列中獲取任務執行,此時隊列不為空且未滿。
    3. poolSize == corePoolSize ,并且隊列已滿,此時也會新建線程來處理提交的任務,但是 poolSize < maxPoolSize
    4. poolSize == maxPoolSize ,并且隊列已滿,此時會觸發拒絕策略

    拒絕策略

    前面我們提到任務無法執行會被拒絕, RejectedExecutionHandler 是處理被拒絕任務的接口。下面是四種拒絕策略。

    • AbortPolicy :默認策略,終止任務,拋出RejectedException
    • CallerRunsPolicy :在調用者線程執行當前任務,不拋異常
    • DiscardPolicy : 拋棄策略,直接丟棄任務,不拋異常
    • DiscardOldersPolicy :拋棄最老的任務,執行當前任務,不拋異常

    線程池中的 Worker

    Worker 繼承了 AbstractQueuedSynchronizer 和 Runnable ,前者給 Worker 提供鎖的功能,后者執行工作線程的主要方法 runWorker(Worker w) (從任務隊列撈任務執行)。Worker 引用存在 workers 集合里面,用 mainLock 守護。

    private final ReentrantLock mainLock = new ReentrantLock();
    private final HashSet<Worker> workers = new HashSet<Worker>();

    核心函數 runWorker

    下面是簡化的邏輯,注意:每個工作線程的 run 都執行下面的函數

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        while (task != null || (task = getTask()) != null) {
            w.lock();
            beforeExecute(wt, task);
            task.run();
            afterExecute(task, thrown);
            w.unlock();
        }
        processWorkerExit(w, completedAbruptly);
    }
    1. 從 getTask() 中獲取任務
    2. 鎖住 worker
    3. 執行 beforeExecute(wt, task) ,這是 ThreadPoolExecutor 提供給子類的擴展方法
    4. 運行任務,如果該worker有配置了首次任務,則先執行首次任務且只執行一次。
    5. 執行 afterExecute(task, thrown);
    6. 解鎖 worker
    7. 如果獲取到的任務為 null,關閉 worker

    獲取任務 getTask

    線程池內部的任務隊列是一個阻塞隊列,具體實現在構造時傳入。

    private final BlockingQueue<Runnable> workQueue;

    getTask() 從任務隊列中獲取任務,支持阻塞和超時等待任務,四種情況會導致返回 null ,讓 worker 關閉。

    1. 現有的線程數量超過最大線程數量
    2. 線程池處于 STOP 狀態
    3. 線程池處于 SHUTDOWN 狀態且工作隊列為空
    4. 線程等待任務超時,且線程數量超過保留線程數量

    核心邏輯:根據 timed 在阻塞隊列上超時等待或者阻塞等待任務,等待任務超時會導致工作線程被關閉。

    timed = allowCoreThreadTimeOut || wc > corePoolSize;
    Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();

    在以下兩種情況下等待任務會超時:

    1. 允許核心線程等待超時,即 allowCoreThreadTimeOut(true)
    2. 當前線程是普通線程,此時 wc > corePoolSize

    工作隊列使用的是 BlockingQueue ,這里就不展開了,后面再寫一篇詳細的分析。

    總結

    • ThreadPoolExecutor 基于生產者-消費者模式,提交任務的操作相當于生產者,執行任務的線程相當于消費者。
    • Executors 提供了四種基于 ThreadPoolExecutor 構造線程池模型的方法,除此之外,我們還可以直接繼承 ThreadPoolExecutor ,重寫 beforeExecute 和 afterExecute 方法來定制線程池任務執行過程。
    • 使用有界隊列還是無界隊列需要根據具體情況考慮,工作隊列的大小和線程的數量也是需要好好考慮的。
    • 拒絕策略推薦使用 CallerRunsPolicy ,該策略不會拋棄任務,也不會拋出異常,而是將任務回退到調用者線程中執行。

     

    來自:http://developer.51cto.com/art/201702/530079.htm

     

 本文由用戶 qzbr7800 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!