Java的ThreadPoolExecutor使用幾點建議

fmms 12年前發布 | 85K 次閱讀 Java Java開發

背景

前段時間一個項目中因為涉及大量的線程開發,把jdk cocurrent的代碼重新再過了一遍。這篇文章中主要是記錄一下學習ThreadPoolExecutor過程中容易被人忽略的點,Doug Lea的整個類設計還是非常nice的

 

正文

先看一副圖,描述了ThreadPoolExecutor的工作機制: 

ThreadPoolExecutor幾點使用建議

 

整個ThreadPoolExecutor的任務處理有4步操作:

 

  • 第一步,初始的poolSize < corePoolSize,提交的runnable任務,會直接做為new一個Thread的參數,立馬執行
  • 第二步,當提交的任務數超過了corePoolSize,就進入了第二步操作。會將當前的runable提交到一個block queue中
  • 第三步,如果block queue是個有界隊列,當隊列滿了之后就進入了第三步。如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,立馬執行對應的runnable任務
  • 第四步,如果第三步救急方案也無法處理了,就會走到第四步執行reject操作。
幾點說明:(相信這些網上一搜一大把,我這里簡單介紹下,為后面做一下鋪墊)
  • block queue有以下幾種實現:
    1. ArrayBlockingQueue :  有界的數組隊列
    2. LinkedBlockingQueue : 可支持有界/無界的隊列,使用鏈表實現
    3. PriorityBlockingQueue : 優先隊列,可以針對任務排序
    4. SynchronousQueue : 隊列長度為1的隊列,和Array有點區別就是:client thread提交到block queue會是一個阻塞過程,直到有一個worker thread連接上來poll task。
  • RejectExecutionHandler是針對任務無法處理時的一些自保護處理:
    1. Reject 直接拋出Reject exception
    2. Discard 直接忽略該runnable,不可取
    3. DiscardOldest 丟棄最早入隊列的的任務
    4. CallsRun 直接讓原先的client thread做為worker線程,進行執行

容易被人忽略的點:

1.  pool threads啟動后,以后的任務獲取都會通過block queue中,獲取堆積的runnable task. 

</div>


所以建議:  block size >= corePoolSize ,不然線程池就沒任何意義

2.  corePoolSize 和 maximumPoolSize的區別, 和大家正常理解的數據庫連接池不太一樣。 

</div>

  *  據dbcp pool為例,會有minIdle , maxActive配置。minIdle代表是常駐內存中的threads數量,maxActive代表是工作的最大線程數。
  *  這里的corePoolSize就是連接池的maxActive的概念,它沒有minIdle的概念(每個線程可以設置keepAliveTime,超過多少時間多有任務后銷毀線程,但不會固定保持一定數量的threads)。 
  * 這里的maximumPoolSize,是一種救急措施的第一層。當threadPoolExecutor的工作threads存在滿負荷,并且block queue隊列也滿了,這時代表接近崩潰邊緣。這時允許臨時起一批threads,用來處理runnable,處理完后立馬退出。

所以建議:   maximumPoolSize >= corePoolSize =期望的最大線程數。 (我曾經配置了corePoolSize=1, maximumPoolSize=20, blockqueue為無界隊列,最后就成了單線程工作的pool。典型的配置錯誤)

3. 善用blockqueue和reject組合. 這里要重點推薦下CallsRun的Rejected Handler,從字面意思就是讓調用者自己來運行。 

</div>

我們經常會在線上使用一些線程池做異步處理,比如我前面做的 (業務層)異步并行加載技術分析和設計將原本串行的請求都變為了并行操作,但過多的并行會增加系統的負載(比如軟中斷,上下文切換)。所以肯定需要對線程池做一個size限制。但是為了引入異步操作后,避免因在block queue的等待時間過長,所以需要在隊列滿的時,執行一個callsRun的策略,并行的操作又轉為一個串行處理,這樣就可以保證盡量少的延遲影響。

所以建議:   RejectExecutionHandler = CallsRun ,  blockqueue size = 2 * poolSize (為啥是2倍poolSize,主要一個考慮就是瞬間高峰處理,允許一個thread等待一個runnable任務)

