Java Streams,第 2 部分: 使用流執行聚合

ReginaLangs 8年前發布 | 11K 次閱讀 Java Java開發

本系列的第 1 部分 介紹了 Java SE 8 中添加的 java.util.stream 庫。第二期文章將重點介紹 Streams 庫的一個最重要的、靈活的方面 — 聚合和匯總數據的能力。

“累加器反模式”

第 1 部分中的第 1 個例子使用 Streams 執行了一次簡單的匯總,如清單 1 所示。

清單 1. 使用 Streams 聲明性地計算聚合值

int totalSalesFromNY
    = txns.stream()
          .filter(t -> t.getSeller().getAddr().getState().equals("NY"))
          .mapToInt(t -> t.getAmount())
          .sum();

清單 2 展示了如何采用 “老方法” 編寫這個示例。

清單 2. 通過命令計算同一個聚合值

int sum = 0;
for (Txn t : txns) {
    if (t.getSeller().getAddr().getState().equals("NY"))
        sum += t.getAmount();
}

第 1 部分介紹了盡管新方法比老方法更長,但新方法更可取的一些原因:

關于本系列

借助 java.util.stream 包,您可以簡明地、聲明性地表達集合、數組和其他數據源上可能的并行批量操作。在 Java 語言架構師 Brian Goetz 編寫的這個系列 中,全面了解 Streams 庫,并學習如何最充分地使用它。

  • 代碼更加清晰,因為它被簡單地構造為一些簡單操作的組合。
  • 該代碼是通過聲明進行表達的(描述想要的結果),而不是通過命令進行表達的(一個計算結果的循序漸進的過程)。
  • 隨著表達的查詢變得更加復雜,此方法可以更干凈地擴展。

應用這個特殊的聚合是有一些額外原因的。演示了 累加器反模式 ,其中代碼首先聲明并初始化一個可變累加器變量 ( sum ),然后繼續在循環中更新累加器。為什么這樣做是不正確的?首先,此代碼樣式難以并行化。沒有協調(比如同步),對累加器的每次訪問都導致一次數據爭用(而借助協調,協調導致的爭用會破壞并行性所帶來的效率收益)。

學習更多知識。開發更多項目。聯系更多同行。

全新的 developerWorks Premium 訂閱計劃提供了強大的開發工具和資源,包括 500 篇通過 Safari Books Online 提供的頂級技術文章(包含作者的 Java 并發性實戰 )、最重要開發人員活動的大幅折扣、最新的 O'Reilly 大會的視頻錄像,等等。立即注冊。

累加器方法更不可取的另一個原因是,它在太低的級別上建模計算 — 在各個元素的級別上,而不是在整個數據集的級別上。與 “逐個依次迭代交易金額,將每筆金額添加到一個已初始化為 0 的累加器” 相比,“所有交易金額的總和” 是目標的更抽象、更直接的陳述。

所以,如果命令式累加是錯誤的工具,那什么才是正確的工具?在這個特定的問題中,您已經看到了答案的線索( sum() 方法),但這只是一個強大的、通用的 縮減 技術的一種特殊情況。縮減技術簡單、靈活,而且可并行化,還能在比命令式累加更高的抽象級別上操作。

縮減

“ 縮減技術簡單、靈活,而且可并行化,還能在比命令式累加更高的抽象級別上操作。 ”

縮減(也稱為 折疊 )是一種來自函數編程的技術,它抽象化了許多不同的累加操作。給定一個類型為 T,包含 x 個元素的非空數列 X 1 , x 2 , ..., x n 和 T 上的二元運算符(在這里表示為 *),* 下的 X 的 縮減 被定義為:

   (x 1 * x 2 * ...* x n )

當使用普通的加法作為二元運算符來應用于某個數列時,縮減就是求和。但其他許多操作也可以使用縮減來描述。如果二元運算符是 “獲取兩個元素中較大的一個”(這在 Java 中可以使用拉姆達表達式 (x,y) -> Math.max(x,y) 來表示,或者更簡單地表示為方法引用 Math::max ),則縮減對應于查找最大值。

通過將累加描述為縮減,而不使用累加器反模式,可以采用更 抽象 、更 緊湊 、更 并行化 的方式來描述計算 — 只要您的二元運算符滿足一個簡單條件: 結合性 。回想一下,如果 a、b 和 c 元素滿足以下條件,二元運算符 * 就是 結合性的

   ((a * b) * c) = (a * (b * c))

