RxJava 教程第二部分:事件流基礎之 過濾數據

ygdg3049 8年前發布 | 8K 次閱讀 RxJava Java開發

 

到目前為止我們看到的示例都很簡單。你也可以用 Rx 來處理大批量實時數據,但是如果把所有大批量數據整個打包發給你的話,使用 Rx 還有啥優勢呢? 本節 我們將介紹一些操作函數(operators )來過濾數據、或者把所有數據變成一個需要的數據。

如果你了解過函數式編程(functional programming)或者 Java 中的 Stream,則本節介紹的操作函數是非常眼熟的。本節中所有的操作符都返回一個不影響前一個 Observable 的新 Observable。 整個 Rx 框架都遵守該原則。通過創建新的 Observable 來轉換之前的 Observable而不會對之前的 Observable 造成干擾。訂閱到初始 Observable 的 Subscribers 不會受到任何影響,但是在后面的章節中也會看到,開發者也需要當心該原則。

Marble diagrams(彈子圖)

你可以想象一個機器,不停的發射彈子出來,發射出來的彈子可以被其他模塊再次加工(比如 上色、把不合格的彈子給回收了),加工完成后再次發射出來 … 彈子圖就是對這個機器的抽象描述。在 Rx 中流行使用這種方式來描述操作符,畢竟圖片看起來直觀多了。 Marble diagrams(彈子圖)基本元素如下:

時間從左往右流動,每個圖形代表一個數據,豎線代表發射完成了,而 X 代表出現錯誤了。 操作函數把上面的 Observable 轉換下面的新的 Observable , 里面的每個數據都被操作函數給處理了并返回一個新的數據。

Filter(過濾數據)

filter 函數使用一個 predicate 函數接口來判斷每個發射的值是否能通過這個判斷。如果返回 true,則該數據繼續往下一個(過濾后的) Observable 發射。

比如下面示例創建了一個發射 0 到 9 十個數字的 源Observable。在該 Observable 使用一個 filter 操作來過濾掉奇數,最后只保留偶數。

Observable<Integer> values = Observable.range(0,10);
SubscriptionoddNumbers = values
    .filter(v -> v % 2 == 0)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

0
2
4
6
8
Completed
 

distinct 和 distinctUntilChanged

distinct 函數用來過濾掉已經出現過的數據了。

Observable<Integer> values = Observable.create(o -> {
    o.onNext(1);
    o.onNext(1);
    o.onNext(2);
    o.onNext(3);
    o.onNext(2);
    o.onCompleted();
});
 
Subscriptionsubscription = values
    .distinct()
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

1
2
3
Completed
 

distinct 還有一個重載函數,該函數有個生成 key 的參數。每個發射的數據都使用該參數生成一個 key,然后使用該key 來判斷數據是否一樣。

public final <U> Observable<T> distinct(Func1<? super T,? extends U> keySelector)
 

下面示例中使用字符串的第一個字母作為 key 來比較。

Observable<String> values = Observable.create(o -> {
    o.onNext("First");
    o.onNext("Second");
    o.onNext("Third");
    o.onNext("Fourth");
    o.onNext("Fifth");
    o.onCompleted();
});
 
Subscriptionsubscription = values
    .distinct(v -> v.charAt(0))
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

First
Second
Third
Completed
 

“Fourth” 和 “Fifth” 字符串被過濾掉了,應為他們的 key (首字母)和 First 一樣。已經發射過的數據將被過濾掉。

有經驗的碼農知道,該函數在內部維護一個 key 集合來保存所有已經發射數據的 key,當有新的數據發射的時候,在集合中查找該 數據的key 是否存在。 在使用 Rx 操作函數的時把內部細節給封裝起來了,但是我們應該注意該問題來避免性能問題。(如果有大量的數據,維護一個內部的集合來保存 key 可能會占用很多內存。)

distinct 還有個變體是 distinctUntilChanged。區別是 distinctUntilChanged 只過濾相鄰的 key 一樣的數據。

public final Observable<T> distinctUntilChanged()
public final <U> Observable<T> distinctUntilChanged(Func1<? super T,? extends U> keySelector)
 

Observable<Integer> values = Observable.create(o -> {
    o.onNext(1);
    o.onNext(1);
    o.onNext(2);
    o.onNext(3);
    o.onNext(2);
    o.onCompleted();
});
 
Subscriptionsubscription = values
    .distinctUntilChanged()
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

1
2
3
2
Completed
 

同樣 distinctUntilChanged 也可以使用一個生成 key 的參數:

Observable<String> values = Observable.create(o -> {
    o.onNext("First");
    o.onNext("Second");
    o.onNext("Third");
    o.onNext("Fourth");
    o.onNext("Fifth");
    o.onCompleted();
});
 
Subscriptionsubscription = values
    .distinctUntilChanged(v -> v.charAt(0))
    .subscribe(
            v -> System.out.println(v),
            e -> System.out.println("Error: " + e),
            () -> System.out.println("Completed")
        );
 

結果:

