RxJava 教程第四部分:并發 之意外情況處理

uhzn5269 8年前發布 | 14K 次閱讀 RxJava 并發 Java開發

Rx 盡量避免狀態泄露到數據流之外的場景。但是有些東西本身就帶有狀態。比如服務器可以上線和離線、手機可以訪問Wifi、按鈕被按下了等。在 Rx 中國,我們在一段時間內看到這些事件,并稱之為窗口(window)。其他事件在這個窗口內發生可能需要特殊處理。例如,手機在使用移動收費上網的時候,會把網絡請求優先級降低,來避免天價流量費的情況。

窗口 Window

buffer 函數可以緩存多個數據并整體發射。 window 操作函數和 buffer 有一對一的關系。區別在于 window 不會整體返回緩存的數據。而是把緩存的數據當做一個新的 Observable 數據流來返回。這樣當源 Observable 有數據發射了,這個數據就立刻發射到 window 返回的 Observable 里面了。下圖可以看到二者的區別:

window :

如果你還沒有了解 buffer, 建議你到前面的章節下去看看 buffer。 buffer 和 window 的函數形式是一樣的,功能也非常類似,并且易于理解。 buffer 都可以用 window 來實現其功能:

source.buffer(...) 
// 和下面的是一樣的功能
source.window(...).flatMap(w -> w.toList())
 

Window by count

窗口內可以限定數目。當窗口發射的數據達到了限定的數目,當前窗口的 Observable 就結束并開啟一個新的窗口。

和buffer 一樣, 使用 window(int count, int skip) 也可以跳過數據或者重疊使用數據。

Observable
    .merge(
        Observable.range(0, 5)
            .window(3,1))
    .subscribe(System.out::println);
 

結果:

0
1
1
2
2
2
3
3
3
4
4
4
 

可以看到當有數據重疊的時候, 多個 Observable 會返回同樣的數據,可以把結果輸出形式修改一下,方便查看:

Observable.range(0, 5)
    .window(3, 1)
    .flatMap(o -> o.toList())
    .subscribe(System.out::println);
 

結果:

[0, 1, 2]

[1, 2, 3]

[2, 3, 4]

[3, 4]

[4]

這樣就可以看到 window 和 buffer 是非常類似的了。

Window by time

同樣也可以指定窗口的時間長度:

public final Observable<Observable<T>> window(long timespan, long timeshift, java.util.concurrent.TimeUnitunit)
 
Observable.interval(100, TimeUnit.MILLISECONDS)
    .take(5)
    .window(250, 100, TimeUnit.MILLISECONDS)
    .flatMap(o -> o.toList())
    .subscribe(System.out::println);
 

結果:

[0, 1]
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]
 

上面的示例中,每隔100ms開始一個新的窗口,每個窗口持續 250ms。 第一個窗口從 0ms 開始并捕獲到數據 [0, 1](0 是在第100ms的時候發射的)。

Window with signal

同樣也可以用另外一個信號 Observable當做窗口結束的信號。

信號 Observable 直接也可以相互傳遞事件。

下面是使用信號 Observable 實現的重疊窗口:

Observable.interval(100, TimeUnit.MILLISECONDS)
    .take(5)
    .window(
        Observable.interval(100, TimeUnit.MILLISECONDS),
        o -> Observable.timer(250, TimeUnit.MILLISECONDS))
    .flatMap(o -> o.toList())
    .subscribe(System.out::println);
 

結果:

[1, 2]
[2, 3]
[3, 4]
[4]
[]
 

注意上面的數字 0 沒有捕獲到,原因在于源 Observable 和 信號 Observable 都是在同一時刻發生的,但是在實際操作中并沒有這種情況。所以當信號 Observable發射的時候, 數字 0 已經發射出去了。

Join

join 可以把兩個數據流中的數據組合一起。 zip 函數根據數據發射的順序來組合數據。 join 可以根據時間來組合。

public final <TRight,TLeftDuration,TRightDuration,R> Observable<R> join(
    Observable<TRight> right,
    Func1<T,Observable<TLeftDuration>> leftDurationSelector,
    Func1<TRight,Observable<TRightDuration>> rightDurationSelector,
    Func2<T,TRight,R> resultSelector)
 

join 組合的兩個 Observable 被稱之為 左和右。 上面的函數并不是靜態的,調用該函數的 Observable就是 左 。參數中的 leftDurationSelector 和 rightDurationSelector 分別使用 左右發射的數據為參數,然后返回一個定義時間間隔的 Observable,和 window 的最后一個重載函數類似。這個時間間隔是用來選擇里面發射的數據并組合一起。里面的數據會當做參數調用 resultSelector ,然后返回一個組合后的數據。然后組合后的數據由 join 返回的 Observable 發射出去。

join 比較難以理解以及強大之處就是如果選擇組合的數據。當有數據在 源 Observable 中發射,就開始一個該數據的時間窗口。對應的時間間隔用來計時該數據的窗口何時結束。在時間窗口還沒結束的時候,另外一個 Observable 發射的數據就和當前的數據組合一起。左右數據流的處理方式是一樣的,所以為了簡化介紹,我們假定只有一個 源 Observable 有時間窗口。

