Java使用Fork/Join框架來并行執行任務
現代的計算機已經向多CPU方向發展,即使是普通的PC,甚至現在的智能手機,多核處理器也已被廣泛應用。在未來,處理器的核心數將會越來越多。
雖然硬件上的多核CPU已經十分成熟,但是很多應用程序并未為這種多核CPU做好準備,因此并不能很好地利用多核CPU的性能優勢。
為了充分利用多CPU、多核CPU的性能優勢,計算機軟件系統應該可以充分“挖掘”每個CPU的計算能力,決不能讓某個CPU處于“空閑”狀態。為此,可以考慮把一個任務拆分成多個“小任務”,把多個“小任務”放到多個處理器核心上并行執行。當多個“小任務”執行完成之后,再將這些執行結果合并起來即可。
如下面的示意圖所示:
第一步分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,所以還需要不停的分割,直到分割出的子任務足夠小。
第二步執行任務并合并結果。分割的子任務分別放在雙端隊列里,然后啟動幾個線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合并這些數據。
Java提供了ForkJoinPool來支持將一個任務拆分成多個“小任務”并行計算,再把多個“小任務”的結果合成總的計算結果。
ForkJoinPool是ExecutorService的實現類,因此是一種特殊的線程池。ForkJoinPool提供了如下兩個常用的構造器。
- public ForkJoinPool(int parallelism):創建一個包含parallelism個并行線程的ForkJoinPool
- public ForkJoinPool() :以Runtime.getRuntime().availableProcessors()的返回值作為parallelism來創建ForkJoinPool
創建ForkJoinPool實例后,可以調用ForkJoinPool的submit(ForkJoinTask<T> task)或者invoke(ForkJoinTask<T> task)來執行指定任務。其中ForkJoinTask代表一個可以并行、合并的任務。ForkJoinTask是一個抽象類,它有兩個抽象子類:RecursiveAction和RecursiveTask。
- RecursiveTask代表有返回值的任務
- RecursiveAction代表沒有返回值的任務。
一、RecursiveAction
下面以一個沒有返回值的大任務為例,介紹一下RecursiveAction的用法。
大任務是:打印0-199的數值。
小任務是:每次只能打印50個數值。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
/**
 * A {@link RecursiveAction} (a ForkJoinTask with no result) that prints every integer in the
 * half-open range {@code [start, end)}, each prefixed with the name of the executing thread.
 *
 * <p>A range holding {@code MAX} or more values is split into two halves that are processed
 * as sub-tasks. This task does not complete until both halves have completed, so waiting on
 * the root task (via {@code invoke}, {@code join} or {@code get}) waits for all the printing.
 */
class PrintTask extends RecursiveAction {
    // A range is printed directly only when it holds fewer than MAX values.
    private static final int MAX = 50;
    private final int start;
    private final int end;

