RxJava 教程第三部分:馴服數據流之自定義操作函數

ythtgw5326 8年前發布 | 12K 次閱讀 RxJava Java開發

RxJava 提供了很多 操作函數 。加上各種重載函數,一共有 300 多個操作函數。這些函數中只有很少一部分是核心的操作函數,離開這些核心的函數根本就沒法使用 RxJava 了。其他的大部分函數只是一些便捷函數,方便開發者使用,并且他們的名字基本都說明了他們的用法。比如 如果操作函數 source.First(user -> user.isOnline()) 不存在,則我們依然可以使用 source.filter(user -> user.isOnline()).First() 來實現同樣的功能。

盡管提供了 300 多個操作函數,但這些也都是很基本的操作。 Rx 提供了基礎的功能,在此之上可以建構更加復雜的功能。最終你會遇到定義可重用代碼的地方。 在 標準 Java 中重用代碼是通過類和函數來實現,而在 Rx 中,是通過自定義操作函數來實現代碼重用。例如,在您的程序中,計算數字流的平均數可能經常使用。但是 Observable 中并沒有該函數,你可以自定義一個:

class AverageAcc {
    public final int sum;
    public final int count;
    public AverageAcc(int sum, int count) {
        this.sum = sum;
        this.count = count;
    }
}
 
