RxJava 教程第四部分:并發 之意外情況處理
Rx 盡量避免狀態泄露到數據流之外的場景。但是有些東西本身就帶有狀態。比如服務器可以上線和離線、手機可以訪問Wifi、按鈕被按下了等。在 Rx 中國,我們在一段時間內看到這些事件,并稱之為窗口(window)。其他事件在這個窗口內發生可能需要特殊處理。例如,手機在使用移動收費上網的時候,會把網絡請求優先級降低,來避免天價流量費的情況。
窗口 Window
buffer 函數可以緩存多個數據并整體發射。 window 操作函數和 buffer 有一對一的關系。區別在于 window 不會整體返回緩存的數據。而是把緩存的數據當做一個新的 Observable 數據流來返回。這樣當源 Observable 有數據發射了,這個數據就立刻發射到 window 返回的 Observable 里面了。下圖可以看到二者的區別:
window :
如果你還沒有了解 buffer, 建議你到前面的章節下去看看 buffer。 buffer 和 window 的函數形式是一樣的,功能也非常類似,并且易于理解。 buffer 都可以用 window 來實現其功能:
source.buffer(...)
// 和下面的是一樣的功能
source.window(...).flatMap(w -> w.toList())
Window by count
窗口內可以限定數目。當窗口發射的數據達到了限定的數目,當前窗口的 Observable 就結束并開啟一個新的窗口。
和buffer 一樣, 使用 window(int count, int skip) 也可以跳過數據或者重疊使用數據。
Observable
.merge(
Observable.range(0, 5)
.window(3,1))
.subscribe(System.out::println);
結果:
0
1
1
2
2
2
3
3
3
4
4
4
可以看到當有數據重疊的時候, 多個 Observable 會返回同樣的數據,可以把結果輸出形式修改一下,方便查看:
Observable.range(0, 5)
.window(3, 1)
.flatMap(o -> o.toList())
.subscribe(System.out::println);
結果:
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]
這樣就可以看到 window 和 buffer 是非常類似的了。
Window by time
同樣也可以指定窗口的時間長度:
public final Observable<Observable<T>> window(long timespan, long timeshift, java.util.concurrent.TimeUnitunit)
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(5)
.window(250, 100, TimeUnit.MILLISECONDS)
.flatMap(o -> o.toList())
.subscribe(System.out::println);
結果:
[0, 1]
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]
上面的示例中,每隔100ms開始一個新的窗口,每個窗口持續 250ms。 第一個窗口從 0ms 開始并捕獲到數據 [0, 1](0 是在第100ms的時候發射的)。
Window with signal
同樣也可以用另外一個信號 Observable當做窗口結束的信號。
信號 Observable 直接也可以相互傳遞事件。
下面是使用信號 Observable 實現的重疊窗口:
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(5)
.window(
Observable.interval(100, TimeUnit.MILLISECONDS),
o -> Observable.timer(250, TimeUnit.MILLISECONDS))
.flatMap(o -> o.toList())
.subscribe(System.out::println);
結果:
[1, 2]
[2, 3]
[3, 4]
[4]
[]
注意上面的數字 0 沒有捕獲到,原因在于源 Observable 和 信號 Observable 都是在同一時刻發生的,但是在實際操作中并沒有這種情況。所以當信號 Observable發射的時候, 數字 0 已經發射出去了。
Join
join 可以把兩個數據流中的數據組合一起。 zip 函數根據數據發射的順序來組合數據。 join 可以根據時間來組合。
public final <TRight,TLeftDuration,TRightDuration,R> Observable<R> join(
Observable<TRight> right,
Func1<T,Observable<TLeftDuration>> leftDurationSelector,
Func1<TRight,Observable<TRightDuration>> rightDurationSelector,
Func2<T,TRight,R> resultSelector)
join 組合的兩個 Observable 被稱之為 左和右。 上面的函數并不是靜態的,調用該函數的 Observable就是 左 。參數中的 leftDurationSelector 和 rightDurationSelector 分別使用 左右發射的數據為參數,然后返回一個定義時間間隔的 Observable,和 window 的最后一個重載函數類似。這個時間間隔是用來選擇里面發射的數據并組合一起。里面的數據會當做參數調用 resultSelector ,然后返回一個組合后的數據。然后組合后的數據由 join 返回的 Observable 發射出去。
join 比較難以理解以及強大之處就是如果選擇組合的數據。當有數據在 源 Observable 中發射,就開始一個該數據的時間窗口。對應的時間間隔用來計時該數據的窗口何時結束。在時間窗口還沒結束的時候,另外一個 Observable 發射的數據就和當前的數據組合一起。左右數據流的處理方式是一樣的,所以為了簡化介紹,我們假定只有一個 源 Observable 有時間窗口。
下面的示例中, 左Observable 數據流從來不結束而右Observable的時間窗口為 0.
Observable<String> left =
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> "L" + i);
Observable<String> right =
Observable.interval(200, TimeUnit.MILLISECONDS)
.map(i -> "R" + i);
left
.join(
right,
i -> Observable.never(),
i -> Observable.timer(0, TimeUnit.MILLISECONDS),
(l,r) -> l + " - " + r
)
.take(10)
.subscribe(System.out::println);
結果:
L0 - R0
L1 - R0
L0 - R1
L1 - R1
L2 - R1
L3 - R1
L0 - R2
L1 - R2
L2 - R2
L3 - R2
由于左邊的 Observable 時間窗口是永久,這意味著左邊每個發射的數據都會和右邊的 數據組合。 當右邊數據發射的比左邊的慢一倍。所以當左邊的數據發射了兩個對應右邊的同一個數據。然后右邊發射下一個數據就開啟了右邊的一個新的時間窗口,然后左右的數據會從開始的數據和右邊的新窗口中的數據組合。
下面示例把左右源 Observable 發射的間隔都設置為 100ms,然后把左時間窗口設置為 150ms:
Observable<String> left =
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> "L" + i);
Observable<String> right =
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> "R" + i);
left
.join(
right,
i -> Observable.timer(150, TimeUnit.MILLISECONDS),
i -> Observable.timer(0, TimeUnit.MILLISECONDS),
(l,r) -> l + " - " + r
)
.take(10)
.subscribe(System.out::println);
結果:
L0 - R0
L0 - R1
L1 - R1
L1 - R2
L2 - R2
L2 - R3
L3 - R3
L3 - R4
L4 - R4
L4 - R5
左右同時發射數據,所以左右同時開始第一個時間窗口,然后組合的數據為 “L0 – R0”。然后 左邊的時間窗口繼續,而右邊發射新的數據 R1 則右邊的數據R1和左邊的 L0 組合 “L0 – R1”,然后過了 50ms 后, 左邊的時間窗口結束了,開啟下一個時間窗口,結果為 “L1 – R1”。 一直重復下去。
兩個數據流都有時間窗口。每個數據流中的每個值按照如下方式組合:
– 如果舊的數據時間窗口還沒有結束,則和另外一個數據流中的每個舊的數據組合
– 如果當前數據的時間窗口還沒有結束,則和另外一個數據流中的每個新的數據組合。
groupJoin
只要檢測到一個組合數據,join 就用兩個數據調用 resultSelector 并發射返回的數據。 而 groupJoin 又有不同的功能:
public final <T2,D1,D2,R> Observable<R> groupJoin(
Observable<T2> right,
Func1<? super T,? extends Observable<D1>> leftDuration,
Func1<? super T2,? extends Observable<D2>> rightDuration,
Func2<? super T,? super Observable<T2>,? extends R> resultSelector)
除了 resultSelector 以外,其他參數和 join 函數的參數是一樣的。這個 resultSelector 從左邊的數據流中獲取一個數據并從右邊數據流中獲取一個 Observable。這個 Observable 會發射和左邊數據配對的所有數據。groupJoin 中的配對和 join 一樣是對稱的,但是結果是不一樣的。可以把 resultSelect 實現為一個 GroupedObservable, 左邊的數據當做 key,而把右邊的數據發射出去。
還使用第一個 jion的示例,左邊的數據流的時間窗口重來不關閉:
Observable<String> left =
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> "L" + i)
.take(6);
Observable<String> right =
Observable.interval(200, TimeUnit.MILLISECONDS)
.map(i -> "R" + i)
.take(3);
left
.groupJoin(
right,
i -> Observable.never(),
i -> Observable.timer(0, TimeUnit.MILLISECONDS),
(l, rs) -> rs.toList().subscribe(list -> System.out.println(l + ": " + list))
)
.subscribe();
結果:
L0: [R0, R1, R2]
L1: [R0, R1, R2]
L2: [R1, R2]
L3: [R1, R2]
L4: [R2]
L5: [R2]
上面的 示例和 jion 中的示例數據配對是一樣的,只是 resultSelector 不一樣導致輸出的結果不一樣。
使用 groupJoin 和 flatMap 可以實現 jion的 功能:
.join(
right,
leftDuration
rightDuration,
(l,r) -> joinResultSelector(l,r)
)
// 和下面的一樣
.groupJoin(
right,
leftDuration
rightDuration,
(l, rs) -> rs.map(r -> joinResultSelector(l,r))
)
.flatMap(i -> i)
通過 join 和 groupBy 也可以實現 groupJoin。在示例代碼中有這個實現,感興趣的可以去看看。