    /**
     * @param start first value to print (inclusive)
     * @param end   upper bound of the values to print (exclusive)
     */
    PrintTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        // Small enough: print the range on the current thread.
        if ((end - start) < MAX) {
            for (int i = start; i < end; i++) {
                System.out.println(Thread.currentThread().getName() + "的i值:"
                        + i);
            }
            return;
        }
        // Split at the midpoint; written this way so start + end cannot overflow.
        int middle = start + (end - start) / 2;
        // invokeAll schedules both halves and returns only after both have finished,
        // so completion of this task implies completion of all of its sub-tasks.
        invokeAll(new PrintTask(start, middle), new PrintTask(middle, end));
    }
}
public class ForkJoinPoolTest {
/**
 * Prints the integers 0 to 199 by running a {@code PrintTask} on a {@link ForkJoinPool}.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the main thread is interrupted while waiting for the pool to terminate
 */
public static void main(String[] args) throws Exception {
    // The no-arg constructor uses Runtime.getRuntime().availableProcessors() as the parallelism.
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    // Submit the decomposable PrintTask.
    forkJoinPool.submit(new PrintTask(0, 200));
    // Request shutdown BEFORE waiting: awaitTermination() can only return early once a
    // shutdown has been requested. Called the other way round it always blocks for the
    // full timeout, because a pool that was never shut down never terminates.
    forkJoinPool.shutdown();
    // Wait (at most 2 seconds) for the already-submitted work to finish.
    forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
}
}
運行結果如下:
ForkJoinPool-1-worker-2的i值:75
ForkJoinPool-1-worker-2的i值:76
ForkJoinPool-1-worker-2的i值:77
ForkJoinPool-1-worker-2的i值:78
ForkJoinPool-1-worker-2的i值:79
ForkJoinPool-1-worker-2的i值:80
ForkJoinPool-1-worker-2的i值:81
ForkJoinPool-1-worker-2的i值:82
ForkJoinPool-1-worker-2的i值:83
ForkJoinPool-1-worker-2的i值:84
ForkJoinPool-1-worker-2的i值:85
ForkJoinPool-1-worker-2的i值:86
ForkJoinPool-1-worker-2的i值:87
ForkJoinPool-1-worker-2的i值:88
ForkJoinPool-1-worker-2的i值:89
ForkJoinPool-1-worker-2的i值:90
ForkJoinPool-1-worker-2的i值:91
ForkJoinPool-1-worker-2的i值:92
ForkJoinPool-1-worker-2的i值:93
ForkJoinPool-1-worker-2的i值:94
ForkJoinPool-1-worker-2的i值:95
ForkJoinPool-1-worker-2的i值:96
ForkJoinPool-1-worker-2的i值:97
ForkJoinPool-1-worker-2的i值:98
ForkJoinPool-1-worker-2的i值:99
ForkJoinPool-1-worker-2的i值:50
ForkJoinPool-1-worker-2的i值:51
ForkJoinPool-1-worker-2的i值:52
ForkJoinPool-1-worker-2的i值:53
ForkJoinPool-1-worker-2的i值:54
ForkJoinPool-1-worker-2的i值:55
ForkJoinPool-1-worker-2的i值:56
ForkJoinPool-1-worker-2的i值:57
ForkJoinPool-1-worker-2的i值:58
ForkJoinPool-1-worker-2的i值:59
ForkJoinPool-1-worker-2的i值:60
ForkJoinPool-1-worker-2的i值:61
ForkJoinPool-1-worker-2的i值:62
ForkJoinPool-1-worker-2的i值:63
ForkJoinPool-1-worker-2的i值:64
ForkJoinPool-1-worker-2的i值:65
ForkJoinPool-1-worker-2的i值:66
ForkJoinPool-1-worker-2的i值:67
ForkJoinPool-1-worker-2的i值:68
ForkJoinPool-1-worker-2的i值:69
ForkJoinPool-1-worker-1的i值:175
ForkJoinPool-1-worker-1的i值:176
ForkJoinPool-1-worker-1的i值:177
ForkJoinPool-1-worker-1的i值:178
ForkJoinPool-1-worker-1的i值:179
ForkJoinPool-1-worker-1的i值:180
ForkJoinPool-1-worker-1的i值:181
ForkJoinPool-1-worker-1的i值:182
ForkJoinPool-1-worker-1的i值:183
ForkJoinPool-1-worker-1的i值:184
ForkJoinPool-1-worker-1的i值:185
ForkJoinPool-1-worker-1的i值:186
ForkJoinPool-1-worker-1的i值:187
ForkJoinPool-1-worker-1的i值:188
ForkJoinPool-1-worker-1的i值:189
ForkJoinPool-1-worker-1的i值:190
ForkJoinPool-1-worker-1的i值:191
ForkJoinPool-1-worker-1的i值:192
ForkJoinPool-1-worker-1的i值:193
ForkJoinPool-1-worker-1的i值:194
ForkJoinPool-1-worker-1的i值:195
ForkJoinPool-1-worker-1的i值:196
ForkJoinPool-1-worker-1的i值:197
ForkJoinPool-1-worker-1的i值:198
ForkJoinPool-1-worker-1的i值:199
ForkJoinPool-1-worker-1的i值:150
ForkJoinPool-1-worker-1的i值:151
ForkJoinPool-1-worker-1的i值:152
ForkJoinPool-1-worker-1的i值:153
ForkJoinPool-1-worker-1的i值:154
ForkJoinPool-1-worker-1的i值:155
ForkJoinPool-1-worker-1的i值:156
ForkJoinPool-1-worker-1的i值:157
ForkJoinPool-1-worker-1的i值:158
ForkJoinPool-1-worker-1的i值:159
ForkJoinPool-1-worker-1的i值:160
ForkJoinPool-1-worker-1的i值:161
ForkJoinPool-1-worker-1的i值:162
ForkJoinPool-1-worker-1的i值:163
ForkJoinPool-1-worker-1的i值:164
ForkJoinPool-1-worker-1的i值:165
ForkJoinPool-1-worker-1的i值:166
ForkJoinPool-1-worker-1的i值:167
ForkJoinPool-1-worker-1的i值:168
ForkJoinPool-1-worker-1的i值:169
ForkJoinPool-1-worker-1的i值:170
ForkJoinPool-1-worker-1的i值:171
ForkJoinPool-1-worker-1的i值:172
ForkJoinPool-1-worker-1的i值:173
ForkJoinPool-1-worker-1的i值:174
ForkJoinPool-1-worker-1的i值:125
ForkJoinPool-1-worker-1的i值:126
ForkJoinPool-1-worker-1的i值:127
ForkJoinPool-1-worker-1的i值:128
ForkJoinPool-1-worker-1的i值:129
ForkJoinPool-1-worker-1的i值:130
ForkJoinPool-1-worker-1的i值:131
ForkJoinPool-1-worker-1的i值:132
ForkJoinPool-1-worker-1的i值:133
ForkJoinPool-1-worker-1的i值:134
ForkJoinPool-1-worker-1的i值:135
ForkJoinPool-1-worker-1的i值:136
ForkJoinPool-1-worker-1的i值:137
ForkJoinPool-1-worker-1的i值:138
ForkJoinPool-1-worker-1的i值:139
ForkJoinPool-1-worker-1的i值:140
ForkJoinPool-1-worker-1的i值:141
ForkJoinPool-1-worker-1的i值:142
ForkJoinPool-1-worker-1的i值:143
ForkJoinPool-1-worker-1的i值:144
ForkJoinPool-1-worker-1的i值:145
ForkJoinPool-1-worker-1的i值:146
ForkJoinPool-1-worker-1的i值:147
ForkJoinPool-1-worker-1的i值:148
ForkJoinPool-1-worker-1的i值:149
ForkJoinPool-1-worker-1的i值:100
ForkJoinPool-1-worker-1的i值:101
ForkJoinPool-1-worker-1的i值:102
ForkJoinPool-1-worker-1的i值:103
ForkJoinPool-1-worker-1的i值:104
ForkJoinPool-1-worker-1的i值:105
ForkJoinPool-1-worker-1的i值:106
ForkJoinPool-1-worker-1的i值:107
ForkJoinPool-1-worker-1的i值:108
ForkJoinPool-1-worker-1的i值:109
ForkJoinPool-1-worker-1的i值:110
ForkJoinPool-1-worker-1的i值:111
ForkJoinPool-1-worker-1的i值:112
ForkJoinPool-1-worker-1的i值:113
ForkJoinPool-1-worker-1的i值:114
ForkJoinPool-1-worker-1的i值:115
ForkJoinPool-1-worker-1的i值:116
ForkJoinPool-1-worker-1的i值:117
ForkJoinPool-1-worker-1的i值:118
ForkJoinPool-1-worker-1的i值:119
ForkJoinPool-1-worker-1的i值:120
ForkJoinPool-1-worker-1的i值:121
ForkJoinPool-1-worker-1的i值:122
ForkJoinPool-1-worker-1的i值:123
ForkJoinPool-1-worker-1的i值:124
ForkJoinPool-1-worker-1的i值:25
ForkJoinPool-1-worker-1的i值:26
ForkJoinPool-1-worker-1的i值:27
ForkJoinPool-1-worker-1的i值:28
ForkJoinPool-1-worker-1的i值:29
ForkJoinPool-1-worker-1的i值:30
ForkJoinPool-1-worker-1的i值:31
ForkJoinPool-1-worker-1的i值:32
ForkJoinPool-1-worker-1的i值:33
ForkJoinPool-1-worker-1的i值:34
ForkJoinPool-1-worker-1的i值:35
ForkJoinPool-1-worker-1的i值:36
ForkJoinPool-1-worker-1的i值:37
ForkJoinPool-1-worker-1的i值:38
ForkJoinPool-1-worker-1的i值:39
ForkJoinPool-1-worker-1的i值:40
ForkJoinPool-1-worker-1的i值:41
ForkJoinPool-1-worker-1的i值:42
ForkJoinPool-1-worker-1的i值:43
ForkJoinPool-1-worker-1的i值:44
ForkJoinPool-1-worker-1的i值:45
ForkJoinPool-1-worker-1的i值:46
ForkJoinPool-1-worker-1的i值:47
ForkJoinPool-1-worker-1的i值:48
ForkJoinPool-1-worker-1的i值:49
ForkJoinPool-1-worker-1的i值:0
ForkJoinPool-1-worker-1的i值:1
ForkJoinPool-1-worker-1的i值:2
ForkJoinPool-1-worker-1的i值:3
ForkJoinPool-1-worker-1的i值:4
ForkJoinPool-1-worker-1的i值:5
ForkJoinPool-1-worker-1的i值:6
ForkJoinPool-1-worker-1的i值:7
ForkJoinPool-1-worker-1的i值:8
ForkJoinPool-1-worker-1的i值:9
ForkJoinPool-1-worker-1的i值:10
ForkJoinPool-1-worker-1的i值:11
ForkJoinPool-1-worker-1的i值:12
ForkJoinPool-1-worker-1的i值:13
ForkJoinPool-1-worker-1的i值:14
ForkJoinPool-1-worker-1的i值:15
ForkJoinPool-1-worker-1的i值:16
ForkJoinPool-1-worker-1的i值:17
ForkJoinPool-1-worker-1的i值:18
ForkJoinPool-1-worker-1的i值:19
ForkJoinPool-1-worker-1的i值:20
ForkJoinPool-1-worker-1的i值:21
ForkJoinPool-1-worker-1的i值:22
ForkJoinPool-1-worker-1的i值:23
ForkJoinPool-1-worker-1的i值:24
ForkJoinPool-1-worker-2的i值:70
ForkJoinPool-1-worker-2的i值:71
ForkJoinPool-1-worker-2的i值:72
ForkJoinPool-1-worker-2的i值:73
ForkJoinPool-1-worker-2的i值:74
從上面結果來看,ForkJoinPool啟動了兩個線程來執行這個打印任務,這是因為筆者的計算機的CPU是雙核的。不僅如此,讀者可以看到程序雖然打印了0-199這兩百個數字,但是并不是連續打印的,這是因為程序將這個打印任務進行了分解,分解后的任務會并行執行,所以不會按順序從0打印到199。
二、RecursiveTask
下面以一個有返回值的大任務為例,介紹一下RecursiveTask的用法。
大任務是:計算隨機的100個數字的和。
小任務是:每次只能計算20個數值的和。
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
/**
 * A {@link RecursiveTask} (a ForkJoinTask that returns a result) that sums the elements of
 * {@code arr} in the half-open index range {@code [start, end)}.
 *
 * <p>A range holding {@code MAX} or more elements is split into two halves whose partial sums
 * are added together; smaller ranges are summed directly.
 */
