Java的ThreadPoolExecutor使用幾點建議
背景
前段時間一個項目中因為涉及大量的線程開發,把jdk cocurrent的代碼重新再過了一遍。這篇文章中主要是記錄一下學習ThreadPoolExecutor過程中容易被人忽略的點,Doug Lea的整個類設計還是非常nice的
正文
先看一副圖,描述了ThreadPoolExecutor的工作機制:
整個ThreadPoolExecutor的任務處理有4步操作:
- 第一步,初始的poolSize < corePoolSize,提交的runnable任務,會直接做為new一個Thread的參數,立馬執行
- 第二步,當提交的任務數超過了corePoolSize,就進入了第二步操作。會將當前的runable提交到一個block queue中
- 第三步,如果block queue是個有界隊列,當隊列滿了之后就進入了第三步。如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,立馬執行對應的runnable任務
- 第四步,如果第三步救急方案也無法處理了,就會走到第四步執行reject操作。
- block queue有以下幾種實現:
1. ArrayBlockingQueue : 有界的數組隊列
2. LinkedBlockingQueue : 可支持有界/無界的隊列,使用鏈表實現
3. PriorityBlockingQueue : 優先隊列,可以針對任務排序
4. SynchronousQueue : 隊列長度為1的隊列,和Array有點區別就是:client thread提交到block queue會是一個阻塞過程,直到有一個worker thread連接上來poll task。 - RejectExecutionHandler是針對任務無法處理時的一些自保護處理:
1. Reject 直接拋出Reject exception
2. Discard 直接忽略該runnable,不可取
3. DiscardOldest 丟棄最早入隊列的的任務
4. CallsRun 直接讓原先的client thread做為worker線程,進行執行
1. pool threads啟動后,以后的任務獲取都會通過block queue中,獲取堆積的runnable task.
</div>
2. corePoolSize 和 maximumPoolSize的區別, 和大家正常理解的數據庫連接池不太一樣。
</div>
3. 善用blockqueue和reject組合. 這里要重點推薦下CallsRun的Rejected Handler,從字面意思就是讓調用者自己來運行。
</div>
Btrace容量規劃
再提供一個btrace腳本,分析線上的thread pool容量規劃是否合理,可以運行時輸出poolSize等一些數據。
import static com.sun.btrace.BTraceUtils.addToAggregation; import static com.sun.btrace.BTraceUtils.field; import static com.sun.btrace.BTraceUtils.get; import static com.sun.btrace.BTraceUtils.newAggregation; import static com.sun.btrace.BTraceUtils.newAggregationKey; import static com.sun.btrace.BTraceUtils.printAggregation; import static com.sun.btrace.BTraceUtils.println; import static com.sun.btrace.BTraceUtils.str; import static com.sun.btrace.BTraceUtils.strcat;import java.lang.reflect.Field; import java.util.concurrent.atomic.AtomicInteger;
import com.sun.btrace.BTraceUtils; import com.sun.btrace.aggregation.Aggregation; import com.sun.btrace.aggregation.AggregationFunction; import com.sun.btrace.aggregation.AggregationKey; import com.sun.btrace.annotations.BTrace; import com.sun.btrace.annotations.Kind; import com.sun.btrace.annotations.Location; import com.sun.btrace.annotations.OnEvent; import com.sun.btrace.annotations.OnMethod; import com.sun.btrace.annotations.OnTimer; import com.sun.btrace.annotations.Self;
/**
- 并行加載監控
@author jianghang 2011-4-7 下午10:59:53 */ @BTrace public class AsyncLoadTracer {
private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0); private static Aggregation histogram = newAggregation(AggregationFunction.QUANTIZE); private static Aggregation average = newAggregation(AggregationFunction.AVERAGE); private static Aggregation max = newAggregation(AggregationFunction.MAXIMUM); private static Aggregation min = newAggregation(AggregationFunction.MINIMUM); private static Aggregation sum = newAggregation(AggregationFunction.SUM); private static Aggregation count = newAggregation(AggregationFunction.COUNT);
@OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY)) public static void executeMonitor(@Self Object self) {
Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "poolSize"); Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "largestPoolSize"); Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor", "workQueue"); Field countField = field("java.util.concurrent.ArrayBlockingQueue", "count"); int poolSize = (Integer) get(poolSizeField, self); int largestPoolSize = (Integer) get(largestPoolSizeField, self); int queueSize = (Integer) get(countField, get(workQueueField, self)); println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "), str(largestPoolSize)), " queueSize : "), str(queueSize)));
}
@OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY)) public static void rejectMonitor(@Self Object self) {
String name = str(self); if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) { BTraceUtils.incrementAndGet(rejecctCount); }
}
@OnTimer(1000) public static void rejectPrintln() {
int reject = BTraceUtils.getAndSet(rejecctCount, 0); println(strcat("reject count in 1000 msec: ", str(reject))); AggregationKey key = newAggregationKey("rejectCount"); addToAggregation(histogram, key, reject); addToAggregation(average, key, reject); addToAggregation(max, key, reject); addToAggregation(min, key, reject); addToAggregation(sum, key, reject); addToAggregation(count, key, reject);
}
@OnEvent public static void onEvent() {
BTraceUtils.truncateAggregation(histogram, 10); println("---------------------------------------------"); printAggregation("Count", count); printAggregation("Min", min); printAggregation("Max", max); printAggregation("Average", average); printAggregation("Sum", sum); printAggregation("Histogram", histogram); println("---------------------------------------------");
} }</pre>
運行結果:
poolSize : 1 , largestPoolSize = 10 , queueSize = 10 reject count in 1000 msec: 0
說明:
1. poolSize 代表為當前的線程數
2. largestPoolSize 代表為歷史最大的線程數
3. queueSize 代表blockqueue的當前堆積的size
4. reject count 代表在1000ms內的被reject的數量。
轉自:http://www.iteye.com/topic/1118660