結合性意味著 分組無關緊要 。如果二元運算符是結合性的,那么可以按照任何順序安全地執行縮減。在順序執行中,執行的自然順序是從左向右;在并行執行中,數據劃分為分段,分別縮減每個分段,然后組合結果。結合性可確保這兩種方法得到相同的答案。如果將結合性的定義擴展到 4 項,可能更容易理解:

   (((a * b) * c) * d) = ((a * b) * (b * d))

左側對應于典型的順序計算;右側對應于表示典型的并行執行的分區執行,其中輸入序列被分解為幾部分,各部分并行縮減,并使用 * 將各部分的結果組合起來。(或許令人驚奇的是,* 不需要是可交換的,但許多運算符通常都可用于縮減,比如相加和求最大值等。具有結合性但沒有可交換性的二元運算符的一個例子是字符串串聯。)

Streams 庫有多種縮減方法,包括:

Optional<T> reduce(BinaryOperator<T> op)
T reduce(T identity, BinaryOperator<T> op)

在這些方法中,最簡單的方法僅獲得一個結合性二元運算符,在該運算符下計算流元素的縮減結果。結果被描述為 Optional ;如果輸入流是空的,則縮減結果也是空的。(如果輸入只有一個元素,那么縮減結果就是該元素。)如果您有一個字符串集合,您可以將這些元素的串聯計算為:

String concatenated = strings.stream().reduce("", String::concat);

對于這兩種方法中的第二種方法,您需要提供一個身份值,在字符串為空時,還可以將該值用作結果。身份值必須滿足所有 x 的限制:

 

身份 * x = x

x * 身份 = x

 

不是所有二元運算符都有身份值,但當它們擁有身份值時,它們可能不會得到您想要的結果。例如,計算最大值時,您可能傾向于使用值 Integer.MIN_VALUE 作為身份(它確實滿足要求)。但在空流上使用該身份時,結果可能不是您想要的;您無法確定空輸入和僅包含 Integer.MIN_VALUE 的非空輸入之間的區別。(有時這不是問題,但有時會導致問題 — 因此 Streams 庫將留給客戶,由客戶決定是否指定身份。)

對于字符串串聯,身份是空字符串,所以您可以將前面的示例重寫為:

String concatenated = strings.stream().reduce("", String::concat);

類似地,您可以將數組上的整數總和描述為:

int sum = Stream.of(ints).reduce(0, (x,y) -> x+y);

(但實際上,您使用了 IntStream.sum() 便捷方法。)

縮減不需要僅應用于整數和字符串,它可以應用于您想要將元素序列縮減為該類型的單個元素的任何情形。例如,您可以通過縮減來計算最高的人:

Comparator<Person> byHeight = Comparators.comparingInt(Person::getHeight);
BinaryOperator<Person> tallerOf = BinaryOperator.maxBy(byHeight);
Optional<Person> tallest = people.stream().reduce(tallerOf);

如果提供的二元運算符不是結合性的,或者提供的身份值實際上不是該二元運算符的身份,那么在并行執行該操作時,結果可能是錯的,而且同一個數據集上的不同執行過程可能會生成不同的結果。

可變縮減

縮減獲取一個值序列并將它縮減為單個值,比如數列的和或它的最大值。但是有時您不想要單個匯總值;您想將結果組織為類似 List 或 Map 的數據結構,或者將它縮減為多個匯總值。在這種情況下,您應該使用 縮減 的可變類似方法,也稱為 收集 。

考慮將元素累積到一個 List 中的簡單情況。使用累加器反模式,您可以這樣編寫它:

ArrayList<String> list = new ArrayList<>();
for (Person p : people)
    list.add(p.toString());

當累加器變量是一個簡單值時,縮減是累加的更好替代方法,與此類似,在累加器結果是更復雜的數據結構時,也有一種更好的替代方法。縮減的構建塊是一個身份值和一種將兩個值組合成新值的途徑;可變縮減的類似方法包括:

  • 一種生成空結果容器的途徑
  • 一種將新元素合并到結果容器中的途徑
  • 一種合并兩個結果容器的途徑

