Java 8并行流:必備技巧

jopen 9年前發布 | 15K 次閱讀 Java 8 Java開發

Java 8 并行流(parallel stream)采用共享線程池,對性能造成了嚴重影響。可以包裝流來調用自己的線程池解決性能問題。

問題

Java 8 的并行流可以讓我們相對輕松地執行并行任務。

myList.parallelStream.map(obj -> longRunningOperation())

但是這樣存在一個嚴重的問題:在 JVM 的后臺,使用通用的 fork/join 池來完成上述功能,該池是所有并行流共享的。默認情況,fork/join 池會為每個處理器分配一個線程。假設你有一臺16核的機器,這樣你就只能創建16個線程。對 CPU 密集型的任務來說,這樣是有意義的,因為你的機器確實只能執行16個線程。但是真實情況下,不是所有的任務都是 CPU 密集型的。例如:

myList.parallelStream
.map(this::retrieveFromA) .map(this::processUsingB) .forEach(this::saveToC)

myList.parallelStream
.map(this::retrieveFromD) .map(this::processUsingE) .forEach(this::saveToD)</pre>

這兩個流很大程度上是受限于IO操作,所以會等待其他系統。但這兩個流使用相同的(小)線程池,因此會相互等待而被阻塞。這個非常不好,可以改進。我們以一個流為例:

final List<Integer> firstRange = buildIntRange();  
   firstRange.parallelStream().forEach((number) -> {
      try {
         // do something slow
         Thread.sleep(5);
      } catch (InterruptedException e) { }
});

完整的代碼可以在gist上查看。

在執行期間,我獲取了一份線程dump的文件。這是相關的線程(在我的Macbook上):

ForkJoinPool.commonPool-worker-1  
ForkJoinPool.commonPool-worker-2  
ForkJoinPool.commonPool-worker-3  
ForkJoinPool.commonPool-worker-4

現在,我要并行的執行這兩個并行流(對于那些不是以英語為母語的人士,我感到非常抱歉!)

Runnable firstTask = () -> {
firstRange.parallelStream().forEach((number) -> { try { // do something slow Thread.sleep(5); } catch (InterruptedException e) { } }); };

Runnable secondTask = () -> {
secondRange.parallelStream().forEach((number) -> { try { // do something slow Thread.sleep(5); } catch (InterruptedException e) { } }); }; // run threads</pre>

完整的代碼可以在gist上查看。

這次我們再看一下線程dump文件:

ForkJoinPool.commonPool-worker-1  
ForkJoinPool.commonPool-worker-2  
ForkJoinPool.commonPool-worker-3  
ForkJoinPool.commonPool-worker-4

正如你所見,結果是一樣的。我們只使用了4個線程。

一種變通方案

正如我所提到的,JVM 后臺使用 fork/join 池,在 ForkJoinTask 的文檔中,我們可以看到:

如果合適,安排一個異步執行的任務到當前正在運行的池中。如果任務不在inForkJoinPool()中,也可以調用ForkJoinPool.commonPool()獲取新的池來執行。

讓我試一試……

ForkJoinPool forkJoinPool = new ForkJoinPool(3);
forkJoinPool.submit(() -> {
firstRange.parallelStream().forEach((number) -> { try { Thread.sleep(5); } catch (InterruptedException e) { } }); });

ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);
forkJoinPool2.submit(() -> {
secondRange.parallelStream().forEach((number) -> { try { Thread.sleep(5); } catch (InterruptedException e) { } }); });</pre>

完整的代碼可以在gist上查看。

現在,我們再次查看線程池:

ForkJoinPool-1-worker-1  
ForkJoinPool-1-worker-2  
ForkJoinPool-1-worker-3  
ForkJoinPool-1-worker-4  
ForkJoinPool-2-worker-1  
ForkJoinPool-2-worker-2  
ForkJoinPool-2-worker-3  
ForkJoinPool-1-worker-4

因為我們創建自己的線程池,所以可以避免共享線程池,如果有需要,甚至可以分配比處理機數量更多的線程。

ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>);
原文鏈接: tobyhobson 翻譯: ImportNew.com - paddx
譯文鏈接: http://www.importnew.com/16801.html

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!