RxJava 教程第三部分:馴服數據流之 組合數據流

JosephineMa 8年前發布 | 8K 次閱讀 RxJava Java開發

 

組合數據流

到目前為止我們介紹了如何創建數據流以及如何把數據流中的數據轉換成我們需要的數據。然而,大部分應用都需要處理多個數據源的數據。需要一種把多個數據源組合一起的方法。在前面的介紹中,也看到了一些數據流會使用多個 Observable。本節介紹如何把多個數據源的數據組合為一個數據源的操作函數。

Concatenation

一個數據流發射完后繼續發射下一個數據流是一種很常見的組合方法。

concat

concat 操作函數把多個數據流按照順序一個一個的發射數據。第一個數據流發射完后,繼續發射下一個。 concat 函數有多個重載函數:

public static final <T> Observable<T> concat(
    Observable<? extends Observable<? extends T>> observables)
public static final <T> Observable<T> concat(
    Observable<? extends T> t1,
    Observable<? extends T> t2)
public static final <T> Observable<T> concat(Observable<? extends T> t1,
    Observable<? extends T> t2,
    Observable<? extends T> t3)
public static final <T> Observable<T> concat(Observable<? extends T> t1,
    Observable<? extends T> t2,
    Observable<? extends T> t3,
    Observable<? extends T> t4)
// All the way to 10 observables
 

示例:

Observable<Integer> seq1 = Observable.range(0, 3);
Observable<Integer> seq2 = Observable.range(10, 3);
 
Observable.concat(seq1, seq2)
    .subscribe(System.out::println);
 

結果:

0
1
2
10
11
12
 

如果需要組合的數據流是動態的,則依然可以使用 concat 來組合返回多個 Observable 的情況。下面的示例中,使用 groupBy 來把一個 Observable 中的數據流分組為多個 Observable ,這樣 groupBy 返回的是多個 Observable, 然后使用 concat 把這些 動態生成的 Observable 給組合起來:

Observable<String> words = Observable.just(
    "First",
    "Second",
    "Third",
    "Fourth",
    "Fifth",
    "Sixth"
);
 
Observable.concat(words.groupBy(v -> v.charAt(0)))
    .subscribe(System.out::println);
 

結果:

First
Fourth
Fifth
Second
Sixth
Third
 

concat 的行為有點像 concatMap 操作函數的扁平處理(flattening phase)。事實上, concatMap 等價于 先應用 map 操作函數然后再使用 concat。

concatWith 函數是 concat 的另外一種使用方式,可以通過串聯的方法來一個一個的組合數據流:

public void exampleConcatWith() {
    Observable<Integer> seq1 = Observable.range(0, 3);
    Observable<Integer> seq2 = Observable.range(10, 3);
    Observable<Integer> seq3 = Observable.just(20);
 
    seq1.concatWith(seq2)
        .concatWith(seq3)
        .subscribe(System.out::println);
}
 

結果:

0
1
2
10
11
12
20
 

repeat

repeat 顧名思義,可以重復的發射自己。 repeat 不會緩存之前的數據,當再次發射數據的時候,會從新就算數據。

public final Observable<T> repeat()
public final Observable<T> repeat(long count)
 

示例:

Observable<Integer> words = Observable.range(0,2);
 
words.repeat(2)
    .subscribe(System.out::println);
 

結果:

0
1
0
1
 

repeatWhen

repeatWhen 可以指定一個條件,當該條件滿足的時候才重復發射數據流。條件為一個 Observable,當源Observable 結束的時候,會等待 條件 Observable 來發射數據通知源 Observable 重復發射數據流。如果條件 Observable 結束了,則不會觸發源 Observable 重復發射數據。

有時候需要知道一個重復發射的數據量是何時結束的。repeatWhen 提供了一種特別的 Observable 在數據流結束的時候發射一個 Void。可以使用這個 Observable來當做一種信號。

public final Observable<T> repeatWhen(
    Func1<? super Observable<? extends java.lang.Void>,? extends Observable<?>> notificationHandler)
 

