Java Streams,第 3 部分: Streams 的幕后原理
本系列 的前兩篇(使用流執行聚合 java.util.stream 庫簡介)文章探討了如何使用 Java SE 8 中添加的 java.util.stream 庫,該庫使得聲明性地表達數據集上的查詢變得很容易。在許多情況下,該庫會確定如何高效地執行查詢,而不需要用戶協助。但在性能至關重要時,了解該庫的內部工作原理很有價值,這樣您就能夠消除低效性的可能來源。第三期文章將探索 Streams 實現的工作原理,解釋一些可通過聲明性方法實現的優化。
流管道
一個 流管道 包含一個 流來源 、0 或多個 中間操作 ,以及一個 終止操作 。流來源可以是集合、數組、生成器函數或其他任何適當地提供了其元素的訪問權的數據源。中間操作將流轉換為其他流 — 通過過濾元素 ( filter() ),轉換元素 ( map() ),排序元素 ( sorted() ),將流截斷為一定大小 ( limit() ),等等。終止操作包括聚合( reduce() 、 collect() ),搜索 ( findFirst() ) 和迭代 ( forEach() )。
關于本系列
借助 java.util.stream 包,您可以簡明地、聲明性地表達集合、數組和其他數據源上可能的并行批量操作。在 Java 語言架構師 Brian Goetz 編寫的這個系列 中,全面了解 Streams 庫并學習如何最充分地使用它。
流管道是惰性構造的。構造流來源不會計算流的元素,而是會確定在必要時如何找到元素。類似地,調用中間操作不會在元素上執行任何計算;只會將另一個操作添加到流描述的末尾。僅在調用終止操作時,管道才會實際執行相應的工作:計算元素,應用中間操作,以及應用終止操作。這種執行方法使得執行多項有趣的優化成為可能。
流來源
學習更多知識。開發更多項目。聯系更多同行。
全新的 developerWorks Premium 訂閱計劃提供了強大的開發工具和資源,包括 500 篇通過 Safari Books Online 提供的頂級技術文章(包含作者的 Java 并發性實戰 )、最重要開發人員活動的大幅折扣、最新的 O'Reilly 大會的視頻錄像,等等。立即注冊。
流來源有一種稱為 Spliterator 的抽象來描述。顧名思義, Spliterator 組合了兩種行為:訪問來源的元素(迭代),可能分解輸入來源來實現并行執行(拆分)。
盡管 Spliterator 包含與 Iterator 相同的基本行為,但它沒有擴展 Iterator ,而采用了不同的元素訪問方法。 Iterator 有兩個方法: hasNext() 和 next() ;訪問下一個元素可能涉及到(但不需要)調用這兩個方法。因此,正確編寫 Iterator 需要一定量的防御性和重復性編碼。(如果客戶端沒有在調用 next() 之前調用 hasNext() 會怎么樣?如果它調用 hasNext() 兩次會怎么樣?)此外,這種兩方法協議通常需要一定水平的有狀態性,比如前窺 (peek ahead ) 一個元素(并跟蹤您是否已前窺)。這些要求累積形成了大量的每元素訪問開銷。
語言中擁有拉姆達表達式使 Spliterator 能夠采取一種通常更加高效的元素訪問方法 — 而且更容易正確地編碼。 Spliterator 有兩個訪問元素的方法:
boolean tryAdvance(Consumer<? super T> action);
void forEachRemaining(Consumer<? super T> action);
tryAdvance() 方法嘗試處理單個元素。如果沒有元素, tryAdvance() 只會返回 false ;否則,它會前移游標,將當前元素傳遞給所提供的處理函數并返回 true 。 forEachRemaining() 方法處理所有剩余的元素,將它們一次一個地傳遞給所提供的處理函數。
即使忽略了并行分解的可能性, Spliterator 抽象也是一個 “更好的迭代器” — 更容易編寫,更容易使用,而且通常具有更低的每元素訪問開銷。但 Spliterator 抽象還擴展到了并行分解領域。一個 spliterator 描述剩余元素的序列,調用 tryAdvance() 或 forEachRemaining() 元素訪問方法來在該序列中前進。為了拆分來源,以便兩個線程可分別處理輸入的不同部分, Spliterator 提供了一個 trySplit() 方法:
Spliterator<T> trySplit();
trySplit() 的行為是嘗試將剩余元素拆分為兩個部分,這兩部分最好具有類似的大小。如果 Spliterator 可以拆分, trySplit() 會將所描述元素的初始部分拆分為一個新 Spliterator ,將其返回,并調整其狀態,以便描述拆分后的部分后面的元素。如果來源無法拆分, trySplit() 將會返回 null ,表明無法拆分且調用方應按順序繼續處理。對于很重要的來源(例如數組、 List 或 SortedSet ), trySplit() 必須保留此順序;它必須將剩余元素的初始部分拆分到一個新的 Spliterator 中,而且當前 spliterator 必須按照與原始順序相同的順序描述剩余元素。
JDK 中的 Collection 實現都已配備了高質量的 Spliterator 實現。允許一些來源獲得比其他來源更好的實現:包含多個元素的 ArrayList 始終可以干凈且均勻地進行拆分; LinkedList 的拆分效率一直很差;而且基于哈希值和基于樹的數據集通常能夠進行比較不錯的拆分。
構建流管道
流管道是通過構造流來源及其中間操作的鏈接列表表示來構建的。在內部表示中,管道的每個階段都通過一個 流標志 位圖來描述,該位圖描述了在流管道的這一階段已知的元素信息。流使用這些標志優化流的構造和執行。表 1 展示了流標志和它們的解釋。
表 1. 流標志
| 流標志 | 解釋 |
|---|---|
| SIZED | 流的大小已知。 |
| DISTINCT | 依據用于對象流的 Object.equals() 或用于原語流的 == ,流的元素將有所不同。 |
| SORTED | 流的元素按自然順序排序。 |
| ORDERED | 流有一個有意義的遇到順序(請參閱 “” 部分)。 |
來源階段的流標志來自 spliterator 的 characteristics 位圖(spliterator 支持比流更大的標志集)。高質量的 spliterator 實現不僅提供了高效的元素訪問和拆分,還會描述元素的特征。(例如,一個 HashSet 的 spliterator 報告 DISTINCT 特征,因為已知一個 Set 的元素是不同的。)
“ 在某些情況下,Streams 可以使用來源和之前的操作的知識來完全省略某個操作。 ”
每個中間操作都對流標志具有已知的影響;一個操作可設置、清除或保留每個標志的設置。例如, filter() 操作保留 SORTED 和 DISTINCT 標志,但清除 SIZED 標志; map() 操作清除 SORTED 和 DISTINCT 標志,但保留 SIZED 標志; sorted() 操作保留 SIZED 和 DISTINCT 標志,但注入 SORTED 標志。構造階段的鏈接列表表示時,會將前一個階段的標志與當前階段的行為相組合,以獲得當前階段的一組新標志。
在某些情況下,標志使完全省略一個操作成為可能,就像清單 1 中的流管道一樣。
清單 1. 可自動省略操作的流管道
TreeSet<String> ts = ...
String[] sortedAWords = ts.stream()
.filter(s -> s.startsWith("a"))
.sorted()
.toArray();
來源階段的流標志包含 SORTED ,因為來源是一個 TreeSet 。 filter() 方法保留了 SORTED 標志,所以過濾階段的流標志也包含 SORTED 標志。通常, sorted() 方法的結果是構造一個新的管道階段,將它添加到管道末尾,然后返回新階段。但是,因為已知元素是按自然順序排序的,所以 sorted() 方法是一個空操作 — 它僅返回前一個階段(過濾階段),因為排序是多余的。(類似地,如果元素已知是 DISTINCT ,那么可以完全消除 distinct() 操作。)
執行流管道
發起終止操作時,流實現會挑選一個執行計劃。中間操作可劃分為 無狀態 ( filter() 、 map() 、 flatMap() )和 有狀態 ( sorted() 、 limit() 、 distinct() )操作。無狀態操作是可在元素上執行而無需知道其他任何元素的操作。例如,過濾操作只需檢查當前元素來確定是包含還是消除它,但排序操作必須查看所有元素之后才知道首先發出哪個元素。
如果管道按順序執行,或者并行執行,但包含所有無狀態操作,那么它可以在一輪中計算。否則,管道會劃分為多個部分(在有狀態操作邊界上劃分)并分多輪計算。
終止操作是 短路 ( allMatch() 、 findFirst() )或 非短路 ( reduce() 、 collect() 、 forEach() )操作。如果終止操作是非短路操作,那么可以批量處理數據(使用來源 spliterator 的 forEachRemaining() 方法,進一步減少訪問每個元素的開銷);如果它是短路操作,則必須一個元素處理一次(使用 tryAdvance() )。
對于順序執行,Streams 構造了一個 “機器” — 一個 Consumer 對象鏈,其結構與管道結構相符。其中每個 Consumer 對象知道下一個階段;當它收到一個元素(或被告知沒有更多元素)時,它會將 0 或多個元素發送到鏈中的下一個階段。例如,與 filter() 階段有關聯的 Consumer 將過濾器謂詞應用于輸入元素,并將它發送或不發送到下一個階段;與 map() 階段有關聯的 Consumer 將映射函數應用于輸入元素,并將結果發送到下一個階段。與有狀態操作(比如 sorted() )有關聯的 Consumer 會緩沖元素,直到它看到輸入的末尾,然后將排序的數據發送到下一個階段。機器中的最后一個階段將實現終止操作。如果此操作生成了結果,比如 reduce() 或 toArray() ,該階段可充當此結果的累加器。
圖 1 顯示了以下流管道的 “流機器” 的動畫(或者在某些瀏覽器中顯示為快照)。(在圖 1 中,黃色、綠色和藍色塊按順序進入機器頂部的第一個階段。在第一個階段,每個塊壓縮為更小的塊,然后進入第二個階段。在這里,一個類似吃豆人的游戲人物吃掉每個黃色塊,僅讓綠色和藍色塊落入第三個階段。壓縮的藍色和綠色塊交替顯示在計算機屏幕上。)
blocks.stream()
.map(block -> block.squash())
.filter(block -> block.getColor() != YELLOW)
.forEach(block -> block.display());
圖 1. 流機器(動畫來自 Tagir Valeev)

