Java 8并行流:必備技巧
Java 8 并行流(parallel stream)采用共享線程池,對性能造成了嚴重影響。可以包裝流來調用自己的線程池解決性能問題。
問題
Java 8 的并行流可以讓我們相對輕松地執行并行任務。
myList.parallelStream.map(obj -> longRunningOperation())
但是這樣存在一個嚴重的問題:在 JVM 的后臺,使用通用的 fork/join 池來完成上述功能,該池是所有并行流共享的。默認情況,fork/join 池會為每個處理器分配一個線程。假設你有一臺16核的機器,這樣你就只能創建16個線程。對 CPU 密集型的任務來說,這樣是有意義的,因為你的機器確實只能執行16個線程。但是真實情況下,不是所有的任務都是 CPU 密集型的。例如:
myList.parallelStream
.map(this::retrieveFromA) .map(this::processUsingB) .forEach(this::saveToC)myList.parallelStream
.map(this::retrieveFromD) .map(this::processUsingE) .forEach(this::saveToD)</pre>這兩個流很大程度上是受限于IO操作,所以會等待其他系統。但這兩個流使用相同的(小)線程池,因此會相互等待而被阻塞。這個非常不好,可以改進。我們以一個流為例:
final List<Integer> firstRange = buildIntRange(); firstRange.parallelStream().forEach((number) -> { try { // do something slow Thread.sleep(5); } catch (InterruptedException e) { } });完整的代碼可以在gist上查看。
在執行期間,我獲取了一份線程dump的文件。這是相關的線程(在我的Macbook上):
ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-4現在,我要并行的執行這兩個并行流(對于那些不是以英語為母語的人士,我感到非常抱歉!)
Runnable firstTask = () -> {
firstRange.parallelStream().forEach((number) -> { try { // do something slow Thread.sleep(5); } catch (InterruptedException e) { } }); };Runnable secondTask = () -> {
secondRange.parallelStream().forEach((number) -> { try { // do something slow Thread.sleep(5); } catch (InterruptedException e) { } }); }; // run threads</pre>完整的代碼可以在gist上查看。
這次我們再看一下線程dump文件:
ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-4正如你所見,結果是一樣的。我們只使用了4個線程。
一種變通方案
正如我所提到的,JVM 后臺使用 fork/join 池,在 ForkJoinTask 的文檔中,我們可以看到:
如果合適,安排一個異步執行的任務到當前正在運行的池中。如果任務不在inForkJoinPool()中,也可以調用ForkJoinPool.commonPool()獲取新的池來執行。
讓我試一試……
ForkJoinPool forkJoinPool = new ForkJoinPool(3);
forkJoinPool.submit(() -> {
firstRange.parallelStream().forEach((number) -> { try { Thread.sleep(5); } catch (InterruptedException e) { } }); });ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);
forkJoinPool2.submit(() -> {
secondRange.parallelStream().forEach((number) -> { try { Thread.sleep(5); } catch (InterruptedException e) { } }); });</pre>完整的代碼可以在gist上查看。
現在,我們再次查看線程池:
ForkJoinPool-1-worker-1 ForkJoinPool-1-worker-2 ForkJoinPool-1-worker-3 ForkJoinPool-1-worker-4 ForkJoinPool-2-worker-1 ForkJoinPool-2-worker-2 ForkJoinPool-2-worker-3 ForkJoinPool-1-worker-4因為我們創建自己的線程池,所以可以避免共享線程池,如果有需要,甚至可以分配比處理機數量更多的線程。
ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>);原文鏈接: tobyhobson 翻譯: ImportNew.com - paddx
譯文鏈接: http://www.importnew.com/16801.html