repeatWhen 的參數是一個函數,該函數的參數為 Observable 返回另外一個 Observable。這兩個Observable 發射的數據類型是無關緊要的。輸入的 Observable 用了表示重復結束的信號,返回的 Observable 用來表示重新開始的信號。

下一個示例使用 repeatWhen 來自己實現一個 repeat(n) :

Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
 
values
    .take(2)
    .repeatWhen(ob -> {
        return ob.take(2);
    })
    .subscribe(new PrintSubscriber("repeatWhen"));
 

結果:

repeatWhen: 0
repeatWhen: 1
repeatWhen: 0
repeatWhen: 1
repeatWhen: Completed
 

上面的示例中,當重復發射完成后,ob 就立刻發射信號告訴源 Observable 重新發射。

下面的示例中,創建一個每隔兩秒就重復一次的無限循環數據流:

Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
 
values
    .take(5)
    .repeatWhen((ob)-> {
        ob.subscribe();
        return Observable.interval(2, TimeUnit.SECONDS);
    })
    .subscribe(new PrintSubscriber("repeatWhen"));
 

注意上面的返回的條件 Observable 每隔兩秒就發射一次信號。在后面會介紹更多關于時間控制的技巧。

另外一個需要注意的就是 ob.subscribe() 語句,看起來是多余的其實是必不可少的。這樣會強制創建 ob 對象,當前 repeatWhen 的實現需要 ob 被訂閱,否則是不會觸發重復發射數據的。

startWith

startWith 的參數為一個數據流,然后先發射該數據再發射 源 Observable 中的數據。

public final Observable<T> startWith(java.lang.Iterable<T> values)
public final Observable<T> startWith(Observable<T> values)
public final Observable<T> startWith(T t1)
public final Observable<T> startWith(T t1, T t2)
public final Observable<T> startWith(T t1, T t2, T t3)
// up to 10 values
 

Observable<Integer> values = Observable.range(0, 3);
 
values.startWith(-1,-2)
    .subscribe(System.out::println);
 

結果:

-1
-2
0
1
2
 

startWith 是使用 參數為 just 的 concat 函數的簡寫。

Observable.concat(
    Observable.just(-1,-2,-3),
    values)
// 和下面的是一樣的效果
values.startWith(-1,-2,-3)
 

并行數據流(Concurrent sequences)

Observable 并不總是在可預期的時間內發射數據。下面是一些用了組合并行 Observable 的操作函數。

amb

amb 的參數為多個 Observable,使用第一個先發射數據的 Observable ,其他的 Observable 被丟棄。

public static final <T> Observable<T> amb(
    java.lang.Iterable<? extends Observable<? extends T>> sources)
public static final <T> Observable<T> amb(
    Observable<? extends T> o1,
    Observable<? extends T> o2)
public static final <T> Observable<T> amb(
    Observable<? extends T> o1,
    Observable<? extends T> o2,
    Observable<? extends T> o3)
// Up to 10 observables
 

Observable.amb(
        Observable.timer(100, TimeUnit.MILLISECONDS).map(i -> "First"),
        Observable.timer(50, TimeUnit.MILLISECONDS).map(i -> "Second"))
    .subscribe(System.out::println);
 

結果:

Second
 

由于第二個 Observable 先開始發射數據,所以第一個 Observable 被丟棄了, 使用 第二個 Observable。

該操作函數可以用于如下情況:

你有多個廉價的資源提供方,但是這些資源提供方返回數據的時間是不一樣的。例如一個天氣預報應用,可以從多個數據源獲取數據,當其中一個數據源返回數據的時候,就丟棄其的請求,而使用這個數據源。

同樣,還有一個 ambWith 版本的函數,可以通過鏈式調用每個 Observable。讓代碼看起來更優雅一些:

Observable.timer(100, TimeUnit.MILLISECONDS).map(i -> "First")
    .ambWith(Observable.timer(50, TimeUnit.MILLISECONDS).map(i -> "Second"))
    .ambWith(Observable.timer(70, TimeUnit.MILLISECONDS).map(i -> "Third"))
    .subscribe(System.out::println);
 

結果:

Second
 

merge

