Java ThreadPool

jopen 12年前發布 | 33K 次閱讀 Java Java開發

線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構造方法為:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

corePoolSize: 線程池維護線程的最少數量
maximumPoolSize:線程池維護線程的最大數量
keepAliveTime: 線程池維護線程所允許的空閑時間
unit: 線程池維護線程所允許的空閑時間的單位
workQueue: 線程池所使用的緩沖隊列,主要使用ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue
handler: 線程池對拒絕任務的處理策略

創建新線程

使用ThreadFactory創建新線程.如果沒有另外說明,則在同一個ThreadGroup中一律使用 Executors.defaultThreadFactory()創建線程,并且這些線程具有相同的NORM_PRIORITY優先級和非守護進程狀態.通過提供不同的ThreadFactory,可以改變線程的名稱,線程組,優先級,守護進程狀態,等等.如果從newThread返回null時 ThreadFactory未能創建線程,則執行程序將繼續運行,但不能執行任何任務.

如果池中當前有多于corePoolSize的線程,則這些多出的線程在空閑時間超過keepAliveTime時將會終止(參見 getKeepAliveTime(java.util.concurrent.TimeUnit)).這提供了當池處于非活動狀態時減少資源消耗的方法.如果池后來變得更為活動,則可以創建新的線程.也可以使用方法 setKeepAliveTime(long,java.util.concurrent.TimeUnit)動態地更改此參數.

排隊

所有BlockingQueue都可用于傳輸和保持提交的任務.可以使用此隊列與池大小進行交互:

  • 如果運行的線程少于corePoolSize,則Executor始終首選添加新的線程,而不進行排隊.
  • 如果運行的線程等于或多于corePoolSize,則Executor始終首選將請求加入隊列,而不添加新的線程.
  • 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出maximumPoolSize,在這種情況下,任務將被拒絕.

根據以上的三種類型衍生出三種策略

  1. 直接提交.工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們.在此,如果不存在可用于立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程.此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖.直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務.當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性.
  2. 無界隊列.使用無界隊列(不具有預定義容量的LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待.這樣,創建的線程就不會超過 corePoolSize.(因此maximumPoolSize 的值也就無效了)當每個任務完全獨立于其他任務,即任務執行互不影響時,適合于使用無界隊列.例如,在 Web 頁服務器中.這種排隊可用于處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性.
  3. 有界隊列.當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助于有助于防止資源耗盡,但是可能較難調整和控制.隊列大小和最大池大小可能需要相互折衷,使用大型隊列和小型池可以最大限度地降低 CPU 使用率,操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量.如果任務頻繁阻塞(如I/O 操作),則系統可能為超過您許可的更多線程安排時間.使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量.

舉例

new ThreadPoolExecutor(2, 3, 30, TimeUnit.SECONDS, new SynchronousQueue(), new RecorderThreadFactory("CookieRecorderPool"), new ThreadPoolExecutor.CallerRunsPolicy());

當池中線程已經有兩個正在運行,此時繼續來了一個任務a,根據前面介紹的”如果運行的線程等于或多于 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程.”所以A被添加到queue中.

此時又來了一個任務B,且池中的兩個線程還沒有忙完,但是由于使用的SynchronousQueue,所以一定無法加入進去.此時便滿足了上面提到的”如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出maximumPoolSize,在這種情況下,任務將被拒絕.”所以此時會新建一個線程來運行這個任務.但如果這三個任務都還沒完成,連續又來了兩個任務,第一個添加入queue中,后一個呢?queue中無法插入,而線程數達到了 maximumPoolSize,所以只好執行異常策略了.

所以在使用SynchronousQueue通常要求maximumPoolSize是無界的,這樣就可以避免上述情況發生(如果希望限制就直接使用有界隊列).對于使用SynchronousQueue的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖.
如何理解?如果你的任務A1,A2有內部關聯,A1需要先運行,那么先提交A1,再提交A2,當使用SynchronousQueue我們可以保證,A1必定先被執行,在A1么有被執行前,A2不可能添加入queue中.

使用無界隊列策略,即LinkedBlockingQueue,根據前文提到的規則:”如果運行的線程少于corePoolSize,則 Executor始終首選添加新的線程,而不進行排隊.”那么當任務繼續增加,會發生什么呢?如果運行的線程等于或多于corePoolSize,則 Executor始終首選將請求加入隊列,而不添加新的線程,此時任務變加入隊列之中了,對于無界隊列來說,總是可以加入的(除非資源耗盡).換句說,永遠也不會觸發產生新的線程!corePoolSize大小的線程數會一直運行,忙完當前的,就從隊列中拿任務開始運行.所以開發時要防止任務瘋漲.

綜上所述,在使用時需要注意以下幾點:

  1. ThreadPoolExecutor的使用還是很有技巧的,需要關注corePoolSize,maximumPoolSizes以及Queue的選擇.
  2. 使用無界queue可能會耗盡系統資源.
  3. 使用有界queue可能不能很好的滿足性能,需要調節線程數和queue大小.
  4. 線程數自然也有開銷,所以需要根據不同應用進行調節.

來自:http://www.pigg.co/java-threadpool-queue.html

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!