class SumTask extends RecursiveTask<Integer> {
    // A range is summed directly only when it holds fewer than MAX elements.
    private static final int MAX = 20;
    private final int[] arr;
    private final int start;
    private final int end;

    /**
     * @param arr   array whose elements are summed (not copied)
     * @param start index of the first element to include
     * @param end   index one past the last element to include
     */
    SumTask(int[] arr, int start, int end) {
        this.arr = arr;
        this.start = start;
        this.end = end;
    }

    /**
     * @return the sum of {@code arr[start] .. arr[end - 1]}
     */
    @Override
    protected Integer compute() {
        // Small enough: add the elements up on the current thread.
        if ((end - start) < MAX) {
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            return sum;
        }
        System.err.println("=====任務分解======");
        // Split at the midpoint; the unsigned shift stays correct even if start + end overflows.
        int middle = (start + end) >>> 1;
        SumTask left = new SumTask(arr, start, middle);
        SumTask right = new SumTask(arr, middle, end);
        // Fork only one half and compute the other on this thread, so the current worker
        // does useful work instead of forking both halves and then blocking on the joins.
        left.fork();
        int rightSum = right.compute();
        // Combine the two partial sums.
        return left.join() + rightSum;
    }
}
public class ForkJoinPoolTest2 {
/**
 * Fills a 100-element array with random values below 100, prints the sum accumulated while
 * filling it, then prints the sum computed by a {@code SumTask} run on a {@link ForkJoinPool}.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if waiting for the task result is interrupted or the task fails
 */
public static void main(String[] args) throws Exception {
    int[] numbers = new int[100];
    Random rng = new Random();
    int expected = 0;
    // Populate the array and keep a running total for comparison.
    for (int idx = 0; idx < numbers.length; idx++) {
        numbers[idx] = rng.nextInt(100);
        expected += numbers[idx];
    }
    System.out.println("初始化時的總和=" + expected);

    // The no-arg constructor uses Runtime.getRuntime().availableProcessors() as the parallelism.
    ForkJoinPool pool = new ForkJoinPool();
    // Hand the decomposable summing task to the pool and block until its result is ready.
    SumTask rootTask = new SumTask(numbers, 0, numbers.length);
    Future<Integer> pending = pool.submit(rootTask);
    Integer computed = pending.get();
    System.out.println("計算出來的總和=" + computed);

    // Release the pool.
    pool.shutdown();
}
}
計算結果如下:
初始化時的總和=4283
=====任務分解======
=====任務分解======
=====任務分解======
=====任務分解======
=====任務分解======
=====任務分解======
=====任務分解======
計算出來的總和=4283
從上面結果來看,ForkJoinPool將任務分解了7次,程序通過SumTask計算出來的結果,和初始化數組時統計出來的總和是相等的,這表明計算結果一切正常。
讀者還可以參考以下文章加深對ForkJoinPool的理解:
http://www.infoq.com/cn/articles/fork-join-introduction/
http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/
==================================================================================================
作者:歐陽鵬 歡迎轉載,與人分享是進步的源泉!
轉載請保留原文地址:http://blog.csdn.net/ouyang_peng