merge 把多個 Observable 合并為一個,合并后的 Observable 在每個源Observable 發射數據的時候就發射同樣的數據。所以多個源 Observable 的數據最終是混合是一起的:

public static final <T> Observable<T> merge(
    java.lang.Iterable<? extends Observable<? extends T>> sequences)
public static final <T> Observable<T> merge(
    java.lang.Iterable<? extends Observable<? extends T>> sequences,
    int maxConcurrent)
public static final <T> Observable<T> merge(
    Observable<? extends Observable<? extends T>> source)
public static final <T> Observable<T> merge(
    Observable<? extends Observable<? extends T>> source,
    int maxConcurrent)
public static final <T> Observable<T> merge(
    Observable<? extends T> t1,
    Observable<? extends T> t2)
public static final <T> Observable<T> merge(
    Observable<? extends T> t1,
    Observable<? extends T> t2,
    Observable<? extends T> t3)
...
 

示例:

Observable.merge(
        Observable.interval(250, TimeUnit.MILLISECONDS).map(i -> "First"),
        Observable.interval(150, TimeUnit.MILLISECONDS).map(i -> "Second"))
    .take(10)
    .subscribe(System.out::println);
 

結果:

Second
First
Second
Second
First
Second
Second
First
Second
First
 

concat 和 merge 的區別是,merge 不會等到前面一個 Observable 結束才會發射下一個 Observable 的數據,merge 訂閱到所有的 Observable 上,如果有任何一個 Observable 發射了數據,則 就把該數據發射出來。同樣 還有一個 mergeWith 函數用了串聯調用。

Observable.interval(250, TimeUnit.MILLISECONDS).map(i -> "First")
    .mergeWith(Observable.interval(150, TimeUnit.MILLISECONDS).map(i -> "Second"))
    .take(10)
    .subscribe(System.out::println);
 

和上面輸出的結果一樣。

mergeDelayError

merge 中如果任意一個源 Observable 出現錯誤了,則 merge 后的 Observable 也就出錯并結束發射。使用 mergeDelayError 可以推遲發生的錯誤,繼續發射其他 Observable 發射的數據。

public static final <T> Observable<T> mergeDelayError(
    Observable<? extends Observable<? extends T>> source)
public static final <T> Observable<T> mergeDelayError(
    Observable<? extends T> t1,
        Observable<? extends T> t2)
public static final <T> Observable<T> mergeDelayError(
    Observable<? extends T> t1,
    Observable<? extends T> t2,
    Observable<? extends T> t3)
...
 

Observable<Long> failAt200 = 
    Observable.concat(
        Observable.interval(100, TimeUnit.MILLISECONDS).take(2),
        Observable.error(new Exception("Failed")));
Observable<Long> completeAt400 = 
    Observable.interval(100, TimeUnit.MILLISECONDS)
        .take(4);
 
Observable.mergeDelayError(failAt200, completeAt400)
    .subscribe(
        System.out::println,
        System.out::println);
 

結果:

0
0
1
1
2
3
java.lang.Exception: Failed
 

上面的示例中,開始兩個 Observable 都發射一樣的數據,當發射第二個數據 1 后,第一個 Observable 拋出一個異常退出,而合并后的數據流繼續發射直到所有的 Observable 完成或者也出現異常。

如果合并多個 Observable,則合并后的 Observable 只有當所有源 Observable 結束后才結束,如果有多個源 Observable 出現了異常,則合并后的 Observable 會用一個 CompositeException 來結束。

Observable<Long> failAt200 = 
Observable.concat(
    Observable.interval(100, TimeUnit.MILLISECONDS).take(2),
        Observable.error(new Exception("Failed")));
Observable<Long> failAt300 = 
    Observable.concat(
        Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
        Observable.error(new Exception("Failed")));
Observable<Long> completeAt400 = 
    Observable.interval(100, TimeUnit.MILLISECONDS)
        .take(4);
 
Observable.mergeDelayError(failAt200, failAt300, completeAt400)
    .subscribe(
        System.out::println,
        System.out::println);
 

結果:

0
0
0
1
1
1
2
2
3
rx.exceptions.CompositeException: 2 exceptionsoccurred. 
 

