RxJava 教程第二部分:事件流基礎之 轉換數據流
Transformation of sequences
本節介紹轉換數據流中數據的方法。在真實世界中, Observable 中的數據可以是任意類型的,可能在你的應用中無法直接使用這些數據類型,你需要對這些數據對象進行一些轉換。
map 和 flatMap 是本節中操作函數的基礎。 下面是三種轉換方式的示意:
– Ana(morphism) T –> IObservable – Cata(morphism) IObservable
–> T
– Bind IObservable –> IObservable
本節還是繼續使用上一結引入的自定義 Subscriber:
class PrintSubscriber extends Subscriber{
private final String name;
public PrintSubscriber(String name) {
this.name = name;
}
@Override
public void onCompleted() {
System.out.println(name + ": Completed");
}
@Override
public void onError(Throwable e) {
System.out.println(name + ": Error: " + e);
}
@Override
public void onNext(Object v) {
System.out.println(name + ": " + v);
}
}
map
最基礎的轉換函數就是 map。 map 使用一個轉換的參數把源Observable 中的數據轉換為另外一種類型的數據。返回的 Observable 中包含了轉換后的數據。
public final <R> Observable<R> map(Func1<? super T,? extends R> func)
下面是把源 Observable 中的每個數據都加 3 然后再返回:
Observable<Integer> values = Observable.range(0,4);
values
.map(i -> i + 3)
.subscribe(new PrintSubscriber("Map"));
結果:
Map: 3
Map: 4
Map: 5
Map: 6
Map: Completed
上面的代碼只是示例 map 的使用,并沒有太大的實際意義。下面是一個比較實際的轉換方式:
Observable<Integer> values =
Observable.just("0", "1", "2", "3")
.map(Integer::parseInt);
values.subscribe(new PrintSubscriber("Map"));
結果:
Map: 0
Map: 1
Map: 2
Map: 3
Map: Completed
源 Observable 發射的為 String 類型數據,而我們需要的是 int 類型,則可以通過 map 把 String 轉換為 int。
如果你認為這種轉換太簡單了, 完全可以在 Subscriber 中完成,這樣在設計架構上并不合理,沒有有效的區分職責。 代碼設計每個部分都有各自的職責,使用 map 可以有效的確保職責清晰。方便后續修改。
cast 和 ofType
cast 是把一個對象強制轉換為子類型的縮寫形式。 假設源 Observable為 Observable
Observable<Object> values = Observable.just(0, 1, 2, 3);
values
.cast(Integer.class)
.subscribe(new PrintSubscriber("Map"));
結果:
Map: 0
Map: 1
Map: 2
Map: 3
Map: Completed
如果遇到類型不一樣的對象的話,就會拋出一個 error:
Observable<Object> values = Observable.just(0, 1, 2, "3");
values
.cast(Integer.class)
.subscribe(new PrintSubscriber("Map"));
結果:
Map: 0
Map: 1
Map: 2
Map: Error: java.lang.ClassCastException: Cannotcastjava.lang.String to java.lang.Integer
如果你不想處理類型不一樣的對象,則可以用 ofType 。 該函數用來判斷數據是否為 該類型,如果不是則跳過這個數據。
Observable<Object> values = Observable.just(0, 1, "2", 3);
values
.ofType(Integer.class)
.subscribe(new PrintSubscriber("Map"));
結果:
Map: 0
Map: 1
Map: 3
Map: Completed
timestamp 和 timeInterval
這兩個函數可以給數據流中的數據添加額外的時間相關的信息。timestamp 把數據轉換為 Timestamped 類型,里面包含了原始的數據和一個原始數據是何時發射的時間戳。
public final Observable<Timestamped<T>> timestamp()
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(3)
.timestamp()
.subscribe(new PrintSubscriber("Timestamp"));
結果:
Timestamp: Timestamped(timestampMillis = 1428611094943, value = 0)
Timestamp: Timestamped(timestampMillis = 1428611095037, value = 1)
Timestamp: Timestamped(timestampMillis = 1428611095136, value = 2)
Timestamp: Completed
從結果可以看到,上面的數據大概每隔100毫秒發射一個。
如果你想知道前一個數據和當前數據發射直接的時間間隔,則可以使用 timeInterval 函數。
public final Observable<TimeInterval<T>> timeInterval()
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(3)
.timeInterval()
.subscribe(new PrintSubscriber("TimeInterval"));
結果:
TimeInterval: TimeInterval [intervalInMilliseconds=131, value=0]
TimeInterval: TimeInterval [intervalInMilliseconds=75, value=1]
TimeInterval: TimeInterval [intervalInMilliseconds=100, value=2]
TimeInterval: Completed
TimeInterval 中有個屬性 intervalInMilliseconds 記錄了兩次數據發射直接的時間間隔。
這兩個函數中的時間對于記錄日志和調試程序是非常有用的。這是用 Rx 的方式來獲取異步調用的數據流信息。
materialize 和 dematerialize
materialize 對于記錄日志也是很有用的。materialize 把數據轉換為元數據發射出去:
public final Observable<Notification<T>> materialize()
元數據中包含了源 Observable 所發射的動作,是調用 onNext 還是 onComplete。注意上圖中,源 Observable 結束的時候, materialize 還會發射一個 onComplete 數據,然后才發射一個結束事件。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(3)
.materialize()
.subscribe(new PrintSubscriber("Materialize"));
結果:
Materialize: [rx.Notification@a4c802e9 OnNext 0]
Materialize: [rx.Notification@a4c802ea OnNext 1]
Materialize: [rx.Notification@a4c802eb OnNext 2]
Materialize: [rx.Notification@18d48ace OnCompleted]
Materialize: Completed
Notification 類包含了一些判斷每個數據發射類型的方法,如果出錯了還可以獲取錯誤信息 Throwable 對象。
dematerialize 函數會把 materialize 轉換后的Observable 再還原為 源 Observable。
flatMap
map 把一個數據轉換為另外一個數據。而 flatMap 把源 Observable 中的一個數據替換為任意數量的數據,可以為 0 個,也可以為無限個。 flatMap 把源 Observable 中的一個數據轉換為一個新的 Observable 發射出去。
public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func)
flatMap 的參數會把 源 Observable 中發射的每個數據轉換為一個新的 Observable, 然后 flatMap 再把這些新的 Observable 中發射的數據發射出來。每個新的 Observable 數據都是按照他們產生的順序發射出來,但是 Observable 之間數據的順序可能會不一樣。
下面通過一個簡單的例子來幫助理解 flatMap 。
Observable<Integer> values = Observable.just(2);
values
.flatMap(i -> Observable.range(0,i))
.subscribe(new PrintSubscriber("flatMap"));
結果:
flatMap: 0
flatMap: 1
flatMap: Completed
上面的示例中,values 這個 Observable 只發射一個值 2. 而 flatMap 參數把數據 2 轉換為 Observable.range(0,2),其中 Lambda 表達式中的 i 為 values Observable 發射的數據,這里也就是 2. 然后訂閱到 flatMap 生成的新 Observable 上。 而 Observable.range(0,2) 會發射 0 和 1 兩個數據,所以結果就是 0、 1 、完成。
從上面示例中可以看到, flatMap 把 源 Observable 中每個值都轉換為一個新的 Observable 了。 比如:
Observable<Integer> values = Observable.range(1,3);
values
.flatMap(i -> Observable.range(0,i))
.subscribe(new PrintSubscriber("flatMap"));
這里 values 會發射 1 、 2、 3 三個數據。 然后 flatMap 把每個數據變為新的 Observable (Observable.range(0,i)),所以會有 3 個 Observable,這 3個 Observable 分別發射 [0], [0,1] 和 [0,1,2]。最終 flatMap 再把這 3 個新 Observable 發射的數據合并到一個 Observable 發射出去。所以上面的結果如下:
flatMap: 0
flatMap: 0
flatMap: 1
flatMap: 0
flatMap: 1
flatMap: 2
flatMap: Completed
再看一個示例,把 int 值轉換為 字母:
Observable<Integer> values = Observable.just(1);
values
.flatMap(i ->
Observable.just(
Character.valueOf((char)(i+64))
))
.subscribe(new PrintSubscriber("flatMap"));
上面的示例,用 map 函數實現會更簡單,這里是為了說明 flatMap 另外一種功能,如果你發現源 Observable 中發射的數據不符合你的要求,則你可以返回一個 空的 Observable。這就相當于過濾數據的作用, 例如:
Observable<Integer> values = Observable.range(0,30);
values
.flatMap(i -> {
if (0 < i && i <= 26)
return Observable.just(Character.valueOf((char)(i+64)));
else
return Observable.empty();
})
.subscribe(new PrintSubscriber("flatMap"));
結果:
flatMap: A
flatMap: B
flatMap: C
...
flatMap: X
flatMap: Y
flatMap: Z
flatMap: Completed
上面示例源 Observable 一共發射 0 到 29 這 30個數字。在 flatMap 中判斷 如果數字大于 0 并且小于等于 26,則轉換為字母用 并用 Observable.just 生成新的 Observable;其他數字都返回一個 Observable.empty() 空 Observable。
注意,flatMap 是把幾個新的 Observable 合并為一個 Observable 返回, 只要這些新的 Observable 有數據發射出來, flatMap 就會把數據立刻發射出去。所以如果這些新的 Observable 發射數據是異步的,那么 flatMap 返回的數據也是異步的。下面示例中使用 Observable.interval 來生成每個數據對應的新 Observable,由于 interval 返回的 Observable 是異步的,所以可以看到最終輸出的結果是每當有 Observable 發射數據的時候, flatMap 就返回該數據。
Observable.just(100, 150)
.flatMap(i ->
Observable.interval(i, TimeUnit.MILLISECONDS)
.map(v -> i)
)
.take(10)
.subscribe(new PrintSubscriber("flatMap"));
上面的 Lambda 表達式 先把參數 i (這里分別為 100 和 150 這兩個數字)轉換為 Observable.interval(i, TimeUnit.MILLISECONDS), 每隔 i 毫秒發射一個數字,這樣兩個 Observable.interval 都發射同樣的數字,只不過發射的時間間隔不一樣,所以為了區分打印的結果,我們再用 map(v -> i) 把結果轉換為 i 。結果如下:
flatMap: 100
flatMap: 150
flatMap: 100
flatMap: 100
flatMap: 150
flatMap: 100
flatMap: 150
flatMap: 100
flatMap: 100
flatMap: 150
flatMap: Completed
可以兩個新的 Observable 的數據交織在一起發射出來。
concatMap
如果你不想把新 Observable 中的數據交織在一起發射,則可以選擇使用 concatMap 函數。
該函數會等第一個新的 Observable 完成后再發射下一個 新的 Observable 中的數據。
Observable.just(100, 150)
.concatMap(i ->
Observable.interval(i, TimeUnit.MILLISECONDS)
.map(v -> i)
.take(3))
.subscribe(
System.out::println,
System.out::println,
() -> System.out.println("Completed"));
結果:
100
100
100
150
150
150
Completed
所以 concatMap 要求新的Observable 不能是無限的,否則該無限 Observable 會阻礙后面的數據發射。為此,上面的示例使用 take 來結束 Observable。
flatMapIterable
flatMapIterable 和 flatMap 類似,區別是 flatMap 參數把每個數據轉換為 一個新的 Observable,而 flatMapIterable 參數把一個數據轉換為一個新的 iterable 對象。
例如下面是一個把參數轉換為 iterable 的函數:
public static Iterable<Integer> range(int start, int count) {
List<Integer> list = new ArrayList<>();
for (int i=start ; i<start+count ; i++) {
list.add(i);
}
return list;
}
然后可以這樣使用該函數作為 flatMapIterable 的參數:
Observable.range(1, 3)
.flatMapIterable(i -> range(1, i))
.subscribe(System.out::println);
結果:
1
1
2
1
2
3
flatMapIterable 把生成的 3 個 iterable 合并為一個 Observable 發射。
作為 Rx 開發者,我們需要知道在 Rx 中應該使用 Observable 數據流來發射數據而不要混合使用傳統的 iterable。但是如果你無法控制數據的來源,提供數據的一方只提供 iterable 數據,則依然可以直接使用這些數據。flatMapIterable 把多個 iterable 的數據按照順序發射出來,不會交織發射。
flatMapIterable 還有另外一個重載函數可以用源 Observable 發射的數據來處理新的 iterable 中的每個數據:
Observable.range(1, 3)
.flatMapIterable(
i -> range(1, i),
(ori, rv) -> ori * (Integer) rv)
.subscribe(System.out::println);
結果:
1
2
4
3
6
9
注意,上面的 ori 參數取值為 源 Observable 發射出來的數據,也就是 1、 2、 3. 而 rv 參數取值為 range(1, i) 參數生成的 iterable 中的每個數據,也就是分別為 [1]、[1,2]、[1,2,3],所以最終的結果就是:[1 1], [1 2, 2 2], [1 3, 2 3, 3 3].