這些構建塊可以輕松地表達為函數。這些函數中的第 3 個支持并行執行可變縮減:您可以對數據集進行分區,為每一部分生成一個中間累加結果,然后合并中間結果。Streams 庫有一個 collect() 方法,它接受以下 3 個函數:

<R> collect(Supplier<R> resultSupplier,
            BiConsumer<R, T> accumulator, 
            BiConsumer<R, R> combiner)

在前一節中,您看到了一個使用縮減來計算字符串串聯的示例。該代碼會生成正確的結果,但是,因為 Java 中的字符串是不可變的,而且串聯要求復制整個字符串,所以它還有 O(n 2 ) 運行時(一些字符串將復制多次)。您可以通過將結果收集到 StringBuilder 中,更高效地表達字符串串聯:

StringBuilder concat = strings.stream()
                              .collect(() -> new StringBuilder(),
                                       (sb, s) -> sb.append(s),
                                       (sb, sb2) -> sb.append(sb2));

此方法使用 StringBuilder 作為結果容器。傳遞給 collect() 的 3 個函數使用默認構造函數創建了一個空容器, append(String) 方法將一個元素添加到容器中, append(StringBuilder) 方法將一個容器合并到另一個容器中。使用方法引用可能可以比拉姆達表達式更好地表達此代碼:

StringBuilder concat = strings.stream()
                              .collect(StringBuilder::new,
                                       StringBuilder::append,
                                       StringBuilder::append);

類似地,要將一個流收集到一個 HashSet 中,您可以這樣做:

Set<String> uniqueStrings = strings.stream()
                                   .collect(HashSet::new,
                                            HashSet::add,
                                            HashSet::addAll);

在這個版本中,結果容器是一個 HashSet 而不是 StringBuilder ,但方法是一樣的:使用默認構造函數創建一個新的結果容器,使用 add() 方法將一個新元素合并到結果集中,使用 addAll() 方法合并兩個結果集。很容易看到如何將此代碼調整為其他任何類型的集合。

您可能會想,因為使用了可變結果容器( StringBuilder 或 HashSet ),所以這也是累加器反模式的一個例子。但其實不然。累加器反模式在這種情況下采用的類似方法是:

Set<String> set = new HashSet<>();
strings.stream().forEach(s -> set.add(s));

“ 可將收集器組合到一起來形成更復雜的聚合。 ”

就像只要組合函數是結合性的,且沒有相互干擾的副作用,就可以安全地并行化縮減一樣,如果滿足一些簡單的一致性要求(在 collect() 的規范中列出),就可以安全地并行化使用了 Stream.collect() 的可變縮減。關鍵區別在于,對于 forEach() 版本,多個線程會同時嘗試訪問一個結果容器,而對于并行 collect() ,每個線程擁有自己的本地結果容器,會在以后合并其中的結果。

收集器

傳遞給 collect() 的 3 個函數(創建、填充和合并結果容器)之間的關系非常重要,所以有必要提供它自己的抽象 Collector 和 collect() 的相應簡化版本。字符串串聯示例可重寫為:

String concat = strings.stream().collect(Collectors.joining());

收集到結果集的示例可重寫為:

Set<String> uniqueStrings = strings.stream().collect(Collectors.toSet());

Collectors 類包含許多常見聚合操作的因素,比如累加到集合中、字符串串聯、縮減和其他匯總計算,以及創建匯總表(通過 groupingBy() )。表 1 包含部分內置收集器的列表,而且如果它們不夠用,編寫您自己的收集器也很容易(請參閱 “” 部分)。

表 1. 內置收集器