下面的示例中, 左Observable 數據流從來不結束而右Observable的時間窗口為 0.

Observable<String> left = 
        Observable.interval(100, TimeUnit.MILLISECONDS)
            .map(i -> "L" + i);
Observable<String> right = 
        Observable.interval(200, TimeUnit.MILLISECONDS)
            .map(i -> "R" + i);
 
left
    .join(
        right,
        i -> Observable.never(),
        i -> Observable.timer(0, TimeUnit.MILLISECONDS),
        (l,r) -> l + " - " + r
    )
    .take(10)
    .subscribe(System.out::println);
 

結果:

L0 - R0
L1 - R0
L0 - R1
L1 - R1
L2 - R1
L3 - R1
L0 - R2
L1 - R2
L2 - R2
L3 - R2
 

由于左邊的 Observable 時間窗口是永久,這意味著左邊每個發射的數據都會和右邊的 數據組合。 當右邊數據發射的比左邊的慢一倍。所以當左邊的數據發射了兩個對應右邊的同一個數據。然后右邊發射下一個數據就開啟了右邊的一個新的時間窗口,然后左右的數據會從開始的數據和右邊的新窗口中的數據組合。

下面示例把左右源 Observable 發射的間隔都設置為 100ms,然后把左時間窗口設置為 150ms:

Observable<String> left = 
        Observable.interval(100, TimeUnit.MILLISECONDS)
            .map(i -> "L" + i);
Observable<String> right = 
        Observable.interval(100, TimeUnit.MILLISECONDS)
            .map(i -> "R" + i);
 
left
    .join(
        right,
        i -> Observable.timer(150, TimeUnit.MILLISECONDS),
        i -> Observable.timer(0, TimeUnit.MILLISECONDS),
        (l,r) -> l + " - " + r
    )
    .take(10)
    .subscribe(System.out::println);
 

結果:

L0 - R0
L0 - R1
L1 - R1
L1 - R2
L2 - R2
L2 - R3
L3 - R3
L3 - R4
L4 - R4
L4 - R5
 

左右同時發射數據,所以左右同時開始第一個時間窗口,然后組合的數據為 “L0 – R0”。然后 左邊的時間窗口繼續,而右邊發射新的數據 R1 則右邊的數據R1和左邊的 L0 組合 “L0 – R1”,然后過了 50ms 后, 左邊的時間窗口結束了,開啟下一個時間窗口,結果為 “L1 – R1”。 一直重復下去。

兩個數據流都有時間窗口。每個數據流中的每個值按照如下方式組合:

– 如果舊的數據時間窗口還沒有結束,則和另外一個數據流中的每個舊的數據組合

– 如果當前數據的時間窗口還沒有結束,則和另外一個數據流中的每個新的數據組合。

groupJoin

只要檢測到一個組合數據,join 就用兩個數據調用 resultSelector 并發射返回的數據。 而 groupJoin 又有不同的功能:

public final <T2,D1,D2,R> Observable<R> groupJoin(
    Observable<T2> right,
    Func1<? super T,? extends Observable<D1>> leftDuration,
    Func1<? super T2,? extends Observable<D2>> rightDuration,
    Func2<? super T,? super Observable<T2>,? extends R> resultSelector)
 

除了 resultSelector 以外,其他參數和 join 函數的參數是一樣的。這個 resultSelector 從左邊的數據流中獲取一個數據并從右邊數據流中獲取一個 Observable。這個 Observable 會發射和左邊數據配對的所有數據。groupJoin 中的配對和 join 一樣是對稱的,但是結果是不一樣的。可以把 resultSelect 實現為一個 GroupedObservable, 左邊的數據當做 key,而把右邊的數據發射出去。

還使用第一個 jion的示例,左邊的數據流的時間窗口重來不關閉:

Observable<String> left = 
        Observable.interval(100, TimeUnit.MILLISECONDS)
            .map(i -> "L" + i)
            .take(6);
Observable<String> right = 
        Observable.interval(200, TimeUnit.MILLISECONDS)
            .map(i -> "R" + i)
            .take(3);
 
left
    .groupJoin(
        right,
        i -> Observable.never(),
        i -> Observable.timer(0, TimeUnit.MILLISECONDS),
        (l, rs) -> rs.toList().subscribe(list -> System.out.println(l + ": " + list))
    )
    .subscribe();
 

結果:

L0: [R0, R1, R2]
L1: [R0, R1, R2]
L2: [R1, R2]
L3: [R1, R2]
L4: [R2]
L5: [R2]
 

上面的 示例和 jion 中的示例數據配對是一樣的,只是 resultSelector 不一樣導致輸出的結果不一樣。

使用 groupJoin 和 flatMap 可以實現 jion的 功能:

.join(
    right,
    leftDuration
    rightDuration,
    (l,r) -> joinResultSelector(l,r)
)
// 和下面的一樣
.groupJoin(
    right,
    leftDuration
    rightDuration,
    (l, rs) -> rs.map(r -> joinResultSelector(l,r))
)
.flatMap(i -> i)
 

通過 join 和 groupBy 也可以實現 groupJoin。在示例代碼中有這個實現,感興趣的可以去看看。

 

來自: http://blog.chengyunfeng.com/?p=980

 本文由用戶 uhzn5269 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!