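Using the Fork/Join Framework in Java to Execute Tasks in Parallel
Modern computers have moved toward multiple CPUs. Multi-core processors are now widely used not only in ordinary PCs but even in today's smartphones, and processors will keep gaining more cores in the future.
Multi-core hardware is already mature, but many applications are not prepared for multi-core CPUs. As a result, they cannot take full advantage of the performance these CPUs offer.
To make full use of multiple CPUs and multi-core CPUs, a software system should be able to "tap" the computing power of every CPU and must never leave any CPU "idle". One way to achieve this is to split a task into several "small tasks" and execute them in parallel on multiple processor cores. Once all the "small tasks" have finished, their results are merged.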
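The process works in two steps, as shown in the schematic diagram below:
Step 1: split the task. First, we need a fork class that splits a large task into subtasks. A subtask may still be too large, so splitting continues until the subtasks are small enough.
Step 2: execute the tasks and merge the results. The subtasks are placed in double-ended queues (deques), and several worker threads take tasks from these deques and execute them. The results of the finished subtasks are all placed in one queue. A thread then takes the data from that queue and merges it.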
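Before bringing threads into the picture, the two steps can be sketched as plain single-threaded recursion. The hypothetical `DivideAndConquerSketch.sum` below does the following:

- It keeps halving a range until the range is smaller than a threshold.
- It sums each small range directly.
- It merges the partial results on the way back up.

This illustrates only the split/merge structure. The class name and the threshold of 20 are assumptions made for this sketch; the Fork/Join framework described next is what runs the pieces in parallel.

```java
// Hypothetical single-threaded sketch of "split until small enough, then merge"
class DivideAndConquerSketch {
    // Assumed threshold: ranges with fewer elements than this are summed directly
    static final int THRESHOLD = 20;

    static long sum(int[] data, int start, int end) {
        // Small enough: solve this piece directly
        if (end - start < THRESHOLD) {
            long s = 0;
            for (int i = start; i < end; i++) {
                s += data[i];
            }
            return s;
        }
        // Step 1: split the range [start, end) into two halves
        int middle = (start + end) / 2;
        long left = sum(data, start, middle);
        long right = sum(data, middle, end);
        // Step 2: merge the two partial results
        return left + right;
    }

    public static void main(String[] args) {
        int[] data = new int[100];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1; // 1, 2, ..., 100
        }
        System.out.println(sum(data, 0, data.length)); // prints 5050
    }
}
```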
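Java provides ForkJoinPool to support this pattern. A task is split into multiple "small tasks" that are computed in parallel, and their results are then combined into the overall result.
ForkJoinPool is an implementation of ExecutorService, so it is a special kind of thread pool. ForkJoinPool provides the following two commonly used constructors:
- public ForkJoinPool(int parallelism): creates a ForkJoinPool whose parallelism level (the target number of parallel worker threads) is parallelism.
- public ForkJoinPool(): creates a ForkJoinPool that uses the value returned by Runtime.getRuntime().availableProcessors() as its parallelism.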
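The two constructors can be compared directly with `getParallelism()`. Because ForkJoinPool implements ExecutorService, it also accepts ordinary `Callable`s. The class name `PoolConstructionDemo` and the parallelism value 4 are arbitrary choices for this sketch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo comparing the two ForkJoinPool constructors
class PoolConstructionDemo {
    public static void main(String[] args) throws Exception {
        // Explicit parallelism level (4 is an arbitrary example value)
        ForkJoinPool fixed = new ForkJoinPool(4);
        // Default constructor: parallelism comes from availableProcessors()
        ForkJoinPool byCores = new ForkJoinPool();

        System.out.println("fixed parallelism   = " + fixed.getParallelism());   // 4
        System.out.println("default parallelism = " + byCores.getParallelism());
        System.out.println("available cores     = " + Runtime.getRuntime().availableProcessors());

        // ForkJoinPool is an ExecutorService, so a plain Callable can be submitted too
        ExecutorService executor = fixed;
        Future<Integer> answer = executor.submit(() -> 6 * 7);
        System.out.println("callable result     = " + answer.get());             // 42

        // Shut down both pools and wait briefly for them to terminate
        fixed.shutdown();
        byCores.shutdown();
        fixed.awaitTermination(1, TimeUnit.SECONDS);
        byCores.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```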
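After creating a ForkJoinPool instance, you can call its submit(ForkJoinTask<T> task) or invoke(ForkJoinTask<T> task) method to execute a given task. submit() returns immediately with the task (a ForkJoinTask, which also implements Future), while invoke() waits for the task to finish and returns its result. ForkJoinTask represents a task that can be executed in parallel and whose results can be merged. ForkJoinTask is an abstract class with two commonly used abstract subclasses, RecursiveAction and RecursiveTask:
- RecursiveTask represents a task that returns a value.
- RecursiveAction represents a task that does not return a value.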
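The following sketch contrasts the two subclasses and the two ways of running a task. `compute()` returns a value in a RecursiveTask and returns nothing in a RecursiveAction. `invoke()` blocks and hands back the result, while `submit()` returns a ForkJoinTask that can be joined later. `SquareTask`, `FillAction`, and `SubmitInvokeDemo` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

// Hypothetical value-returning task: compute() returns n * n
class SquareTask extends RecursiveTask<Integer> {
    private final int n;

    SquareTask(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        return n * n;
    }
}

// Hypothetical task without a result: compute() fills an array with a value
class FillAction extends RecursiveAction {
    private final int[] target;
    private final int value;

    FillAction(int[] target, int value) {
        this.target = target;
        this.value = value;
    }

    @Override
    protected void compute() {
        Arrays.fill(target, value);
    }
}

class SubmitInvokeDemo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        // invoke(): waits for the task and returns its result directly
        int squared = pool.invoke(new SquareTask(12));
        // submit(): returns at once; the returned ForkJoinTask is also a Future
        int[] data = new int[5];
        ForkJoinTask<Void> pending = pool.submit(new FillAction(data, 7));
        pending.join(); // wait for completion; a RecursiveAction's result is null
        System.out.println(squared + " " + Arrays.toString(data)); // 144 [7, 7, 7, 7, 7]
        pool.shutdown();
    }
}
```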
1. RecursiveAction
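The following example uses a large task without a return value to show how RecursiveAction is used.
Large task: print the numbers 0 to 199 (the range [0, 200)).
Small task: each task prints fewer than 50 numbers.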
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

// RecursiveAction is an abstract subclass of ForkJoinTask for tasks that return no value
class PrintTask extends RecursiveAction {
    // Ranges with fewer than MAX numbers are printed directly instead of being split
    private static final int MAX = 50;
    private int start;
    private int end;

    PrintTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        // Once end - start is smaller than MAX, print the range directly
        if ((end - start) < MAX) {
            for (int i = start; i < end; i++) {
                System.out.println(Thread.currentThread().getName() + " i value: " + i);
            }
        } else {
            // Split the large task into two small tasks
            int middle = (start + end) / 2;
            PrintTask left = new PrintTask(start, middle);
            PrintTask right = new PrintTask(middle, end);
            // Run both small tasks in parallel and wait until both have finished
            invokeAll(left, right);
        }
    }
}

public class ForkJoinPoolTest {
    public static void main(String[] args) throws Exception {
        // Create a ForkJoinPool whose parallelism is Runtime.getRuntime().availableProcessors()
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Submit the divisible PrintTask; submit() returns immediately
        ForkJoinTask<Void> task = forkJoinPool.submit(new PrintTask(0, 200));
        // Block the current thread until the task and all of its subtasks have finished
        task.join();
        // Shut down the pool
        forkJoinPool.shutdown();
    }
}
```
The output of one run is shown below (the order differs from run to run):
```
ForkJoinPool-1-worker-2 i value: 75
ForkJoinPool-1-worker-2 i value: 76
ForkJoinPool-1-worker-2 i value: 77
ForkJoinPool-1-worker-2 i value: 78
ForkJoinPool-1-worker-2 i value: 79
ForkJoinPool-1-worker-2 i value: 80
ForkJoinPool-1-worker-2 i value: 81
ForkJoinPool-1-worker-2 i value: 82
ForkJoinPool-1-worker-2 i value: 83
ForkJoinPool-1-worker-2 i value: 84
ForkJoinPool-1-worker-2 i value: 85
ForkJoinPool-1-worker-2 i value: 86
ForkJoinPool-1-worker-2 i value: 87
ForkJoinPool-1-worker-2 i value: 88
ForkJoinPool-1-worker-2 i value: 89
ForkJoinPool-1-worker-2 i value: 90
ForkJoinPool-1-worker-2 i value: 91
ForkJoinPool-1-worker-2 i value: 92
ForkJoinPool-1-worker-2 i value: 93
ForkJoinPool-1-worker-2 i value: 94
ForkJoinPool-1-worker-2 i value: 95
ForkJoinPool-1-worker-2 i value: 96
ForkJoinPool-1-worker-2 i value: 97
ForkJoinPool-1-worker-2 i value: 98
ForkJoinPool-1-worker-2 i value: 99
ForkJoinPool-1-worker-2 i value: 50
ForkJoinPool-1-worker-2 i value: 51
ForkJoinPool-1-worker-2 i value: 52
ForkJoinPool-1-worker-2 i value: 53
ForkJoinPool-1-worker-2 i value: 54
ForkJoinPool-1-worker-2 i value: 55
ForkJoinPool-1-worker-2 i value: 56
ForkJoinPool-1-worker-2 i value: 57
ForkJoinPool-1-worker-2 i value: 58
ForkJoinPool-1-worker-2 i value: 59
ForkJoinPool-1-worker-2 i value: 60
ForkJoinPool-1-worker-2 i value: 61
ForkJoinPool-1-worker-2 i value: 62
ForkJoinPool-1-worker-2 i value: 63
ForkJoinPool-1-worker-2 i value: 64
ForkJoinPool-1-worker-2 i value: 65
ForkJoinPool-1-worker-2 i value: 66
ForkJoinPool-1-worker-2 i value: 67
ForkJoinPool-1-worker-2 i value: 68
ForkJoinPool-1-worker-2 i value: 69
ForkJoinPool-1-worker-1 i value: 175
ForkJoinPool-1-worker-1 i value: 176
ForkJoinPool-1-worker-1 i value: 177
ForkJoinPool-1-worker-1 i value: 178
ForkJoinPool-1-worker-1 i value: 179
ForkJoinPool-1-worker-1 i value: 180
ForkJoinPool-1-worker-1 i value: 181
ForkJoinPool-1-worker-1 i value: 182
ForkJoinPool-1-worker-1 i value: 183
ForkJoinPool-1-worker-1 i value: 184
ForkJoinPool-1-worker-1 i value: 185
ForkJoinPool-1-worker-1 i value: 186
ForkJoinPool-1-worker-1 i value: 187
ForkJoinPool-1-worker-1 i value: 188
ForkJoinPool-1-worker-1 i value: 189
ForkJoinPool-1-worker-1 i value: 190
ForkJoinPool-1-worker-1 i value: 191
ForkJoinPool-1-worker-1 i value: 192
ForkJoinPool-1-worker-1 i value: 193
ForkJoinPool-1-worker-1 i value: 194
ForkJoinPool-1-worker-1 i value: 195
ForkJoinPool-1-worker-1 i value: 196
ForkJoinPool-1-worker-1 i value: 197
ForkJoinPool-1-worker-1 i value: 198
ForkJoinPool-1-worker-1 i value: 199
ForkJoinPool-1-worker-1 i value: 150
ForkJoinPool-1-worker-1 i value: 151
ForkJoinPool-1-worker-1 i value: 152
ForkJoinPool-1-worker-1 i value: 153
ForkJoinPool-1-worker-1 i value: 154
ForkJoinPool-1-worker-1 i value: 155
ForkJoinPool-1-worker-1 i value: 156
ForkJoinPool-1-worker-1 i value: 157
ForkJoinPool-1-worker-1 i value: 158
ForkJoinPool-1-worker-1 i value: 159
ForkJoinPool-1-worker-1 i value: 160
ForkJoinPool-1-worker-1 i value: 161
ForkJoinPool-1-worker-1 i value: 162
ForkJoinPool-1-worker-1 i value: 163
ForkJoinPool-1-worker-1 i value: 164
ForkJoinPool-1-worker-1 i value: 165
ForkJoinPool-1-worker-1 i value: 166
ForkJoinPool-1-worker-1 i value: 167
ForkJoinPool-1-worker-1 i value: 168
ForkJoinPool-1-worker-1 i value: 169
ForkJoinPool-1-worker-1 i value: 170
ForkJoinPool-1-worker-1 i value: 171
ForkJoinPool-1-worker-1 i value: 172
ForkJoinPool-1-worker-1 i value: 173
ForkJoinPool-1-worker-1 i value: 174
ForkJoinPool-1-worker-1 i value: 125
ForkJoinPool-1-worker-1 i value: 126
ForkJoinPool-1-worker-1 i value: 127
ForkJoinPool-1-worker-1 i value: 128
ForkJoinPool-1-worker-1 i value: 129
ForkJoinPool-1-worker-1 i value: 130
ForkJoinPool-1-worker-1 i value: 131
ForkJoinPool-1-worker-1 i value: 132
ForkJoinPool-1-worker-1 i value: 133
ForkJoinPool-1-worker-1 i value: 134
ForkJoinPool-1-worker-1 i value: 135
ForkJoinPool-1-worker-1 i value: 136
ForkJoinPool-1-worker-1 i value: 137
ForkJoinPool-1-worker-1 i value: 138
ForkJoinPool-1-worker-1 i value: 139
ForkJoinPool-1-worker-1 i value: 140
ForkJoinPool-1-worker-1 i value: 141
ForkJoinPool-1-worker-1 i value: 142
ForkJoinPool-1-worker-1 i value: 143
ForkJoinPool-1-worker-1 i value: 144
ForkJoinPool-1-worker-1 i value: 145
ForkJoinPool-1-worker-1 i value: 146
ForkJoinPool-1-worker-1 i value: 147
ForkJoinPool-1-worker-1 i value: 148
ForkJoinPool-1-worker-1 i value: 149
ForkJoinPool-1-worker-1 i value: 100
ForkJoinPool-1-worker-1 i value: 101
ForkJoinPool-1-worker-1 i value: 102
ForkJoinPool-1-worker-1 i value: 103
ForkJoinPool-1-worker-1 i value: 104
ForkJoinPool-1-worker-1 i value: 105
ForkJoinPool-1-worker-1 i value: 106
ForkJoinPool-1-worker-1 i value: 107
ForkJoinPool-1-worker-1 i value: 108
ForkJoinPool-1-worker-1 i value: 109
ForkJoinPool-1-worker-1 i value: 110
ForkJoinPool-1-worker-1 i value: 111
ForkJoinPool-1-worker-1 i value: 112
ForkJoinPool-1-worker-1 i value: 113
ForkJoinPool-1-worker-1 i value: 114
ForkJoinPool-1-worker-1 i value: 115
ForkJoinPool-1-worker-1 i value: 116
ForkJoinPool-1-worker-1 i value: 117
ForkJoinPool-1-worker-1 i value: 118
ForkJoinPool-1-worker-1 i value: 119
ForkJoinPool-1-worker-1 i value: 120
ForkJoinPool-1-worker-1 i value: 121
ForkJoinPool-1-worker-1 i value: 122
ForkJoinPool-1-worker-1 i value: 123
ForkJoinPool-1-worker-1 i value: 124
ForkJoinPool-1-worker-1 i value: 25
ForkJoinPool-1-worker-1 i value: 26
ForkJoinPool-1-worker-1 i value: 27
ForkJoinPool-1-worker-1 i value: 28
ForkJoinPool-1-worker-1 i value: 29
ForkJoinPool-1-worker-1 i value: 30
ForkJoinPool-1-worker-1 i value: 31
ForkJoinPool-1-worker-1 i value: 32
ForkJoinPool-1-worker-1 i value: 33
ForkJoinPool-1-worker-1 i value: 34
ForkJoinPool-1-worker-1 i value: 35
ForkJoinPool-1-worker-1 i value: 36
ForkJoinPool-1-worker-1 i value: 37
ForkJoinPool-1-worker-1 i value: 38
ForkJoinPool-1-worker-1 i value: 39
ForkJoinPool-1-worker-1 i value: 40
ForkJoinPool-1-worker-1 i value: 41
ForkJoinPool-1-worker-1 i value: 42
ForkJoinPool-1-worker-1 i value: 43
ForkJoinPool-1-worker-1 i value: 44
ForkJoinPool-1-worker-1 i value: 45
ForkJoinPool-1-worker-1 i value: 46
ForkJoinPool-1-worker-1 i value: 47
ForkJoinPool-1-worker-1 i value: 48
ForkJoinPool-1-worker-1 i value: 49
ForkJoinPool-1-worker-1 i value: 0
ForkJoinPool-1-worker-1 i value: 1
ForkJoinPool-1-worker-1 i value: 2
ForkJoinPool-1-worker-1 i value: 3
ForkJoinPool-1-worker-1 i value: 4
ForkJoinPool-1-worker-1 i value: 5
ForkJoinPool-1-worker-1 i value: 6
ForkJoinPool-1-worker-1 i value: 7
ForkJoinPool-1-worker-1 i value: 8
ForkJoinPool-1-worker-1 i value: 9
ForkJoinPool-1-worker-1 i value: 10
ForkJoinPool-1-worker-1 i value: 11
ForkJoinPool-1-worker-1 i value: 12
ForkJoinPool-1-worker-1 i value: 13
ForkJoinPool-1-worker-1 i value: 14
ForkJoinPool-1-worker-1 i value: 15
ForkJoinPool-1-worker-1 i value: 16
ForkJoinPool-1-worker-1 i value: 17
ForkJoinPool-1-worker-1 i value: 18
ForkJoinPool-1-worker-1 i value: 19
ForkJoinPool-1-worker-1 i value: 20
ForkJoinPool-1-worker-1 i value: 21
ForkJoinPool-1-worker-1 i value: 22
ForkJoinPool-1-worker-1 i value: 23
ForkJoinPool-1-worker-1 i value: 24
ForkJoinPool-1-worker-2 i value: 70
ForkJoinPool-1-worker-2 i value: 71
ForkJoinPool-1-worker-2 i value: 72
ForkJoinPool-1-worker-2 i value: 73
ForkJoinPool-1-worker-2 i value: 74
```
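The output shows that ForkJoinPool started two threads to run this print task, because the author's computer has a dual-core CPU. Note also that although the program printed all 200 numbers from 0 to 199, it did not print them in order. The print task was split (200 → 100 → 50 → 25, giving 8 subtasks of 25 numbers each), and the subtasks ran in parallel. Each subtask prints its own range in order, but the ranges interleave, so the numbers do not come out sequentially from 0 to 199.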
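The out-of-order output does not mean that any number was skipped or printed twice. A hypothetical variant of PrintTask, `CoverageTask`, demonstrates this. It uses the same splitting rule (split while `end - start >= 50`), but instead of printing it does two things:

- It increments a per-index counter in an `AtomicIntegerArray`.
- It records the size of every leaf task.

The class name, the `run` helper, and the `leafSizes` queue are assumptions made for this sketch.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical variant of PrintTask: same split rule, but records work instead of printing
class CoverageTask extends RecursiveAction {
    static final int MAX = 50;
    private final AtomicIntegerArray visits;
    private final ConcurrentLinkedQueue<Integer> leafSizes;
    private final int start;
    private final int end;

    CoverageTask(AtomicIntegerArray visits, ConcurrentLinkedQueue<Integer> leafSizes, int start, int end) {
        this.visits = visits;
        this.leafSizes = leafSizes;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start < MAX) {
            leafSizes.add(end - start);        // remember how large this leaf was
            for (int i = start; i < end; i++) {
                visits.incrementAndGet(i);     // mark index i as handled
            }
        } else {
            int middle = (start + end) / 2;
            invokeAll(new CoverageTask(visits, leafSizes, start, middle),
                      new CoverageTask(visits, leafSizes, middle, end));
        }
    }

    // Runs the task over [0, n) and returns how often each index was visited
    static AtomicIntegerArray run(int n, ConcurrentLinkedQueue<Integer> leafSizes) {
        AtomicIntegerArray visits = new AtomicIntegerArray(n);
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new CoverageTask(visits, leafSizes, 0, n));
        } finally {
            pool.shutdown();
        }
        return visits;
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> leafSizes = new ConcurrentLinkedQueue<>();
        AtomicIntegerArray visits = run(200, leafSizes);
        boolean allOnce = true;
        for (int i = 0; i < visits.length(); i++) {
            allOnce &= visits.get(i) == 1;
        }
        System.out.println("every index visited once: " + allOnce); // true
        System.out.println("leaf sizes: " + leafSizes);                // [25, 25, 25, 25, 25, 25, 25, 25]
    }
}
```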
2. RecursiveTask
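The following example uses a large task with a return value to show how RecursiveTask is used.
Large task: compute the sum of 100 random numbers.
Small task: each task sums fewer than 20 numbers.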
```java
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

// RecursiveTask is an abstract subclass of ForkJoinTask for tasks that return a value
class SumTask extends RecursiveTask<Integer> {
    // Ranges with fewer than MAX numbers are summed directly instead of being split
    private static final int MAX = 20;
    private int[] arr;
    private int start;
    private int end;

    SumTask(int[] arr, int start, int end) {
        this.arr = arr;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // Once end - start is smaller than MAX, sum the range directly
        if ((end - start) < MAX) {
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            return sum;
        } else {
            System.err.println("=====Task split======");
            // Split the large task into two small tasks
            int middle = (start + end) / 2;
            SumTask left = new SumTask(arr, start, middle);
            SumTask right = new SumTask(arr, middle, end);
            // Run the left half asynchronously and compute the right half in this thread
            left.fork();
            int rightSum = right.compute();
            // Merge the results of the two small tasks
            return left.join() + rightSum;
        }
    }
}

public class ForkJoinPoolTest2 {
    public static void main(String[] args) throws Exception {
        int[] arr = new int[100];
        Random random = new Random();
        int total = 0;
        // Initialize the 100 array elements
        for (int i = 0; i < arr.length; i++) {
            int temp = random.nextInt(100);
            // Assign the element and add its value to total
            total += (arr[i] = temp);
        }
        System.out.println("Initial sum=" + total);
        // Create a ForkJoinPool whose parallelism is Runtime.getRuntime().availableProcessors()
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Submit the divisible SumTask
        Future<Integer> future = forkJoinPool.submit(new SumTask(arr, 0, arr.length));
        System.out.println("Computed sum=" + future.get());
        // Shut down the pool
        forkJoinPool.shutdown();
    }
}
```
The output of one run is shown below:
```
Initial sum=4283
=====Task split======
=====Task split======
=====Task split======
=====Task split======
=====Task split======
=====Task split======
=====Task split======
Computed sum=4283
```
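The output shows that ForkJoinPool split the task 7 times, which follows from the splitting rule:

- The 100-element range is split once into two halves of 50.
- Each 50-element range is split into two ranges of 25.
- Each of the four 25-element ranges is split into ranges of 12 and 13. These are below MAX (20) and are summed directly.

That gives 1 + 2 + 4 = 7 splits. The sum computed through SumTask equals the total recorded while the array was initialized, which shows that the result is correct.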
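The split count can also be checked in code instead of by counting banner lines. `SplitCountingSumTask` is a hypothetical variant of SumTask that increments an `AtomicInteger` at every split instead of printing. It uses the same MAX of 20 and the same fork-left, compute-right, join pattern. The fixed `Random` seed is an assumption added only so that runs are repeatable.

```java
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of SumTask that counts splits instead of printing a banner
class SplitCountingSumTask extends RecursiveTask<Integer> {
    static final int MAX = 20;
    private final int[] arr;
    private final int start;
    private final int end;
    private final AtomicInteger splits;

    SplitCountingSumTask(int[] arr, int start, int end, AtomicInteger splits) {
        this.arr = arr;
        this.start = start;
        this.end = end;
        this.splits = splits;
    }

    @Override
    protected Integer compute() {
        if (end - start < MAX) {
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            return sum;
        }
        splits.incrementAndGet();                 // one more "task split"
        int middle = (start + end) / 2;
        SplitCountingSumTask left = new SplitCountingSumTask(arr, start, middle, splits);
        SplitCountingSumTask right = new SplitCountingSumTask(arr, middle, end, splits);
        left.fork();                              // left half runs asynchronously
        int rightSum = right.compute();           // this thread handles the right half
        return left.join() + rightSum;            // merge the two partial sums
    }

    public static void main(String[] args) {
        int[] arr = new int[100];
        Random random = new Random(42);           // fixed seed only for repeatable runs
        int expected = 0;
        for (int i = 0; i < arr.length; i++) {
            arr[i] = random.nextInt(100);
            expected += arr[i];
        }
        AtomicInteger splits = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool();
        int actual = pool.invoke(new SplitCountingSumTask(arr, 0, arr.length, splits));
        pool.shutdown();
        System.out.println(expected == actual);           // true
        System.out.println("splits = " + splits.get());   // splits = 7
    }
}
```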
Readers can also refer to the following articles to deepen their understanding of ForkJoinPool:
http://www.infoq.com/cn/articles/fork-join-introduction/
http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/
==================================================================================================
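Author: 歐陽鵬 (Ouyang Peng). Reposting is welcome; sharing with others is the source of progress!
When reposting, please keep the original URL: http://blog.csdn.net/ouyang_peng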