source
    .scan(
        new AverageAcc(0,0),
        (acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
    .filter(acc -> acc.count > 0)
    .map(acc -> acc.sum/(double)acc.count);
 

上面的代碼實現了功能,但是沒法重用。在標準的 Java 中可能會定義一個可以處理各種數據的函數,所以 一般的 Java 開發者可能一開始想到用一個函數來實現:

public static Observable<Double> runningAverage(Observable<Integer> source) {
    return source
        .scan(
            new AverageAcc(0,0),
            (acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
        .filter(acc -> acc.count > 0)
        .map(acc -> acc.sum/(double)acc.count);
}
 

然后就可以重用了:

runningAverage(Observable.just(3, 5, 6, 4, 4))
    .subscribe(System.out::println);
 

結果:

3.0
4.0
4.666666666666667
4.5
4.4
 

由于上面的代碼很簡單,所以看起來還不錯。如果我們用自定義的操作函數做一些復雜的操作。例如,源 Observable 為一個句子,把這個句子分割沒每個單詞,并且把每個單詞的長度作為數字的輸入:

runningAverage(
    Observable.just("The brown fox jumped and I forget the rest")
        .flatMap(phrase -> Observable.from(phrase.split(" ")))
        .map(word -> word.length()))
    .subscribe(System.out::println);
 

上面的代碼可以正常使用,但是看起來不是純 Rx 實現。如果每個 Rx 中的函數都是這樣實現的,則最終多個操作函數一起使用就變成這樣了:

subscribe(
    lastOperator(
        middleOperator(
            firstOperator(source))))
 

這樣我們在倒著處理數據流! (^o^)/~

把操作函數串聯起來

Rx 中操作函數是通過串聯調用的方式來使用的,而不是嵌套調用。這種用法在 Java 中也很常見,每個函數都返回該對象本身,這樣就可以一直調用多個函數。例如 strings 對象:

String s = new String("Hi").toLowerCase().replace('a', 'c');
 

通過這種方式,可以直觀的看到對數據修改的順序,如果用了多個操作函數看起來也更加簡潔。

理想情況應該讓你的自定義操作函數和標準的操作函數一樣,可以串聯的調用。

Observable.range(0,10)
    .map(i -> i*2)
    .myOperator()
    .subscribe();
 

很多語言都直接支持該特性。但是 Java 并不直接支持。你不得不修改 Observable 的代碼來添加你的操作函數。但是你沒法告訴 RxJava 開發團隊,讓他們把你專用的操作函數給添加到 RxJava 標準類庫中。雖然可以通過繼承 Observable 的方式來添加你的操作函數,但是這樣做也沒法和標準的操作函數組合使用了。

compose

RxJava 提供了 compose 函數可以解決該問題。

public <R> Observable<R> compose(Observable.Transformer<? super T,? extends R> transformer)
 

一個 Transformer 接口。Transformer<T,R> 接口其實只是 Func1<Observable ,Observable > 接口的另外一種簡化形式。這是一個函數,把參數 Observable 轉換為 Observable , 和我們計算平均數的實現是一樣的:

Observable.just(3, 5, 6, 4, 4)
    .compose(Main::runningAverage)
    .subscribe(System.out::println);
 

在 Java 中沒法直接引用函數的名字,上面示例中,我們假設自定義的操作函數在 Main 類中定義。這樣自定義的操作函數就融合到串聯調用中了,只不過需要先調用 compose 函數。通過在新的類中實現 Observable.Transformer 接口可以實現更好的封裝:

public class RunningAverageimplements Observable.Transformer<Integer, Double> {
    private static class AverageAcc {
        public final int sum;
        public final int count;
        public AverageAcc(int sum, int count) {
            this.sum = sum;
            this.count = count;
        }
    }
 
    @Override
    public Observable<Double> call(Observable<Integer> source) {
        return source
            .scan(
                new AverageAcc(0,0),
                (acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
            .filter(acc -> acc.count > 0)
            .map(acc -> acc.sum/(double)acc.count);
    }
}
 

然后可以這樣使用:

source.compose(new RunningAverage())
 

大部分的 Rx 操作函數都是有參數的,我們也可以支持參數。比如:

public class RunningAverageimplements Observable.Transformer<Integer, Double> {
    private static class AverageAcc {
        public final int sum;
        public final int count;
        public AverageAcc(int sum, int count) {
            this.sum = sum;
            this.count = count;
        }
    }
 
    final int threshold;
 
    public RunningAverage() {
        this.threshold = Integer.MAX_VALUE;
    }
 
    public RunningAverage(int threshold) {
        this.threshold = threshold;
    }
 
    @Override
    public Observable<Double> call(Observable<Integer> source) {
        return source
            .filter(i -> i< this.threshold)
            .scan(
                new AverageAcc(0,0),
                (acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
            .filter(acc -> acc.count > 0)
            .map(acc -> acc.sum/(double)acc.count);
    }
}
 

這樣我們就可以調用 source.compose(new RunningAverage(5)) 了。由于 Java 語言的限制,我們沒法進一步優化這個使用情況了。 這里有一個更加復雜的自定義操作函數的示例

lift

一般而言,Rx 操作函數都做三件事:

1. 訂閱到源 Observable上并觀察他們發生的數據

2. 根據操作函數的目的來轉換數據流

3. 通過調用 onNext、 onError 和 onCompleted 函數 把轉換后的數據發射給自己的訂閱者。

compose 的參數為一個函數,該函數把一個 Observable 轉換為另外一個 Observable。并且需要手工的完成上面3步操作。并且假設你可以使用已有的操作函數完成轉換。如果沒有對應的操作函數,則需要使用傳統的面向對象的方式來處理。這樣你需要從數據流中提取轉換數據后重新發射出去。Observable.Transformer 通過訂閱到源 Observable 上來實現這個功能。

自定義多個操作函數以后,你會發現,很多模板代碼每次都需要編寫,如果進入底層代碼的話,有些模板代碼可以省略。 lift 操作函數和 compose 類似, 區別是 轉換的是一個 Subscriber 對象,而不是 Observable。

public final <R> Observable<R> lift(Observable.Operator<? extends R,? super T> lift)
 

Observable.Operator<R,T> 是 Func1<Subscriber<? super R>,Subscriber<? super T>> 的變體, 是一個函數用來把一個 Subscriber 轉換為 Subscriber 。直接和 Subscriber 打交道可以避免訪問 Observable。 lift 函數自動創建 Observable 并訂閱。

如果你研究一下這個函數,可以發現好像這個函數是倒著聲明的:為了把 Observable 轉換為 Observable ,需要一個函數把 Subscriber 轉換為 Subscriber 。 為什么會這樣呢? 還記得一個訂閱者在串聯調用的末尾訂閱的,然后傳遞給源 Observable。也就是說, Subscription 是倒著操作的。每個操作函數收到一個 Subscription,并使用這個 Subscription 來創建一個新的 Subscription 來處理這個操作。

下面的示例中,重新自定義實現 map 操作函數:

class MyMap<T,R> implements Observable.Operator<R, T> {
 
    private Func1<T,R> transformer;
 
    public MyMap(Func1<T,R> transformer) {
        this.transformer = transformer;
    }
 
    @Override
    public Subscriber<? super T> call(Subscriber<? super R> subscriber) {
        return new Subscriber<T>() {
 
            @Override
            public void onCompleted() {
                if (!subscriber.isUnsubscribed())
                    subscriber.onCompleted();
            }
 
            @Override
            public void onError(Throwable e) {
                if (!subscriber.isUnsubscribed())
                    subscriber.onError(e);
            }
 
            @Override
            public void onNext(T t) {
                if (!subscriber.isUnsubscribed())
                    subscriber.onNext(transformer.call(t));
            }
 
        };
    }
}
 

map 操作函數需要一個參數把 T 轉換為 R。上面 的實現中,transformer 干了這件事。關鍵點在于 call 函數的調用。我們收到了一個 Subscriber 對象,該對象需要一個 R 類型數據。我們為個 Subscriber 創建了一個Subscriber 對象,并把 T 轉換為 R 類型數據然后發射給 Subscriber 。 lift 操作函數處理接受 Subscriber 的模板代碼,并且使用 Subscriber 訂閱到源 Observable上。

使用 Observable.Operator 和使用 Observable.Transformer 一樣簡單:

Observable.range(0, 5)
    .lift(new MyMap<Integer, String>(i -> i + "!"))
    .subscribe(System.out::println);
 

結果:

0!
1!
2!
3!
4!
 

Java 構造函數無法推倒類型,所以可以用一個靜態函數來實現該功能:

public static <T,R> MyMap<T,R> create(Func1<T,R> transformer) {
    return new MyMap<T,R>(transformer);
}
 

然后這樣使用:

Observable.range(0, 5)
    .lift(MyMap.create(i -> i + "!"))
    .subscribe(System.out::println);
 

就像實現 Observable.Operator 中手動把數據發射給 Subscriber 一樣,需要考慮如下情況:

– Subscriber 可以隨時取消訂閱,所以需要檢查是否還在訂閱著,如果取消訂閱了則不發射數據

– 你需要遵守 Rx 的約定,調用 onNext 發射數據,依 onCompleted 或者 onError 來結束數據流

– 如果需要異步處理數據或者調度,則需要使用 Rx 的 Schedulers 。這樣你的操作函數將是 可測試 的。

serialize

如果你無法確保自定義的操作符符合 Rx 的約定,例如從多個源異步獲取數據,則可以使用 serialize 操作函數。 serialize 可以把一個不符合約定的 Observable 轉換為一個符合約定的 Observable。

下面創建一個不符合約定的 Observable,并且訂閱到該 Observable上:

Observable<Integer> source = Observable.create(o -> {
    o.onNext(1);
    o.onNext(2);
    o.onCompleted();
    o.onNext(3);
    o.onCompleted();
});
 
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
    .subscribe(
        System.out::println,
        System.out::println,
        () -> System.out.println("Completed"));
 

結果:

1
2
Completed
Unsubscribed
 

先不管上面的 Observable 發射的數據,訂閱結束的情況看起來符合 Rx 約定。 這是由于 subscribe 認為當前數據流結束的時候會主動結束這個 Subscription。但這并不意味著總是這樣的。 還有一個函數為 unsafeSubscribe ,該函數不會自動取消訂閱。

Observable<Integer> source = Observable.create(o -> {
    o.onNext(1);
    o.onNext(2);
    o.onCompleted();
    o.onNext(3);
    o.onCompleted();
});
 
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
    .unsafeSubscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("Completed");
        }
 
        @Override
        public void onError(Throwable e) {
            System.out.println(e);
        }
 
        @Override
        public void onNext(Integer t) {
            System.out.println(t);
        }
});
 

結果:

1
2
Completed
3
Completed
 

上面的示例最后就沒有打印 Unsubscribed 字符串。

unsafeSubscribe 也不能很好的處理錯誤情況。所以該函數幾乎沒用。在文檔中說:該函數應該僅僅在自定義操作函數中處理嵌套訂閱的情況。 為了避免這種操作函數接受到不合法的數據流,我們可以在其上應用 serialize 操作函數:

Observable<Integer> source = Observable.create(o -> {
        o.onNext(1);
        o.onNext(2);
        o.onCompleted();
        o.onNext(3);
        o.onCompleted();
    })
    .cast(Integer.class)
    .serialize();;
 
 
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
    .unsafeSubscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("Completed");
        }
 
        @Override
        public void onError(Throwable e) {
            System.out.println(e);
        }
 
        @Override
        public void onNext(Integer t) {
            System.out.println(t);
        }
});
 

結果:

1
2
Completed
 

盡管上面的代碼中沒有調用unsubscribe, 但是數據流事件依然符合約定。最后也收到了完成事件。

lift 函數的額外好處

標準的操作函數也用 lift 實現的,如果你的自定義操作函數也通過 lift 實現,則 lift 在運行的時候就變成了一個 hot 函數, JVM 在運行的時候會優化該函數的調用,性能會有所提升。

在 lift 和 compose 之間做選擇

lift 和 compose 都是元操作符(meta-operators),用來把自定義的操作函數注射到串聯調用中。這兩種情況下,自定義操作符既可以用函數實現也可以用類實現:

– compose: Observable.Transformer 或者 Func<Observable , Observable

>

– lift: Observable.Operator 或者 Func<Subscriber , Subscriber >

 

理論上,每個操作函數都可以實現 Observable.Operator 和 Observable.Transformer。如果選擇是根據使用的便捷性和你想避免什么樣的模板代碼:

– 如果自定義的操作函數只是現有的操作函數的組合,則使用 compose 比較自然

– 如果自定義從操作函數需要從數據流中獲取數據,并做一些處理后再次發射數據到數據流,則使用 lift 比較好。

 

來自: http://blog.chengyunfeng.com/?p=976

 本文由用戶 ythtgw5326 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!