Btrace容量規劃

再提供一個btrace腳本,分析線上的thread pool容量規劃是否合理,可以運行時輸出poolSize等一些數據。

import static com.sun.btrace.BTraceUtils.addToAggregation;
import static com.sun.btrace.BTraceUtils.field;
import static com.sun.btrace.BTraceUtils.get;
import static com.sun.btrace.BTraceUtils.newAggregation;
import static com.sun.btrace.BTraceUtils.newAggregationKey;
import static com.sun.btrace.BTraceUtils.printAggregation;
import static com.sun.btrace.BTraceUtils.println;
import static com.sun.btrace.BTraceUtils.str;
import static com.sun.btrace.BTraceUtils.strcat;

import java.lang.reflect.Field; import java.util.concurrent.atomic.AtomicInteger;

import com.sun.btrace.BTraceUtils; import com.sun.btrace.aggregation.Aggregation; import com.sun.btrace.aggregation.AggregationFunction; import com.sun.btrace.aggregation.AggregationKey; import com.sun.btrace.annotations.BTrace; import com.sun.btrace.annotations.Kind; import com.sun.btrace.annotations.Location; import com.sun.btrace.annotations.OnEvent; import com.sun.btrace.annotations.OnMethod; import com.sun.btrace.annotations.OnTimer; import com.sun.btrace.annotations.Self;

/**

  • 并行加載監控
  • @author jianghang 2011-4-7 下午10:59:53 */ @BTrace public class AsyncLoadTracer {

    private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0); private static Aggregation histogram = newAggregation(AggregationFunction.QUANTIZE); private static Aggregation average = newAggregation(AggregationFunction.AVERAGE); private static Aggregation max = newAggregation(AggregationFunction.MAXIMUM); private static Aggregation min = newAggregation(AggregationFunction.MINIMUM); private static Aggregation sum = newAggregation(AggregationFunction.SUM); private static Aggregation count = newAggregation(AggregationFunction.COUNT);

    @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY)) public static void executeMonitor(@Self Object self) {

     Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "poolSize");
     Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "largestPoolSize");
     Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor", "workQueue");
    
     Field countField = field("java.util.concurrent.ArrayBlockingQueue", "count");
     int poolSize = (Integer) get(poolSizeField, self);
     int largestPoolSize = (Integer) get(largestPoolSizeField, self);
     int queueSize = (Integer) get(countField, get(workQueueField, self));
    
     println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "),
                                  str(largestPoolSize)), " queueSize : "), str(queueSize)));
    

    }

    @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY)) public static void rejectMonitor(@Self Object self) {

     String name = str(self);
     if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) {
         BTraceUtils.incrementAndGet(rejecctCount);
     }
    

    }

    @OnTimer(1000) public static void rejectPrintln() {

     int reject = BTraceUtils.getAndSet(rejecctCount, 0);
     println(strcat("reject count in 1000 msec: ", str(reject)));
     AggregationKey key = newAggregationKey("rejectCount");
     addToAggregation(histogram, key, reject);
     addToAggregation(average, key, reject);
     addToAggregation(max, key, reject);
     addToAggregation(min, key, reject);
     addToAggregation(sum, key, reject);
     addToAggregation(count, key, reject);
    

    }

    @OnEvent public static void onEvent() {

     BTraceUtils.truncateAggregation(histogram, 10);
     println("---------------------------------------------");
     printAggregation("Count", count);
     printAggregation("Min", min);
     printAggregation("Max", max);
     printAggregation("Average", average);
     printAggregation("Sum", sum);
     printAggregation("Histogram", histogram);
     println("---------------------------------------------");
    

    } }</pre>

    運行結果:

    poolSize : 1 , largestPoolSize = 10 , queueSize = 10
    reject count in 1000 msec: 0

    說明:

    1. poolSize 代表為當前的線程數

    2. largestPoolSize 代表為歷史最大的線程數

    3. queueSize 代表blockqueue的當前堆積的size

    4. reject count 代表在1000ms內的被reject的數量。

    轉自:http://www.iteye.com/topic/1118660

 本文由用戶 fmms 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!