并行執行將會執行類似的操作,但不會創建單個機器,每個工作線程將會獲取自己的機器副本并將其數據節提供給它,然后將每個線程機器的結果與其他機器的結果合并,生成最終結果。
流管道的執行也可以使用流標志來優化。例如, SIZED 標志指示最終結果的大小是已知的。 toArray() 終止操作可使用此標志預先分配正確大小的數組;如果沒有 SIZED 標志,則需要猜測數組大小,并在猜測錯誤時復制數據。
“ 當性能至關重要時,了解庫的內部工作原理非常重要。 ”
預先設置大小的優化在并行流執行中更有效。除了 SIZED 標志之外,另一個 spliterator 特征 SUBSIZED 表示不僅大小已知,而且如果 spliterator 已拆分,則拆分大小也是已知的。(數組和 ArrayList 就屬于這種情況,但其他可拆分來源,比如樹,不一定屬于這種情況。)如果有 SUBSIZED 特征,在并行執行中, toArray() 操作可為整個結果分配一個正確大小的數組,各個線程(分別處理輸入的不同部分)可將它們的結果直接寫入數組的正確部分 — 無需同步或復制。(缺少 SUBSIZED 標志時,會將每一部分收集到一個中間數組中,然后復制到最終位置。)
遇到順序
另一個影響庫的優化能力的微妙的考慮事項是 遇到順序 。遇到順序指的是來源分發元素的順序是否對計算至關重要。一些來源(比如基于哈希的集合和映射)沒有有意義的遇到順序。流標志 ORDERED 描述了流是否有有意義的遇到順序。JDK 集合的 spliterator 會根據集合的規范來設置此標志;一些中間操作可能注入 ORDERED ( sorted() ) 或清除它 ( unordered() )。
如果流沒有遇到順序,大部分流操作都必須遵守該順序。對于順序執行,會自動保留遇到順序,因為元素會按遇到它們的順序自然地處理。甚至在并行執行中,許多操作(無狀態中間操作和一些終止操作(比如 reduce() )),遵守遇到順序不會產生任何實際成本。但對于其他操作(有狀態中間操作,其語義與遇到順序關聯的終止操作,比如 findFirst() 或 forEachOrdered() ),在并行執行中遵守遇到順序的責任可能很重大。如果流有一個已定義的遇到順序,但該順序對結果沒有意義,那么可以通過使用 unordered() 操作刪除 ORDERED 標志,加速包含順序敏感型操作的管道的順序執行。
作為對遇到順序敏感的操作的示例,可以考慮 limit() ,它會在指定大小處截斷一個流。在順序執行中實現 limit() 很簡單:保留一個已看到多少元素的計數器,在這之后丟棄任何元素。但是在并行執行中,實現 limit() 要復雜得多;您需要保留 前 N 個元素。此要求大大限制了利用并行性的能力;如果輸入劃分為多個部分,您只有在某個部分之前的所有部分都已完成后,才知道該部分的結果是否將包含在最終結果中。因此,該實現一般會錯誤地選擇不使用所有可用的核心,或者緩存整個試驗性結果,直到您達到目標長度。
如果流沒有遇到順序, limit() 操作可以自由選擇 任何 N 個元素,這讓執行效率變得高得多。知道元素后可立即將其發往下游,無需任何緩存,而且線程之間唯一需要執行的協調是發送一個信號來確保未超出目標流長度。
遇到順序成本的另一個不太常見的示例是排序。如果遇到順序有意義,那么 sorted() 操作會實現一種 穩定 排序(相同的元素按照它們進入輸入時的相同順序出現在輸出中),而對于無序的流,穩定性(具有成本)不是必需的。 distinct() 具有類似的情況:如果流有一個遇到順序,那么對于多個相同的輸入元素, distinct() 必須發出其中的 第一個 ,而對于無序的流,它可以發出任何元素 — 同樣可以獲得高效得多的并行實現。
在您使用 collect() 聚合時會遇到類似的情形。如果在無序流上執行 collect(groupingBy()) 操作,與任何鍵對應的元素都必須按它們在輸入中出現的順序提供給下游收集器。此順序對應用程序通常沒有什么意義,而且任何順序都沒有意義。在這些情況下,可能最好選擇一個 并發 收集器(比如 groupingByConcurrent() ),它可以忽略遇到順序,并讓所有線程直接收集到一個共享的并發數據結構中(比如 ConcurrentHashMap ),而不是讓每個線程收集到它自己的中間映射中,然后再合并中間映射(這可能產生很高的成本)。
創建流
“ 可以輕松地調整現有數據結構來分發流。 ”
盡管 JDK 中的許多類已被改進來用作流來源,但同樣可以輕松地調整現有數據結構來分發流。要從任意數據源創建流,需要為該流的元素創建一個 Spliterator ,并將該 spliterator 連同一個 boolean 標志傳遞給 StreamSupport.stream() ,該標志表明結果流應是順序的還是并行的。
Spliterator 實現的質量可能存在巨大差別,以平衡使用 spliterator 作為來源的流管道的實現工作與性能。 Spliterator 接口有多種可選的方法,比如 trySplit() 。如果您不想實現拆分,可以從 trySplit() 返回 null ,但這意味著使用這個 Spliterator 作為來源的流將無法利用并行性來加速計算。
影響 spliterator 質量的考慮因素包括:
- spliterator 是否報告了準確的大小?
- spliterator 能否拆分輸入?
- 它能否將輸入拆分為幾乎相等的部分?
- 所拆分部分的大小是否可預測(通過 SUBSIZED 特征反映)?
- spliterator 是否報告了所有相關特征?
創建 spliterator 的最簡單方法(但會導致最差的結果質量)是將 Iterator 傳遞給 Spliterators.spliteratorUnknownSize() 。您可以通過將 Iterator 和一個大小傳遞給 Spliterators.spliterator 來獲得稍微好點的 spliterator。但是如果流性能很重要(尤其是并行性能),可以實現完整的 Spliterator 接口(包括所有適用的特征)。集合類(比如 ArrayList 、 TreeSet 和 HashMap )的 JDK 來源提供了一些高質量的 spliterator 示例,您可針對您自己的數據結構來模仿它們。
第 3 部分的小結
盡管開箱即用的 Streams 的性能通常很好(有時比相應的命令式代碼更好),但牢固掌握 Streams 的幕后工作原理使您能夠最高效地使用這個庫,并創建自定義適配器來從任何數據源中獲取流。 Java Streams 系列接下來的兩期將深入探討并行性。
來自: http://www.ibm.com/developerworks/cn/java/j-java-streams-3-brian-goetz/index.html?ca=drs-