收集器 行為
toList() 將元素收集到一個 List 中。
toSet() 將元素收集到一個 Set 中。
toCollection(Supplier<Collection>) 將元素收集到一個特定類型的 Collection 中。
toMap(Function<T, K>, Function<T, V>) 將元素收集到一個 Map 中,依據提供的映射函數將元素轉換為鍵值。
summingInt(ToIntFunction<T>) 計算將提供的 int 值映射函數應用于每個元素(以及 long 和 double 版本)的結果的總和。
summarizingInt(ToIntFunction<T>) 計算將提供的 int 值映射函數應用于每個元素(以及 long 和 double 版本)的結果的 sum 、 min 、 max 、 count 和 average 。
reducing() 向元素應用縮減(通常用作下游收集器,比如用于 groupingBy )(各種版本)。
partitioningBy(Predicate<T>) 將元素分為兩組:為其保留了提供的預期的組和未保留預期的組。
partitioningBy(Predicate<T>, Collector) 將元素分區,使用指定的下游收集器處理每個分區。
groupingBy(Function<T,U>) 將元素分組到一個 Map 中,其中的鍵是所提供的應用于流元素的函數,值是共享該鍵的元素列表。
groupingBy(Function<T,U>, Collector) 將元素分組,使用指定的下游收集器來處理與每個組有關聯的值。
minBy(BinaryOperator<T>) 計算元素的最小值(與 maxBy() 相同)。
mapping(Function<T,U>, Collector) 將提供的映射函數應用于每個元素,并使用指定的下游收集器(通常用作下游收集器本身,比如用于 groupingBy )進行處理。
joining() 假設元素為 String 類型,將這些元素聯結到一個字符串中(或許使用分隔符、前綴和后綴)。
counting() 計算元素數量。(通常用作下游收集器。)

將收集器函數分組到 Collector 抽象中在語法上更簡單,但實際收益來自您開始將收集器組合在一起時,比如您想要創建復雜的匯總結果(比如 groupingBy() 收集器創建的摘要)的時候,該收集器依據來自元素的一個鍵將元素收集到 Map 中。例如,要創建超過 1000 美元的交易的 Map ,可以使用賣家作為鍵:

Map<Seller, List<Txn>> bigTxnsBySeller =
    txns.stream()
        .filter(t -> t.getAmount() > 1000)
        .collect(groupingBy(Txn::getSeller));

但是,假設您不想要每個賣家的交易 List ,而想要來自每個賣家的最大交易。您仍希望使用賣家作為結果的鍵,但您希望進一步處理與賣家關聯的交易,以便將它縮減為最大的交易。可以使用 groupingBy() 的替代版本,無需將每個鍵的元素收集到列表中,而是將它們提供給另一個收集器( downstream 收集器)。對于下游收集器,您可以選擇 maxBy() 等縮減方法:

Map<Seller, Txn> biggestTxnBySeller =
    txns.stream()
        .collect(groupingBy(Txn::getSeller,
                            maxBy(comparing(Txn::getAmount))));

在這里,您將交易分組到以賣家作為鍵的映射中,但該映射的值是使用 maxBy() 收集器收集該賣家的所有銷售的結果。如果您不想要該賣家的最大交易,而想要總和,可以使用 summingInt() 收集器:

Map<Seller, Integer> salesBySeller =
    txns.stream()
         .collect(groupingBy(Txn::getSeller,
                            summingInt(Txn::getAmount)));

要獲得多級匯總結果,比如每個區域和賣家的銷售,可以使用另一個 groupingBy 收集器作為下游收集器:

Map<Region, Map<Seller, Integer>> salesByRegionAndSeller =
    txns.stream()
        .collect(groupingBy(Txn::getRegion,
                            groupingBy(Txn::getSeller, 
                                       summingInt(Txn::getAmount))));

舉一個不同領域的例子:要計算一個文檔中的詞頻直方圖,可以使用 BufferedReader.lines() 將文檔拆分為行,使用 Pattern.splitAsStream() 將它分解為一個單詞流,然后使用 collect() 和 groupingBy() 創建一個 Map ,后者的鍵是單詞,值是這些單詞的數量,如清單 3 所示。

清單 3. 使用 Streams 計算單詞數量直方圖

Pattern whitespace = Pattern.compile("\\s+");
    Map<String, Integer> wordFrequencies =
        reader.lines()
              .flatMap(s -> whitespace.splitAsStream())
              .collect(groupingBy(String::toLowerCase),
                                  Collectors.counting());

自定義收集器

盡管 JDK 提供的標準的收集器集合非常大,但編寫您自己的收集器非常容易。 Collector 接口(如清單 4 所示)非常簡單。該接口通過 3 種類型來實現參數化:輸入類型 T 、累加器類型 A 和最終的返回類型 R ( A 和 R 通常是相同的),這些方法返回的函數與之前演示的 collect() 3 參數版本所接受的函數類似。

清單 4. Collector 接口

public interface Collector<T, A, R> {
    /** Return a function that creates a new empty result container */
    Supplier<A> supplier();