First
Second
Third
Fourth
Completed
 

ignoreElements

ignoreElements 會忽略所有發射的數據,只讓 onCompleted 和 onError 可以通過。

Observable<Integer> values = Observable.range(0, 10);
 
Subscriptionsubscription = values
    .ignoreElements()
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

Completed
 

ignoreElements() 和使用 filter(v -> false) 是一樣的效果。

skip 和 take

下面兩個操作函數依據發射數據的索引來在特定的位置切斷數據流,可以從頭開始切斷也可以從末尾開始切斷。 take 從頭開始獲取前 N 個數據,而 skip 則是從頭開始 跳過 N 個數據。注意,如果發射的數據比 N 小,則這兩個函數都會發射一個 error。

Observable<T>  take(int num)
 

Observable<Integer> values = Observable.range(0, 5);
 
Subscriptionfirst2 = values
    .take(2)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

0
1
Completed
 

熟悉 Java 8 Stream 的同學知道 take 函數和 limit 類似。 limit 函數在 Rx 中也有,和 take 是一樣的。只是為了方便熟悉 limit 的同學使用而已。

只要第 N 個數據可用, take 操作就結束了。 如果在 N 個數據發射之前發生了 error, error 信息會繼續傳遞到下一個 Observable。 如果 第 N 個數據發射后, take 就不再關心源 Observable 的狀態了。

Observable<Integer> values = Observable.create(o -> {
    o.onNext(1);
    o.onError(new Exception("Oops"));
});
 
Subscriptionsubscription = values
    .take(1)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

1
Completed
 

skip 返回 take 操作忽略的另外一部分數據。也就是跳過前面 N 個數據。

Observable<T>  skip(int num)
 

Observable<Integer> values = Observable.range(0, 5);
 
Subscriptionsubscription = values
    .skip(2)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

2
3
4
Completed
 

除了根據發射數據的索引來過濾數據以外,還可以使用數據流發射的時間來過濾。比如過濾掉前五秒發射的數據。

Observable<T>  take(long time, java.util.concurrent.TimeUnitunit)
Observable<T>  skip(long time, java.util.concurrent.TimeUnitunit)
 
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
 
Subscriptionsubscription = values
    .take(250, TimeUnit.MILLISECONDS)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

0
1
Completed
 

上面示例中只獲取前 250 毫秒發射的數據。 第 300 毫秒才開始發射數據 3, 所以這里只獲取 0 和1 兩個數據。

skipWhile 和 takeWhile

這兩個函數是使用一個 predicate 參數來當做判斷條件。 如果判斷條件返回為 ture, 則 takeWhile 保留該數據。

Observable<T>  takeWhile(Func1<? super T,java.lang.Boolean> predicate)
 
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
 
Subscriptionsubscription = values
    .takeWhile(v -> v < 2)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

0
1
Completed
 

不出意料, skipWhile 跳過過濾條件為 true 的數據。

Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
 
Subscriptionsubscription = values
    .skipWhile(v -> v < 2)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

2
3
4
...
 

skipLast 和 takeLast

skip 和 take 是從頭開始索引數據,而 skipLast 和 takeLast 和他們相反,是從末尾開始索引數據。

Observable<Integer> values = Observable.range(0,5);
 
Subscriptionsubscription = values
    .skipLast(2)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

0
1
2
Completed
 

同樣這兩個函數也有依時間為條件的重載函數。

takeUntil 和 skipUntil

takeUntil 和 skipUntil 這兩個函數和 takeWhile 、skipWhile 剛好相反。 當判斷條件為 false 的時候, takeUntil 保留該數據。

takeUntil 和 skipUntil 還有另外一種不一樣的重載函數。切斷的條件為 另外一個 Observable 發射數據的時刻。

// 獲取源Observable的數據直到 other Observable 發射第一個數據時停止
public final <E> Observable<T> takeUntil(Observable<? extends E> other)
 

Observable<Long> values = Observable.interval(100,TimeUnit.MILLISECONDS);
Observable<Long> cutoff = Observable.timer(250, TimeUnit.MILLISECONDS);
 
Subscriptionsubscription = values
    .takeUntil(cutoff)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

0
1
Completed
 

你應該還記得,這個 timer 函數會等待 250 毫秒然后發射一個數據。當 takeUntil 收到 這個數據的時候就停止繼續接受 values 發射的數據。 cutoff 這個充當信號的 Observable 可以是任意數據類型的,這里不關心數據只關心何時發射了數據。

skipUntil 也是一樣,當收到另外一個 Observable 發射數據的時候,就開始接收 源 Observable 的數據。

Observable<Long> values = Observable.interval(100,TimeUnit.MILLISECONDS);
Observable<Long> cutoff = Observable.timer(250, TimeUnit.MILLISECONDS);
 
Subscriptionsubscription = values
    .skipUntil(cutoff)
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
 

結果:

2
3
4
...
 

來自: http://blog.chengyunfeng.com/?p=960 

 本文由用戶 ygdg3049 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!