switchOnNext

switchOnNext 的參數為一個返回 Observable 對象的 Observable。也就是說,這個參數為一個 Observable, 但是這個 Observable 所發射的數據類型是 Observable 。switchOnNext 返回的 Observable 發射數據的規則如下:

在參數 Observable 返回的 Observable 中,把最先發射數據的 Observable 中的數據拿來轉發,如果之后又有新的 Observable 開始發射數據了,則就切換到新的 Observable 丟棄前一個。

Observable.switchOnNext(
    Observable.interval(100, TimeUnit.MILLISECONDS)
        .map(i -> 
            Observable.interval(30, TimeUnit.MILLISECONDS)
                .map(i2 -> i)
        )
    )
    .take(9)
    .subscribe(System.out::println);
 

結果:

0
0
0
1
1
1
2
2
2
 

注意上面示例中 switchOnNext 的參數 每隔 100毫秒返回一個 Observable 。這個返回的 Observable 會每隔 30 毫秒發射一個數字,這個數字被映射為 100毫秒發射一個數據的 Observable 返回的數據。所以在第一個100毫秒的時候,switchOnNext 的參數返回的第一個 Observable 可以發射3個數據 0,然后到第100毫秒的時候,switchOnNext 的參數返回的第二個 Observable 開發發射數據1, 所以前一個發射數據 0 的 Observable 就被丟棄了, switchOnNext 切換到新的發射數據的 Observable。

switchMap

就像 flatMap 內部使用 merge 來組合發射的數據;以及 concatMap 使用 concat 來組合數據,而 switchMap 內部使用 switchOnNext 來打散組合數據。

public final <R> Observable<R> switchMap(Func1<? super T,? extends Observable<? extends R>> func)
 

源 Observable 所發射的每一個數據都被 func 函數給轉換為一個新的 Observable 了。每次只要 源Observable 發射一個數據,func 函數都把該數據轉換為一個 Observable 然后 switchMap 返回的 Observable 就使用這個新的 Observable 來發射數據。

前面的示例也可以用 switchMap 實現:

Observable.interval(100, TimeUnit.MILLISECONDS)
    .switchMap(i -> 
        Observable.interval(30, TimeUnit.MILLISECONDS)
            .map(l -> i))
    .take(9)
    .subscribe(System.out::println);
 

結果:

0
0
0
1
1
1
2
2
2
 

Pairing sequences

下面幾個操作符用來把多個源 Observable 發射的數據組合成一個數據。

zip

zip 是函數式編程中的一個基本概念,參數為多個源 Observable, 返回的結果是把這些源 Observable 發射的數據按照順序給組合起來。

下面的示例中,有兩個源 Observable 發射數據的速度是不一樣的。

Observable.zip(
        Observable.interval(100, TimeUnit.MILLISECONDS)
            .doOnNext(i -> System.out.println("Left emits " + i)),
        Observable.interval(150, TimeUnit.MILLISECONDS)
            .doOnNext(i -> System.out.println("Right emits " + i)),
        (i1,i2) -> i1 + " - " + i2)
    .take(6)
    .subscribe(System.out::println);
 

結果:

Leftemits 0
Rightemits 0
0 - 0
Leftemits 1
Rightemits 1
Leftemits 2
1 - 1
Leftemits 3
Rightemits 2
2 - 2
Leftemits 4
Leftemits 5
Rightemits 3
3 - 3
Leftemits 6
Rightemits 4
4 - 4
Leftemits 7
Rightemits 5
Leftemits 8
5 - 5
 

從上面示例中可以看到,zip 是按照順序來組合數據的。zip 有很多重載函數可以接受多個 Observable :

public static final <R> Observable<R> zip(
    java.lang.Iterable<? extends Observable<?>> ws,
    FuncN<? extends R> zipFunction)
public static final <R> Observable<R> zip(
    Observable<? extends Observable<?>> ws,
    FuncN<? extends R> zipFunction)
public static final <T1,T2,R> Observable<R> zip(
    Observable<? extends T1> o1,
    Observable<? extends T2> o2,
    Func2<? super T1,? super T2,? extends R> zipFunction)