    /** Return a function that incorporates an element into a container */
    BiConsumer<A, T> accumulator();

    /** Return a function that merges two result containers */
    BinaryOperator<A> combiner();

    /** Return a function that converts the intermediate result container
        into the final representation */
    Function<A, R> finisher();

    /** Special characteristics of this collector */
    Set<Characteristics> characteristics();
}

Collectors 中的大部分收集器工廠的實現都很簡單。例如, toList() 的實現是:

return new CollectorImpl<>(ArrayList::new,
                           List::add,
                           (left, right) -> { left.addAll(right); return left; },
                           CH_ID);

此實現使用 ArrayList 作為結果容器,使用 add() 合并一個元素,并使用 addAll() 將一個列表合并到另一個中,通過這些特征表明它的完成函數是身份函數(這使得流框架可以優化執行)。

與之前看到的一樣,一些一致性要求與縮減中的身份和累加器函數之間的限制類似。這些要求已在 Collector 的規范中列出。

作為一個更復雜的示例,可以考慮在數據集上創建匯總統計數據的問題。很容易使用縮減來計算數字數據集的總和、最小值、最大值或數量(而且您可以使用總和和數量來計算平均值)。在數據上,使用縮減在一輪計算中一次性計算所有這些結果更加困難。但您可以輕松地編寫一個 Collector 來高效地(如果愿意,還可并行地)執行此計算。

Collectors 類包含一個 collectingInt() 工廠方法,該方法返回一個 IntSummaryStatistics ,后者會執行您想要的準確操作,比如在一輪計算中計算 sum 、 min 、 max 、 count 和 average 。 IntSummaryStatistics 的實現很簡單,而且您可以輕松地編寫自己的類似收集器來計算任意數據匯總結果(或擴展此結果)。

清單 5 顯示了 IntSummaryStatistics 類。實際實現包含更多細節(包含用于獲取匯總統計數據的 getter),但它的核心是簡單的 accept() 和 combine() 方法。

清單 5. summarizingInt() 收集器使用的 IntSummaryStatistics 類

public class IntSummaryStatistics implements IntConsumer {
        private long count;
        private long sum;
        private int min = Integer.MAX_VALUE;
        private int max = Integer.MIN_VALUE;

        public void accept(int value) {
            ++count;
            sum += value;
            min = Math.min(min, value);
            max = Math.max(max, value);
        }

        public void combine(IntSummaryStatistics other) {
            count += other.count;
            sum += other.sum;
            min = Math.min(min, other.min);
            max = Math.max(max, other.max);
        }

        // plus getters for count, sum, min, max, and average
    }

如您所見,這是一個非常簡單的類。在觀察每個新數據元素時,會以各種方式更新各種匯總結果,而且會以各種方式組合兩個 IntSummaryStatistics 持有者。 Collectors.summarizingInt() 的實現(如清單 6 所示)同樣很簡單;它創建了一個 Collector ,以便通過應用一個整數值來提取器函數,并將結果傳遞給 IntSummaryStatistics.accept() 來合并一個元素。

清單 6. summarizingInt() 收集器工廠

public static <T>
    Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper) {
        return new CollectorImpl<T, IntSummaryStatistics, IntSummaryStatistics>(
                IntSummaryStatistics::new,
                (r, t) -> r.accept(mapper.applyAsInt(t)),
                (l, r) -> { l.combine(r); return l; },
                CH_ID);
    }

組合收集器的容易性(您在 groupingBy() 示例中已看到)和創建新收集器的容易性相結合,可以創建流數據的幾乎任何匯總結果,同時保持您的代碼緊湊而又清晰。

第 2 部分的小結

聚合工具是 Streams 庫的最有用和靈活的部分之一。可以使用縮減操作來輕松地按順序或并行聚合簡單的值;更復雜的匯總結果可通過 collect() 創建。該庫附帶了一組簡單的基本收集器,可以組合它們來執行更復雜的聚合,而且您可以輕松地將自己的收集器添加到組合中。

第 3 部分 中,將深入剖析 Streams 的內部結構,以便了解在性能至關重要時如何最高效地使用該庫。

 

來自: http://www.ibm.com/developerworks/cn/java/j-java-streams-2-brian-goetz/index.html?ca=drs-

 

 本文由用戶 ReginaLangs 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!