RxJava 教程第二部分:事件流基礎之 聚合
Aggregation
前面介紹了如何過濾掉不需要的數據、如何根據各種條件停止發射數據、如何檢查數據是否符合某個條件。這些操作對數據流來說都是非常有意義的。 本節介紹如何根據數據流中的數據來生成新的有意義的數據。
本節的操作函數會使用源 Observable 中的事件流中的數據,然后把這些數據轉換為其他類型的數據。返回結果是包含一個數據的 Observable。
如果你從頭開始閱讀本系列教程,則會發現前面代碼中有很多重復的地方。 為了避免重復代碼并且使代碼更加簡潔,方便我們聚焦要介紹的函數,從本節開始在示例代碼中會引入一個自定義的 Subscriber 。該 Subscribe 用來訂閱 Observable 并打印結果:
class PrintSubscriber extends Subscriber{
private final String name;
public PrintSubscriber(String name) {
this.name = name;
}
@Override
public void onCompleted() {
System.out.println(name + ": Completed");
}
@Override
public void onError(Throwable e) {
System.out.println(name + ": Error: " + e);
}
@Override
public void onNext(Object v) {
System.out.println(name + ": " + v);
}
}
很簡單的一個自定義實現,打印每個事件并使用一個 TAG 來標記是那個 Subscriber.
count
count 函數和 Java 集合中的 size 或者 length 一樣。用來統計源 Observable 完成的時候一共發射了多少個數據。
Observable<Integer> values = Observable.range(0, 3);
values
.subscribe(new PrintSubscriber("Values"));
values
.count()
.subscribe(new PrintSubscriber("Count"));
結果:
Values: 0
Values: 1
Values: 2
Values: Completed
Count: 3
Count: Completed
如果發射數據的個數超過了 int 最大值,則可以使用 countLong 函數。
first
first 類似于 take(1) , 發射 源 Observable 中的第一個數據。如果沒有數據,則返回 ava.util.NoSuchElementException。還有一個重載的帶有 過濾 參數,則返回第一個滿足該條件的數據。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values
.first(v -> v>5)
.subscribe(new PrintSubscriber("First"));
結果:
First: 6
可以使用 firstOrDefault 來避免 java.util.NoSuchElementException 錯誤信息,這樣如果沒有發現數據,則發射一個默認的數據。
last
last 和 lastOrDefault 是和 first 一樣的,區別就是當 源 Observable 完成的時候, 發射最后的數據。 如果使用重載的帶 過濾參數的函數,則返回最后一個滿足該條件的數據。 從后面開始,這種和前面功能非常類似的示例代碼就省略了。但是你可以在 示例代碼 中查看這些省略的示例。
single
single 只會發射源 Observable 中的一個數據,如果使用重載的帶過濾條件的函數,則發射符合該過濾條件的那個數據。和 first 、last 不一樣的地方是,single 會檢查數據流中是否只包含一個所需要的的數據,如果有多個則會拋出一個錯誤信息。所以 single 用來檢查數據流中是否有且僅有一個符合條件的數據。所以 single 只有在源 Observable 完成后才能返回。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(10) // 獲取前 10 個數據 的 Observable
.single(v -> v == 5L) // 有且僅有一個 數據為 5L
.subscribe(new PrintSubscriber("Single1"));
values
.single(v -> v == 5L) // 由于源 Observable 為無限的,所以這個不會打印任何東西
.subscribe(new PrintSubscriber("Single2"));
結果:
Single1: 5
Single1: Completed
和前面的類似,使用 singleOrDefault 可以返回一個默認值。
Custom aggregators(自定義聚合)
本節前面介紹的幾個函數,和之前看到的也沒太大區別。下面會介紹兩個非常強大的操作函數,可以很方便的來擴展源 Observable。 之前所介紹的所有操作函數都可以通過這兩個函數來實現。
reduce
你可能從 MapReduce 中了解過 reduce。該思想是使用源 Observable 中的所有數據兩兩組合來生成一個單一的 數據。在大部分重載函數中都需要一個函數用來定義如何組合兩個數據變成一個。
public final Observable<T> reduce(Func2<T,T,T> accumulator)
下面的示例是最好的解釋。示例代碼的第一個功能是對數據流中所有整數求和(0+1+2+3+4+…)。第二個功能是找出所有整數中最小的那個。
Observable<Integer> values = Observable.range(0,5);
values
.reduce((i1,i2) -> i1+i2)
.subscribe(new PrintSubscriber("Sum"));
values
.reduce((i1,i2) -> (i1>i2) ? i2 : i1)
.subscribe(new PrintSubscriber("Min"));
結果:
Sum: 10
Sum: Completed
Min: 0
Min: Completed
Rx 中的 reduce 和并行系統中的 reduce 不一樣。在并行系統中的 reduce 是指,計算的取值是不相關的,這樣多個機器可以獨立并行工作。在 Rx 中是使用從數據流中第一個數據到最后一個數據(從左往右)中的數據來調用 參數 accumulator ,accumulator 用前一次返回的結果和下一個數據來再次調用 accumulator 。 下面這個重載函數更加暴露了這個設計意圖。
public final <R> Observable<R> reduce(R initialValue, Func2<R,? super T,R> accumulator)
accumulator 參數返回的數據類型和 源 Observable 的數據類型可能是不一樣的。accumulator 的第一個參數為前一步 accumulator 執行的結果,而第二個參數為 下一個數據。 使用一個初始化的值作為整個處理流程的開始。下面的示例通過重新實現 count 函數來演示 reduce 的使用:
Observable<String> values = Observable.just("Rx", "is", "easy");
values
.reduce(0, (acc,next) -> acc + 1)
.subscribe(new PrintSubscriber("Count"));
結果:
Count: 3
Count: Completed
上面示例中的 accumulator 參數為 (acc,next) -> acc + 1 這個 Lambda 表達式,該表達式需要兩個參數 acc 和next, 當第一個數據從 源 Observable 發射出來的時候,由于 Lambda 表達式還沒有調用過,所以使用 初始值 0 來替代 acc ,使用第一個字符串“Rx” 來調用 accumulator 參數,這樣 (acc,next) -> acc + 1 表達式返回的值就是 acc + 1 (而 acc 為初始值 0 ,所以返回 1, 可以看到 這個 next 參數 為源 Observable 的值在這里是沒有用的);這樣 源Observable 每次發射一個數據, Lambda 就把上一次的結果加1 返回。和 count 的功能一樣。
對于前面只返回一個數據結果的操作函數,大部分都可以通過 reduce 來實現。對于那些 源 Observable 沒有完成就返回的操作函數來說,是不能使用 reduce 來實現的。所以 可以用 reduce 來實現 last,但是用 reduce 實現的 all 函數和原來的 all 是不太一樣的。
scan
scan 和 reduce 很像,不一樣的地方在于 scan會發射所有中間的結算結果。
public final Observable<T> scan(Func2<T,T,T> accumulator)
通過上圖可以看到和 reduce 的區別, reduce 只是最后把計算結果發射出來,而 scan 把每次的計算結果都發射出來。
Observable<Integer> values = Observable.range(0,5);
values
.scan((i1,i2) -> i1+i2)
.subscribe(new PrintSubscriber("Sum"));
結果:
Sum: 0
Sum: 1
Sum: 3
Sum: 6
Sum: 10
Sum: Completed
reduce 可以通過 scan 來實現: reduce(acc) = scan(acc).takeLast() 。所以 scan 比 reduce 更加通用。
源 Observable 發射數據,經過 scan 處理后 scan 也發射一個處理后的數據,所以 scan 并不要求源 Observable 完成發射。下面示例實現了 查找已經發射數據中的最小值的功能:
Subject<Integer, Integer> values = ReplaySubject.create();
values
.subscribe(new PrintSubscriber("Values"));
values
.scan((i1,i2) -> (i1<i2) ? i1 : i2)
.distinctUntilChanged()
.subscribe(new PrintSubscriber("Min"));
values.onNext(2);
values.onNext(3);
values.onNext(1);
values.onNext(4);
values.onCompleted();
結果:
Values: 2
Min: 2
Values: 3
Values: 1
Min: 1
Values: 4
Values: Completed
Min: Completed
Aggregation to collections(把數據聚合到集合中)
使用 reduce 可以把源Observable 發射的數據放到一個集合中:
Observable<Integer> values = Observable.range(10,5);
values
.reduce(
new ArrayList<Integer>(),
(acc, value) -> {
acc.add(value);
return acc;
})
.subscribe(v -> System.out.println(v));
reduce 的參數初始值為 new ArrayList (), Lambda 表達式參數把源Observable 發射的數據添加到這個 List 中。當 源Observable 完成的時候,返回這個 List 對象。
結果:
[10, 11, 12, 13, 14]
上面的示例代碼其實并不太符合 Rx 操作符的原則,操作符有個原則是不能修改其他對象的狀態。所以符合原則的實現應該是在每次轉換中都創建一個新的 ArrayList 對象。下面是一個符合原則但是效率很低的實現:
.reduce(
new ArrayList<Integer>(),
(acc, value) -> {
ArrayList<Integer> newAcc = (ArrayList<Integer>) acc.clone();
newAcc.add(value);
return newAcc;
})
collect
上面每一個值都創建一個新對象的性能是無法接受的。為此, Rx 提供了一個 collect 函數來實現該功能,該函數使用了一個可變的 accumulator 。需要通過文檔說明你沒有遵守 Rx 的原則使用不可變對象,避免其他人誤解:
Observable<Integer> values = Observable.range(10,5);
values
.collect(
() -> new ArrayList<Integer>(),
(acc, value) -> acc.add(value))
.subscribe(v -> System.out.println(v));
結果:
[10, 11, 12, 13, 14]
通常你不需要像這樣手工的來收集數據, RxJava 提供了很多操作函數來實現這個功能。
toList
前一個示例代碼可以這樣實現:
Observable<Integer> values = Observable.range(10,5);
values
.toList()
.subscribe(v -> System.out.println(v));
toSortedList
toSortedList 和前面類似,返回一個排序后的 list,下面是該函數的定義:
public final Observable<java.util.List<T>> toSortedList()
public final Observable<java.util.List<T>> toSortedList(
Func2<? super T,? super T,java.lang.Integer> sortFunction)
可以使用默認的比較方式來比較對象,也可以提供一個比較參數。該比較參數和 Comparator 接口語義一致。
下面通過一個自定義的比較參數來返回一個倒序排列的整數集合:
Observable<Integer> values = Observable.range(10,5);
values
.toSortedList((i1,i2) -> i2 - i1)
.subscribe(v -> System.out.println(v));
結果:
[14, 13, 12, 11, 10]
toMap
toMap 把數據流 T 變為一個 Map<TKey,T>。 該函數有三個重載形式:
public final <K> Observable<java.util.Map<K,T>> toMap(
Func1<? super T,? extends K> keySelector)
public final <K,V> Observable<java.util.Map<K,V>> toMap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector)
public final <K,V> Observable<java.util.Map<K,V>> toMap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector,
Func0<? extends java.util.Map<K,V>> mapFactory)
keySelector 功能是從一個值 T 中獲取他對應的 key。valueSelector 功能是從一個值 T 中獲取需要保存 map 中的值。mapFactory 功能是創建該 map 對象。
來看看一個示例:有這么一個 Person 對象:
class Person {
public final String name;
public final Integer age;
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
下面的代碼使用 Person 的 name 作為 key, Person 作為 map 的value:
Observable<Person> values = Observable.just(
new Person("Will", 25),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMap(person -> person.name)
.subscribe(new PrintSubscriber("toMap"));
結果:
toMap: {Saul=Person@7cd84586, Nick=Person@30dae81, Will=Person@1b2c6ec2}
toMap: Completed
還可以用 Person 的 age 作為map 的value:
Observable<Person> values = Observable.just(
new Person("Will", 25),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMap(
person -> person.name,
person -> person.age)
.subscribe(new PrintSubscriber("toMap"));
結果:
toMap: {Saul=35, Nick=40, Will=25}
toMap: Completed
還可以自定義如何生成這個 map 對象:
values
.toMap(
person -> person.name,
person -> person.age,
() -> new HashMap())
.subscribe(new PrintSubscriber("toMap"));
最后一個參數為工廠函數,每次一個新的 Subscriber 訂閱的時候, 都會返回一個新的 map 對象。
toMultimap
通常情況下多個 value 的 key 可能是一樣的。 一個 key 可以映射多個 value 的數據結構為 multimap,multimap 的 value 為一個集合。該過程被稱之為 “grouping” (分組)。
public final <K> Observable<java.util.Map<K,java.util.Collection<T>>> toMultimap(
Func1<? super T,? extends K> keySelector)
public final <K,V> Observable<java.util.Map<K,java.util.Collection<V>>> toMultimap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector)
public final <K,V> Observable<java.util.Map<K,java.util.Collection<V>>> toMultimap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector,
Func0<? extends java.util.Map<K,java.util.Collection<V>>> mapFactory)
public final <K,V> Observable<java.util.Map<K,java.util.Collection<V>>> toMultimap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector,
Func0<? extends java.util.Map<K,java.util.Collection<V>>> mapFactory,
Func1<? super K,? extends java.util.Collection<V>> collectionFactory)
下面是通過 age 來分組 Person 的實現:
Observable<Person> values = Observable.just(
new Person("Will", 35),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMultimap(
person -> person.age,
person -> person.name)
.subscribe(new PrintSubscriber("toMap"));
結果:
toMap: {35=[Will, Saul], 40=[Nick]}
toMap: Completed
toMultimap 的參數和 toMap 類似,最后一個 collectionFactory 參數是用來創建 value 的集合對象的,collectionFactory 使用 key 作為參數,這樣你可以根據 key 來做不同的處理。下面示例代碼中沒有使用這個 key 參數:
Observable<Person> values = Observable.just(
new Person("Will", 35),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMultimap(
person -> person.age,
person -> person.name,
() -> new HashMap(),
(key) -> new ArrayList()) // 沒有使用這個 key 參數
.subscribe(new PrintSubscriber("toMap"));
注意事項
這些操作函數都有非常有限的用法。這些函數只是用來給初學者把數據收集到集合中使用的,并且內部使用傳統的方式來處理數據。這些方式不應該在實際項目中實現,因為他們和使用 Rx 的理念并不相符。
groupBy
groupBy 是 toMultimap 函數的 Rx 方式的實現。groupBy 根據每個源Observable 發射的值來計算一個 key, 然后為每個 key 創建一個新的 Observable并把key 一樣的值發射到對應的新 Observable 中。
public final <K> Observable<GroupedObservable<K,T>> groupBy(Func1<? super T,? extends K> keySelector)
返回的結果為 GroupedObservable。 每次發現一個新的key,內部就生成一個新的 GroupedObservable并發射出來。和普通的 Observable 相比 多了一個 getKey 函數來獲取 分組的 key。來自于源Observable中的值會被發射到對應 key 的 GroupedObservable 中。
嵌套的 Observable 導致方法的定義比較復雜,但是提供了隨時發射數據的優勢,沒必要等源Observable 發射完成了才能返回數據。
下面的示例中使用了一堆單詞作為源Observable的數據,然后根據每個單詞的首字母作為分組的 key,最后把每個分組的 最后一個單詞打印出來:
Observable<String> values = Observable.just(
"first",
"second",
"third",
"forth",
"fifth",
"sixth"
);
values.groupBy(word -> word.charAt(0))
.subscribe(
group -> group.last()
.subscribe(v -> System.out.println(group.getKey() + ": " + v))
);
上面的代碼使用了嵌套的 Subscriber,在Rx 前傳 中 我們介紹了 Rx 功能之一就是為了避免嵌套回調函數,所以下面演示了如何避免嵌套:
Observable<String> values = Observable.just(
"first",
"second",
"third",
"forth",
"fifth",
"sixth"
);
values.groupBy(word -> word.charAt(0))
.flatMap(group ->
group.last().map(v -> group.getKey() + ": " + v)
)
.subscribe(v -> System.out.println(v));
結果:
s: sixth
t: third
f: fifth
下一章會介紹這里使用的 map 和 flatMap 操作函數。
Nested observables
嵌套的 Observable 可能一開始看起來比較迷糊,但是他們有很多使用的場景。很好很強大,比如:
- Partitions of Data (分區數據)
-
- 你可以把來至于一個源Observable 的數據分為多個 Observable并分別給多個資源去處理。分區數據對于聚合數據也是有用的,通常使用 groupBy 函數來實現該功能。
- 在線游戲服務器
-
- 例如魔獸世界有很多服務器。每個新的值代表一個服務器上線了。而嵌套的 Observable 值是每個服務器的延遲時間,這樣用戶就可以看到每個服務器是否可用的信息的。如果一個服務器掛了,則嵌套的Observable通過發射一個完成信號標記服務器掛了。
- 金融數據流
-
- 交易市場每天都有開市和閉市的時間。當開市了就提供每個市場的價格信息流,閉市了就標記完成了。
- 聊天室
-
- 用戶可以加入一個聊天室(源 Observable 里面有很多個聊天室),加入聊天室后可以留言(聊天室本身為嵌套的 Observable,在該 Observable中保存留言數據),然后還可以離開聊天室(結束 嵌套的 Observable 數據流)。
- 文件監視器
-
- 文件夾中的文件可以監視其修改操作。嵌套的 Observable 可以代表對每個文件所做的操作,刪除文件代表嵌套的 Observable 完成了。
nest
當和 嵌套的 Observable 打交道的時候,就要使用 nest 函數了。nest 函數把一個普通的非 嵌套 Observable 變為一個嵌套的 Observable。 nest 把一個源 Observable 變為一個嵌套的 Observable 發射出去就結束了。
Observable.range(0, 3)
.nest()
.subscribe(ob -> ob.subscribe(System.out::println));
結果:
0
1
2
上面的示例只是為了演示 nest 操作,實際沒球用!! 如果從其他地方獲取了一個非 嵌套的 Observable,但是使用的時候需要使用 嵌套的 Observable,則你可以通過 nest 函數來轉換。