public static final <T1,T2,T3,R> Observable<R> zip(
    Observable<? extends T1> o1,
    Observable<? extends T2> o2,
    Observable<? extends T3> o3,
    Func3<? super T1,? super T2,? super T3,? extends R> zipFunction)
/// etc
 

如何有多個 源 Observable,則 zip 會等待最慢的一個 Observable 發射完數據才開始組合這次發射的所有數據。

Observable.zip(
        Observable.interval(100, TimeUnit.MILLISECONDS),
        Observable.interval(150, TimeUnit.MILLISECONDS),
        Observable.interval(050, TimeUnit.MILLISECONDS),
        (i1,i2,i3) -> i1 + " - " + i2 + " - " + i3)
    .take(6)
    .subscribe(System.out::println);
 

結果:

0 - 0 - 0
1 - 1 - 1
2 - 2 - 2
3 - 3 - 3
4 - 4 - 4
5 - 5 - 5
 

zip 的任意一個源 Observable 結束標示著 zip 的結束。其他源 Observable 后續發射的數據被忽略了。 下面的例子組合三個 Observable,然后統計下 zip 返回的 Observable 發射了多少個數據:

Observable.zip(
        Observable.range(0, 5),
        Observable.range(0, 3),
        Observable.range(0, 8),
        (i1,i2,i3) -> i1 + " - " + i2 + " - " + i3)
    .count()
    .subscribe(System.out::println);
 

結果:

3

所以 zip 返回的Observable發射的速度和最慢的那個 Observable 一樣,發射的數據和發射最少數據的 那個 Observable 一樣。

zip 還有一個 zipWith 操作函數:

Observable.interval(100, TimeUnit.MILLISECONDS)
    .zipWith(
        Observable.interval(150, TimeUnit.MILLISECONDS), 
        (i1,i2) -> i1 + " - " + i2)
    .take(6)
    .subscribe(System.out::println);
 

結果:

0 - 0
1 - 1
2 - 2
3 - 3
4 - 4
5 - 5
 

zipWidth 還可以使用一個 iterable 為參數:

Observable.range(0, 5)
    .zipWith(
        Arrays.asList(0,2,4,6,8),
        (i1,i2) -> i1 + " - " + i2)
    .subscribe(System.out::println);
 

結果:

0 - 0
1 - 2
2 - 4
3 - 6
4 - 8
 

combineLatest

前面的 zip 使用源 Observable 發射的順序為組合的標記,而 combineLatest 使用的是時間。只要 combineLatest 的任何一個源 Observable 發射數據,則就使用該數據和其他Observable 最后一次發射的數據去組合。

Observable.combineLatest(
        Observable.interval(100, TimeUnit.MILLISECONDS)
            .doOnNext(i -> System.out.println("Left emits")),
        Observable.interval(150, TimeUnit.MILLISECONDS)
            .doOnNext(i -> System.out.println("Right emits")),
        (i1,i2) -> i1 + " - " + i2
    )
    .take(6)
    .subscribe(System.out::println);
 

結果:

Leftemits
Rightemits
0 - 0
Leftemits
1 - 0
Leftemits
2 - 0
Rightemits
2 - 1
Leftemits
3 - 1
Rightemits
3 - 2
 

combineLatest 一開始等待所有的源 Observable 發射第一個數據,然后只要有任意一個 Observable 發射數據,就用這個數據和其他 Observable 最后一次發射的數據組合。

combineLatest 同樣有多個重載函數可以組合多個源 Observable。 combineLatest 使用場景是只要有一個條件變化了就需要重新計算當前的數據或者狀態。比如在用markdown 寫博客的時候,編輯器里面有個控制按鈕為把單詞的每個字母轉換為大寫字母的開關,輸入框旁邊有個預覽界面。每當你在編輯框中輸入內容或者改變大小寫狀態的時候,combineLatest 就用輸入框和大小寫狀態的最新的值來重新渲染預覽界面。

來自: http://blog.chengyunfeng.com/?p=972

 本文由用戶 JosephineMa 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!