Using the Fork/Join Framework in Java to Run Tasks in Parallel

Posted by jopen 9 years ago | 29K reads | Java, Java development

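Modern computers are moving toward multiple CPUs. Multi-core processors are now common even in ordinary PCs and smartphones, and the number of cores per processor will keep growing.

Although multi-core hardware is mature, many applications are not written with it in mind and therefore cannot take full advantage of its performance.

To make full use of multiple CPUs and multi-core CPUs, software should be able to exploit the computing power of every CPU and should never leave a CPU idle. One way to do this is to split a task into several "small tasks" and run them in parallel on multiple processor cores. When the small tasks have finished, their results are merged.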

As the schematic diagram shows (image not included here), the process has two steps:


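Step 1: split the task. We first need a fork class that splits the large task into subtasks. A subtask may still be too large, so splitting continues until the subtasks are small enough.

Step 2: run the tasks and merge the results. The subtasks are placed in double-ended queues (deques), and several worker threads take tasks from those deques and run them. The results of the finished subtasks are all placed in one queue, and a separate thread takes the data from that queue and merges it.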
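The splitting step can be looked at on its own, without any threads. The sketch below is only an illustration: `SplitSketch`, `split`, and `leafRanges` are hypothetical names and not part of the Fork/Join API, and the range [0, 200) with a threshold of 50 is an assumed example. It halves a range recursively until each piece holds fewer elements than the threshold, which is the shape of work a Fork/Join task produces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the Fork/Join API.
class SplitSketch {
    // Recursively halves [start, end) until a piece holds fewer than
    // `threshold` elements, then records that piece as a leaf range.
    // Assumes threshold >= 2 so that every split makes progress.
    static void split(int start, int end, int threshold, List<int[]> leaves) {
        if (end - start < threshold) {
            leaves.add(new int[] {start, end});
            return;
        }
        int middle = (start + end) / 2;
        split(start, middle, threshold, leaves);
        split(middle, end, threshold, leaves);
    }

    // Convenience wrapper that returns the leaf ranges in left-to-right order.
    static List<int[]> leafRanges(int start, int end, int threshold) {
        List<int[]> leaves = new ArrayList<>();
        split(start, end, threshold, leaves);
        return leaves;
    }

    public static void main(String[] args) {
        for (int[] r : leafRanges(0, 200, 50)) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
    }
}
```

With these inputs the range is halved three times, giving eight leaf ranges of 25 numbers each, from [0, 25) to [175, 200). A range of exactly 50 is split again because the comparison is a strict "less than".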


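Java provides ForkJoinPool to support splitting a task into several "small tasks" that are computed in parallel, and then combining the results of those small tasks into the overall result.

ForkJoinPool implements ExecutorService, so it is a special kind of thread pool. It provides the following two commonly used constructors.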

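  •  public ForkJoinPool(int parallelism): creates a ForkJoinPool whose parallelism level (the target number of worker threads) is parallelism
  •  public ForkJoinPool(): creates a ForkJoinPool that uses the return value of Runtime.getRuntime().availableProcessors() as its parallelism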
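As a minimal sketch of the two constructors, the snippet below creates one pool of each kind and prints the parallelism each one reports through `getParallelism()`. The class name `PoolCreationSketch` and the value 4 are assumptions chosen for the example.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class; the parallelism value 4 is an arbitrary example.
class PoolCreationSketch {
    public static void main(String[] args) {
        // Explicit parallelism: the pool targets 4 worker threads.
        ForkJoinPool fixed = new ForkJoinPool(4);
        // Default: parallelism follows the number of available processors.
        ForkJoinPool byCores = new ForkJoinPool();

        System.out.println("fixed parallelism   = " + fixed.getParallelism());
        System.out.println("default parallelism = " + byCores.getParallelism());
        System.out.println("available cores     = "
                + Runtime.getRuntime().availableProcessors());

        // Release the pools once they are no longer needed.
        fixed.shutdown();
        byCores.shutdown();
    }
}
```

The first line always reports 4; the other two values depend on the machine the code runs on.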

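After creating a ForkJoinPool instance, call its submit(ForkJoinTask<T> task) or invoke(ForkJoinTask<T> task) method to run a task. ForkJoinTask represents a task that can be run in parallel and merged. It is an abstract class with two commonly used abstract subclasses: RecursiveAction and RecursiveTask.

  • RecursiveTask represents a task that returns a value.
  • RecursiveAction represents a task that returns no value.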
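The difference between the two entry points and the two task types can be sketched as follows. `AnswerTask`, `FlagTask`, and `SubmitVsInvokeSketch` are hypothetical classes written for this illustration; only `invoke`, `submit`, `get`, `RecursiveTask`, and `RecursiveAction` come from the JDK. `invoke` blocks and hands back the result, while `submit` returns a `ForkJoinTask` (which is also a `Future`) right away.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

// Hypothetical task with a result: returns a fixed value for easy comparison.
class AnswerTask extends RecursiveTask<Integer> {
    @Override
    protected Integer compute() {
        return 42;
    }
}

// Hypothetical task without a result: it only records that it ran.
class FlagTask extends RecursiveAction {
    volatile boolean ran = false;

    @Override
    protected void compute() {
        ran = true;
    }
}

class SubmitVsInvokeSketch {
    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();

        // invoke() blocks until the task is done and returns its result.
        Integer direct = pool.invoke(new AnswerTask());

        // submit() returns immediately; get() waits for the result.
        ForkJoinTask<Integer> pending = pool.submit(new AnswerTask());
        Integer later = pending.get();

        // A RecursiveAction has no result, so only its side effect is visible.
        FlagTask flag = new FlagTask();
        pool.invoke(flag);

        System.out.println(direct + " " + later + " " + flag.ran); // prints "42 42 true"
        pool.shutdown();
    }
}
```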


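1. RecursiveAction

The following example uses a large task with no return value to show how RecursiveAction is used.

The large task: print the numbers 0 through 199.

The small task: each one prints at most 50 numbers. (Because the code below compares with a strict "less than", a range of exactly 50 is split once more, so each leaf task actually prints 25 numbers.)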

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    import java.util.concurrent.TimeUnit;

    // RecursiveAction is the abstract ForkJoinTask subclass for tasks with no return value
    class PrintTask extends RecursiveAction {
        // Ranges of MAX or more numbers are split further
        private static final int MAX = 50;

        private final int start;
        private final int end;

        PrintTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            // Print directly once the range holds fewer than MAX numbers
            if ((end - start) < MAX) {
                for (int i = start; i < end; i++) {
                    System.out.println(Thread.currentThread().getName() + ": i = "
                            + i);
                }
            } else {
                // Split the large task into two smaller tasks
                int middle = (start + end) / 2;
                PrintTask left = new PrintTask(start, middle);
                PrintTask right = new PrintTask(middle, end);
                // Schedule both halves to run asynchronously; nothing is joined
                // because the tasks produce no result
                left.fork();
                right.fork();
            }
        }
    }

    public class ForkJoinPoolTest {
        public static void main(String[] args) throws Exception {
            // The no-argument constructor uses Runtime.getRuntime().availableProcessors()
            // as the parallelism of the ForkJoinPool
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            // Submit the decomposable PrintTask
            forkJoinPool.submit(new PrintTask(0, 200));
            // Stop accepting new tasks; work that is already queued still runs
            forkJoinPool.shutdown();
            // Block the current thread for up to 2 seconds until the pool has terminated
            forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
        }

    }

Output of a sample run:
    ForkJoinPool-1-worker-2: i = 75
    ForkJoinPool-1-worker-2: i = 76
    ForkJoinPool-1-worker-2: i = 77
    ForkJoinPool-1-worker-2: i = 78
    ForkJoinPool-1-worker-2: i = 79
    ForkJoinPool-1-worker-2: i = 80
    ForkJoinPool-1-worker-2: i = 81
    ForkJoinPool-1-worker-2: i = 82
    ForkJoinPool-1-worker-2: i = 83
    ForkJoinPool-1-worker-2: i = 84
    ForkJoinPool-1-worker-2: i = 85
    ForkJoinPool-1-worker-2: i = 86
    ForkJoinPool-1-worker-2: i = 87
    ForkJoinPool-1-worker-2: i = 88
    ForkJoinPool-1-worker-2: i = 89
    ForkJoinPool-1-worker-2: i = 90
    ForkJoinPool-1-worker-2: i = 91
    ForkJoinPool-1-worker-2: i = 92
    ForkJoinPool-1-worker-2: i = 93
    ForkJoinPool-1-worker-2: i = 94
    ForkJoinPool-1-worker-2: i = 95
    ForkJoinPool-1-worker-2: i = 96
    ForkJoinPool-1-worker-2: i = 97
    ForkJoinPool-1-worker-2: i = 98
    ForkJoinPool-1-worker-2: i = 99
    ForkJoinPool-1-worker-2: i = 50
    ForkJoinPool-1-worker-2: i = 51
    ForkJoinPool-1-worker-2: i = 52
    ForkJoinPool-1-worker-2: i = 53
    ForkJoinPool-1-worker-2: i = 54
    ForkJoinPool-1-worker-2: i = 55
    ForkJoinPool-1-worker-2: i = 56
    ForkJoinPool-1-worker-2: i = 57
    ForkJoinPool-1-worker-2: i = 58
    ForkJoinPool-1-worker-2: i = 59
    ForkJoinPool-1-worker-2: i = 60
    ForkJoinPool-1-worker-2: i = 61
    ForkJoinPool-1-worker-2: i = 62
    ForkJoinPool-1-worker-2: i = 63
    ForkJoinPool-1-worker-2: i = 64
    ForkJoinPool-1-worker-2: i = 65
    ForkJoinPool-1-worker-2: i = 66
    ForkJoinPool-1-worker-2: i = 67
    ForkJoinPool-1-worker-2: i = 68
    ForkJoinPool-1-worker-2: i = 69
    ForkJoinPool-1-worker-1: i = 175
    ForkJoinPool-1-worker-1: i = 176
    ForkJoinPool-1-worker-1: i = 177
    ForkJoinPool-1-worker-1: i = 178
    ForkJoinPool-1-worker-1: i = 179
    ForkJoinPool-1-worker-1: i = 180
    ForkJoinPool-1-worker-1: i = 181
    ForkJoinPool-1-worker-1: i = 182
    ForkJoinPool-1-worker-1: i = 183
    ForkJoinPool-1-worker-1: i = 184
    ForkJoinPool-1-worker-1: i = 185
    ForkJoinPool-1-worker-1: i = 186
    ForkJoinPool-1-worker-1: i = 187
    ForkJoinPool-1-worker-1: i = 188
    ForkJoinPool-1-worker-1: i = 189
    ForkJoinPool-1-worker-1: i = 190
    ForkJoinPool-1-worker-1: i = 191
    ForkJoinPool-1-worker-1: i = 192
    ForkJoinPool-1-worker-1: i = 193
    ForkJoinPool-1-worker-1: i = 194
    ForkJoinPool-1-worker-1: i = 195
    ForkJoinPool-1-worker-1: i = 196
    ForkJoinPool-1-worker-1: i = 197
    ForkJoinPool-1-worker-1: i = 198
    ForkJoinPool-1-worker-1: i = 199
    ForkJoinPool-1-worker-1: i = 150
    ForkJoinPool-1-worker-1: i = 151
    ForkJoinPool-1-worker-1: i = 152
    ForkJoinPool-1-worker-1: i = 153
    ForkJoinPool-1-worker-1: i = 154
    ForkJoinPool-1-worker-1: i = 155
    ForkJoinPool-1-worker-1: i = 156
    ForkJoinPool-1-worker-1: i = 157
    ForkJoinPool-1-worker-1: i = 158
    ForkJoinPool-1-worker-1: i = 159
    ForkJoinPool-1-worker-1: i = 160
    ForkJoinPool-1-worker-1: i = 161
    ForkJoinPool-1-worker-1: i = 162
    ForkJoinPool-1-worker-1: i = 163
    ForkJoinPool-1-worker-1: i = 164
    ForkJoinPool-1-worker-1: i = 165
    ForkJoinPool-1-worker-1: i = 166
    ForkJoinPool-1-worker-1: i = 167
    ForkJoinPool-1-worker-1: i = 168
    ForkJoinPool-1-worker-1: i = 169
    ForkJoinPool-1-worker-1: i = 170
    ForkJoinPool-1-worker-1: i = 171
    ForkJoinPool-1-worker-1: i = 172
    ForkJoinPool-1-worker-1: i = 173
    ForkJoinPool-1-worker-1: i = 174
    ForkJoinPool-1-worker-1: i = 125
    ForkJoinPool-1-worker-1: i = 126
    ForkJoinPool-1-worker-1: i = 127
    ForkJoinPool-1-worker-1: i = 128
    ForkJoinPool-1-worker-1: i = 129
    ForkJoinPool-1-worker-1: i = 130
    ForkJoinPool-1-worker-1: i = 131
    ForkJoinPool-1-worker-1: i = 132
    ForkJoinPool-1-worker-1: i = 133
    ForkJoinPool-1-worker-1: i = 134
    ForkJoinPool-1-worker-1: i = 135
    ForkJoinPool-1-worker-1: i = 136
    ForkJoinPool-1-worker-1: i = 137
    ForkJoinPool-1-worker-1: i = 138
    ForkJoinPool-1-worker-1: i = 139
    ForkJoinPool-1-worker-1: i = 140
    ForkJoinPool-1-worker-1: i = 141
    ForkJoinPool-1-worker-1: i = 142
    ForkJoinPool-1-worker-1: i = 143
    ForkJoinPool-1-worker-1: i = 144
    ForkJoinPool-1-worker-1: i = 145
    ForkJoinPool-1-worker-1: i = 146
    ForkJoinPool-1-worker-1: i = 147
    ForkJoinPool-1-worker-1: i = 148
    ForkJoinPool-1-worker-1: i = 149
    ForkJoinPool-1-worker-1: i = 100
    ForkJoinPool-1-worker-1: i = 101
    ForkJoinPool-1-worker-1: i = 102
    ForkJoinPool-1-worker-1: i = 103
    ForkJoinPool-1-worker-1: i = 104
    ForkJoinPool-1-worker-1: i = 105
    ForkJoinPool-1-worker-1: i = 106
    ForkJoinPool-1-worker-1: i = 107
    ForkJoinPool-1-worker-1: i = 108
    ForkJoinPool-1-worker-1: i = 109
    ForkJoinPool-1-worker-1: i = 110
    ForkJoinPool-1-worker-1: i = 111
    ForkJoinPool-1-worker-1: i = 112
    ForkJoinPool-1-worker-1: i = 113
    ForkJoinPool-1-worker-1: i = 114
    ForkJoinPool-1-worker-1: i = 115
    ForkJoinPool-1-worker-1: i = 116
    ForkJoinPool-1-worker-1: i = 117
    ForkJoinPool-1-worker-1: i = 118
    ForkJoinPool-1-worker-1: i = 119
    ForkJoinPool-1-worker-1: i = 120
    ForkJoinPool-1-worker-1: i = 121
    ForkJoinPool-1-worker-1: i = 122
    ForkJoinPool-1-worker-1: i = 123
    ForkJoinPool-1-worker-1: i = 124
    ForkJoinPool-1-worker-1: i = 25
    ForkJoinPool-1-worker-1: i = 26
    ForkJoinPool-1-worker-1: i = 27
    ForkJoinPool-1-worker-1: i = 28
    ForkJoinPool-1-worker-1: i = 29
    ForkJoinPool-1-worker-1: i = 30
    ForkJoinPool-1-worker-1: i = 31
    ForkJoinPool-1-worker-1: i = 32
    ForkJoinPool-1-worker-1: i = 33
    ForkJoinPool-1-worker-1: i = 34
    ForkJoinPool-1-worker-1: i = 35
    ForkJoinPool-1-worker-1: i = 36
    ForkJoinPool-1-worker-1: i = 37
    ForkJoinPool-1-worker-1: i = 38
    ForkJoinPool-1-worker-1: i = 39
    ForkJoinPool-1-worker-1: i = 40
    ForkJoinPool-1-worker-1: i = 41
    ForkJoinPool-1-worker-1: i = 42
    ForkJoinPool-1-worker-1: i = 43
    ForkJoinPool-1-worker-1: i = 44
    ForkJoinPool-1-worker-1: i = 45
    ForkJoinPool-1-worker-1: i = 46
    ForkJoinPool-1-worker-1: i = 47
    ForkJoinPool-1-worker-1: i = 48
    ForkJoinPool-1-worker-1: i = 49
    ForkJoinPool-1-worker-1: i = 0
    ForkJoinPool-1-worker-1: i = 1
    ForkJoinPool-1-worker-1: i = 2
    ForkJoinPool-1-worker-1: i = 3
    ForkJoinPool-1-worker-1: i = 4
    ForkJoinPool-1-worker-1: i = 5
    ForkJoinPool-1-worker-1: i = 6
    ForkJoinPool-1-worker-1: i = 7
    ForkJoinPool-1-worker-1: i = 8
    ForkJoinPool-1-worker-1: i = 9
    ForkJoinPool-1-worker-1: i = 10
    ForkJoinPool-1-worker-1: i = 11
    ForkJoinPool-1-worker-1: i = 12
    ForkJoinPool-1-worker-1: i = 13
    ForkJoinPool-1-worker-1: i = 14
    ForkJoinPool-1-worker-1: i = 15
    ForkJoinPool-1-worker-1: i = 16
    ForkJoinPool-1-worker-1: i = 17
    ForkJoinPool-1-worker-1: i = 18
    ForkJoinPool-1-worker-1: i = 19
    ForkJoinPool-1-worker-1: i = 20
    ForkJoinPool-1-worker-1: i = 21
    ForkJoinPool-1-worker-1: i = 22
    ForkJoinPool-1-worker-1: i = 23
    ForkJoinPool-1-worker-1: i = 24
    ForkJoinPool-1-worker-2: i = 70
    ForkJoinPool-1-worker-2: i = 71
    ForkJoinPool-1-worker-2: i = 72
    ForkJoinPool-1-worker-2: i = 73
    ForkJoinPool-1-worker-2: i = 74

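The output shows that ForkJoinPool started two threads for this print job, because the author's machine has a dual-core CPU. Note also that although the program prints all 200 numbers from 0 to 199, it does not print them consecutively. The print job is decomposed and the resulting pieces run in parallel, so the numbers do not appear in order from 0 to 199.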
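The execution order varies between runs, but a result that is indexed by position does not depend on that order. The sketch below illustrates this with a RecursiveAction that squares the elements of an array in place. `SquareTask`, `OrderIndependentSketch`, and the threshold of 50 are assumptions made for this example. It uses `invokeAll`, which forks the subtasks and returns only after both have finished, so the caller can read the array safely once `invoke` returns.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical task: squares each element of a shared array in place.
class SquareTask extends RecursiveAction {
    private static final int THRESHOLD = 50; // assumed leaf size
    private final long[] data;
    private final int start;
    private final int end;

    SquareTask(long[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start < THRESHOLD) {
            // Small enough: process this slice directly.
            for (int i = start; i < end; i++) {
                data[i] = data[i] * data[i];
            }
        } else {
            int middle = (start + end) / 2;
            // invokeAll runs both halves and waits until both are complete.
            invokeAll(new SquareTask(data, start, middle),
                      new SquareTask(data, middle, end));
        }
    }
}

class OrderIndependentSketch {
    // Returns an array where element i holds i * i.
    static long[] squares(int n) {
        long[] data = new long[n];
        for (int i = 0; i < n; i++) {
            data[i] = i;
        }
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(new SquareTask(data, 0, n)); // blocks until the whole tree is done
        pool.shutdown();
        return data;
    }

    public static void main(String[] args) {
        long[] result = squares(200);
        // prints "0 100 39601" regardless of which thread handled which slice
        System.out.println(result[0] + " " + result[10] + " " + result[199]);
    }
}
```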

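2. RecursiveTask


The following example uses a large task with a return value to show how RecursiveTask is used.

The large task: compute the sum of 100 random numbers.

The small task: each one sums at most 20 numbers.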

    import java.util.Random;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Future;
    import java.util.concurrent.RecursiveTask;

    // RecursiveTask is the abstract ForkJoinTask subclass for tasks that return a value
    class SumTask extends RecursiveTask<Integer> {
        // Ranges of MAX or more elements are split further
        private static final int MAX = 20;
        private final int[] arr;
        private final int start;
        private final int end;

        SumTask(int[] arr, int start, int end) {
            this.arr = arr;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            // Sum directly once the range holds fewer than MAX elements
            if ((end - start) < MAX) {
                int sum = 0;
                for (int i = start; i < end; i++) {
                    sum += arr[i];
                }
                return sum;
            } else {
                System.err.println("===== splitting task =====");
                // Split the large task into two smaller tasks
                int middle = (start + end) / 2;
                SumTask left = new SumTask(arr, start, middle);
                SumTask right = new SumTask(arr, middle, end);
                // Run the left half asynchronously and compute the right half
                // in the current thread, so this worker is not left idle
                left.fork();
                int rightSum = right.compute();
                // Merge the partial sums of the two halves
                return left.join() + rightSum;
            }
        }

    }

    public class ForkJoinPoolTest2 {
        public static void main(String[] args) throws Exception {
            int[] arr = new int[100];
            Random random = new Random();
            int total = 0;
            // Fill the array with 100 random values
            for (int i = 0; i < arr.length; i++) {
                int temp = random.nextInt(100);
                // Store the value in the array and add it to the running total
                total += (arr[i] = temp);
            }
            System.out.println("Sum at initialization = " + total);
            // The no-argument constructor uses Runtime.getRuntime().availableProcessors()
            // as the parallelism of the ForkJoinPool
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            // Submit the decomposable SumTask
            Future<Integer> future = forkJoinPool.submit(new SumTask(arr, 0,
                    arr.length));
            System.out.println("Computed sum = " + future.get());
            // Shut down the thread pool
            forkJoinPool.shutdown();
        }

    }

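Output of a sample run:
    Sum at initialization = 4283
    ===== splitting task =====
    ===== splitting task =====
    ===== splitting task =====
    ===== splitting task =====
    ===== splitting task =====
    ===== splitting task =====
    ===== splitting task =====
    Computed sum = 4283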

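The output shows that ForkJoinPool split the task 7 times, and that the result computed by SumTask equals the total accumulated while the array was initialized, which indicates that the computation is correct.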
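The count of 7 follows from the splitting rule: the range of 100 is split once, the two ranges of 50 are each split, and the four ranges of 25 are each split, after which every piece holds 12 or 13 elements and is summed directly, so 1 + 2 + 4 = 7. The sketch below checks that arithmetic by applying the same "fewer than MAX" rule without any threads. `SplitCountSketch` and `countSplits` are hypothetical names used only for this illustration.

```java
// Hypothetical helper that mirrors the split rule used by SumTask.
class SplitCountSketch {
    // Counts how many times the range [start, end) is split when ranges
    // holding fewer than `max` elements are processed directly.
    // Assumes max >= 2 so that every split makes progress.
    static int countSplits(int start, int end, int max) {
        if (end - start < max) {
            return 0; // leaf: no further split
        }
        int middle = (start + end) / 2;
        return 1 + countSplits(start, middle, max) + countSplits(middle, end, max);
    }

    public static void main(String[] args) {
        System.out.println(countSplits(0, 100, 20)); // prints 7
    }
}
```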



Readers can also consult the following articles for a deeper understanding of ForkJoinPool:

http://www.infoq.com/cn/articles/fork-join-introduction/

http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/



==================================================================================================

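  Author: Ouyang Peng (歐陽鵬). Reposting is welcome; sharing with others is the source of progress!

  When reposting, please keep the original URL: http://blog.csdn.net/ouyang_peng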
