RxJava 教程第三部分:馴服數據流之自定義操作函數
RxJava 提供了很多 操作函數 。加上各種重載函數,一共有 300 多個操作函數。這些函數中只有很少一部分是核心的操作函數,離開這些核心的函數根本就沒法使用 RxJava 了。其他的大部分函數只是一些便捷函數,方便開發者使用,并且他們的名字基本都說明了他們的用法。比如 如果操作函數 source.First(user -> user.isOnline()) 不存在,則我們依然可以使用 source.filter(user -> user.isOnline()).First() 來實現同樣的功能。
盡管提供了 300 多個操作函數,但這些也都是很基本的操作。 Rx 提供了基礎的功能,在此之上可以建構更加復雜的功能。最終你會遇到定義可重用代碼的地方。 在 標準 Java 中重用代碼是通過類和函數來實現,而在 Rx 中,是通過自定義操作函數來實現代碼重用。例如,在您的程序中,計算數字流的平均數可能經常使用。但是 Observable 中并沒有該函數,你可以自定義一個:
class AverageAcc {
public final int sum;
public final int count;
public AverageAcc(int sum, int count) {
this.sum = sum;
this.count = count;
}
}
source
.scan(
new AverageAcc(0,0),
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
.filter(acc -> acc.count > 0)
.map(acc -> acc.sum/(double)acc.count);
上面的代碼實現了功能,但是沒法重用。在標準的 Java 中可能會定義一個可以處理各種數據的函數,所以 一般的 Java 開發者可能一開始想到用一個函數來實現:
public static Observable<Double> runningAverage(Observable<Integer> source) {
return source
.scan(
new AverageAcc(0,0),
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
.filter(acc -> acc.count > 0)
.map(acc -> acc.sum/(double)acc.count);
}
然后就可以重用了:
runningAverage(Observable.just(3, 5, 6, 4, 4))
.subscribe(System.out::println);
結果:
3.0
4.0
4.666666666666667
4.5
4.4
由于上面的代碼很簡單,所以看起來還不錯。如果我們用自定義的操作函數做一些復雜的操作。例如,源 Observable 為一個句子,把這個句子分割沒每個單詞,并且把每個單詞的長度作為數字的輸入:
runningAverage(
Observable.just("The brown fox jumped and I forget the rest")
.flatMap(phrase -> Observable.from(phrase.split(" ")))
.map(word -> word.length()))
.subscribe(System.out::println);
上面的代碼可以正常使用,但是看起來不是純 Rx 實現。如果每個 Rx 中的函數都是這樣實現的,則最終多個操作函數一起使用就變成這樣了:
subscribe(
lastOperator(
middleOperator(
firstOperator(source))))
這樣我們在倒著處理數據流! (^o^)/~
把操作函數串聯起來
Rx 中操作函數是通過串聯調用的方式來使用的,而不是嵌套調用。這種用法在 Java 中也很常見,每個函數都返回該對象本身,這樣就可以一直調用多個函數。例如 strings 對象:
String s = new String("Hi").toLowerCase().replace('a', 'c');
通過這種方式,可以直觀的看到對數據修改的順序,如果用了多個操作函數看起來也更加簡潔。
理想情況應該讓你的自定義操作函數和標準的操作函數一樣,可以串聯的調用。
Observable.range(0,10)
.map(i -> i*2)
.myOperator()
.subscribe();
很多語言都直接支持該特性。但是 Java 并不直接支持。你不得不修改 Observable 的代碼來添加你的操作函數。但是你沒法告訴 RxJava 開發團隊,讓他們把你專用的操作函數給添加到 RxJava 標準類庫中。雖然可以通過繼承 Observable 的方式來添加你的操作函數,但是這樣做也沒法和標準的操作函數組合使用了。
compose
RxJava 提供了 compose 函數可以解決該問題。
public <R> Observable<R> compose(Observable.Transformer<? super T,? extends R> transformer)
一個 Transformer 接口。Transformer<T,R> 接口其實只是 Func1<Observable ,Observable > 接口的另外一種簡化形式。這是一個函數,把參數 Observable 轉換為 Observable , 和我們計算平均數的實現是一樣的:
Observable.just(3, 5, 6, 4, 4)
.compose(Main::runningAverage)
.subscribe(System.out::println);
在 Java 中沒法直接引用函數的名字,上面示例中,我們假設自定義的操作函數在 Main 類中定義。這樣自定義的操作函數就融合到串聯調用中了,只不過需要先調用 compose 函數。通過在新的類中實現 Observable.Transformer 接口可以實現更好的封裝:
public class RunningAverageimplements Observable.Transformer<Integer, Double> {
private static class AverageAcc {
public final int sum;
public final int count;
public AverageAcc(int sum, int count) {
this.sum = sum;
this.count = count;
}
}
@Override
public Observable<Double> call(Observable<Integer> source) {
return source
.scan(
new AverageAcc(0,0),
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
.filter(acc -> acc.count > 0)
.map(acc -> acc.sum/(double)acc.count);
}
}
然后可以這樣使用:
source.compose(new RunningAverage())
大部分的 Rx 操作函數都是有參數的,我們也可以支持參數。比如:
public class RunningAverageimplements Observable.Transformer<Integer, Double> {
private static class AverageAcc {
public final int sum;
public final int count;
public AverageAcc(int sum, int count) {
this.sum = sum;
this.count = count;
}
}
final int threshold;
public RunningAverage() {
this.threshold = Integer.MAX_VALUE;
}
public RunningAverage(int threshold) {
this.threshold = threshold;
}
@Override
public Observable<Double> call(Observable<Integer> source) {
return source
.filter(i -> i< this.threshold)
.scan(
new AverageAcc(0,0),
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
.filter(acc -> acc.count > 0)
.map(acc -> acc.sum/(double)acc.count);
}
}
這樣我們就可以調用 source.compose(new RunningAverage(5)) 了。由于 Java 語言的限制,我們沒法進一步優化這個使用情況了。 這里有一個更加復雜的自定義操作函數的示例 。
lift
一般而言,Rx 操作函數都做三件事:
1. 訂閱到源 Observable上并觀察他們發生的數據
2. 根據操作函數的目的來轉換數據流
3. 通過調用 onNext、 onError 和 onCompleted 函數 把轉換后的數據發射給自己的訂閱者。
compose 的參數為一個函數,該函數把一個 Observable 轉換為另外一個 Observable。并且需要手工的完成上面3步操作。并且假設你可以使用已有的操作函數完成轉換。如果沒有對應的操作函數,則需要使用傳統的面向對象的方式來處理。這樣你需要從數據流中提取轉換數據后重新發射出去。Observable.Transformer 通過訂閱到源 Observable 上來實現這個功能。
自定義多個操作函數以后,你會發現,很多模板代碼每次都需要編寫,如果進入底層代碼的話,有些模板代碼可以省略。 lift 操作函數和 compose 類似, 區別是 轉換的是一個 Subscriber 對象,而不是 Observable。
public final <R> Observable<R> lift(Observable.Operator<? extends R,? super T> lift)
Observable.Operator<R,T> 是 Func1<Subscriber<? super R>,Subscriber<? super T>> 的變體, 是一個函數用來把一個 Subscriber 轉換為 Subscriber 。直接和 Subscriber 打交道可以避免訪問 Observable。 lift 函數自動創建 Observable 并訂閱。
如果你研究一下這個函數,可以發現好像這個函數是倒著聲明的:為了把 Observable 轉換為 Observable ,需要一個函數把 Subscriber 轉換為 Subscriber 。 為什么會這樣呢? 還記得一個訂閱者在串聯調用的末尾訂閱的,然后傳遞給源 Observable。也就是說, Subscription 是倒著操作的。每個操作函數收到一個 Subscription,并使用這個 Subscription 來創建一個新的 Subscription 來處理這個操作。
下面的示例中,重新自定義實現 map 操作函數:
class MyMap<T,R> implements Observable.Operator<R, T> {
private Func1<T,R> transformer;
public MyMap(Func1<T,R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(Subscriber<? super R> subscriber) {
return new Subscriber<T>() {
@Override
public void onCompleted() {
if (!subscriber.isUnsubscribed())
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
if (!subscriber.isUnsubscribed())
subscriber.onError(e);
}
@Override
public void onNext(T t) {
if (!subscriber.isUnsubscribed())
subscriber.onNext(transformer.call(t));
}
};
}
}
map 操作函數需要一個參數把 T 轉換為 R。上面 的實現中,transformer 干了這件事。關鍵點在于 call 函數的調用。我們收到了一個 Subscriber 對象,該對象需要一個 R 類型數據。我們為個 Subscriber 創建了一個Subscriber 對象,并把 T 轉換為 R 類型數據然后發射給 Subscriber 。 lift 操作函數處理接受 Subscriber 的模板代碼,并且使用 Subscriber 訂閱到源 Observable上。
使用 Observable.Operator 和使用 Observable.Transformer 一樣簡單:
Observable.range(0, 5)
.lift(new MyMap<Integer, String>(i -> i + "!"))
.subscribe(System.out::println);
結果:
0!
1!
2!
3!
4!
Java 構造函數無法推倒類型,所以可以用一個靜態函數來實現該功能:
public static <T,R> MyMap<T,R> create(Func1<T,R> transformer) {
return new MyMap<T,R>(transformer);
}
然后這樣使用:
Observable.range(0, 5)
.lift(MyMap.create(i -> i + "!"))
.subscribe(System.out::println);
就像實現 Observable.Operator 中手動把數據發射給 Subscriber 一樣,需要考慮如下情況:
– Subscriber 可以隨時取消訂閱,所以需要檢查是否還在訂閱著,如果取消訂閱了則不發射數據
– 你需要遵守 Rx 的約定,調用 onNext 發射數據,依 onCompleted 或者 onError 來結束數據流
– 如果需要異步處理數據或者調度,則需要使用 Rx 的 Schedulers 。這樣你的操作函數將是 可測試 的。
serialize
如果你無法確保自定義的操作符符合 Rx 的約定,例如從多個源異步獲取數據,則可以使用 serialize 操作函數。 serialize 可以把一個不符合約定的 Observable 轉換為一個符合約定的 Observable。
下面創建一個不符合約定的 Observable,并且訂閱到該 Observable上:
Observable<Integer> source = Observable.create(o -> {
o.onNext(1);
o.onNext(2);
o.onCompleted();
o.onNext(3);
o.onCompleted();
});
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
.subscribe(
System.out::println,
System.out::println,
() -> System.out.println("Completed"));
結果:
1
2
Completed
Unsubscribed
先不管上面的 Observable 發射的數據,訂閱結束的情況看起來符合 Rx 約定。 這是由于 subscribe 認為當前數據流結束的時候會主動結束這個 Subscription。但這并不意味著總是這樣的。 還有一個函數為 unsafeSubscribe ,該函數不會自動取消訂閱。
Observable<Integer> source = Observable.create(o -> {
o.onNext(1);
o.onNext(2);
o.onCompleted();
o.onNext(3);
o.onCompleted();
});
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
.unsafeSubscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onNext(Integer t) {
System.out.println(t);
}
});
結果:
1
2
Completed
3
Completed
上面的示例最后就沒有打印 Unsubscribed 字符串。
unsafeSubscribe 也不能很好的處理錯誤情況。所以該函數幾乎沒用。在文檔中說:該函數應該僅僅在自定義操作函數中處理嵌套訂閱的情況。 為了避免這種操作函數接受到不合法的數據流,我們可以在其上應用 serialize 操作函數:
Observable<Integer> source = Observable.create(o -> {
o.onNext(1);
o.onNext(2);
o.onCompleted();
o.onNext(3);
o.onCompleted();
})
.cast(Integer.class)
.serialize();;
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
.unsafeSubscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onNext(Integer t) {
System.out.println(t);
}
});
結果:
1
2
Completed
盡管上面的代碼中沒有調用unsubscribe, 但是數據流事件依然符合約定。最后也收到了完成事件。
lift 函數的額外好處
標準的操作函數也用 lift 實現的,如果你的自定義操作函數也通過 lift 實現,則 lift 在運行的時候就變成了一個 hot 函數, JVM 在運行的時候會優化該函數的調用,性能會有所提升。
在 lift 和 compose 之間做選擇
lift 和 compose 都是元操作符(meta-operators),用來把自定義的操作函數注射到串聯調用中。這兩種情況下,自定義操作符既可以用函數實現也可以用類實現:
– compose: Observable.Transformer 或者 Func<Observable , Observable
>
– lift: Observable.Operator 或者 Func<Subscriber , Subscriber >
理論上,每個操作函數都可以實現 Observable.Operator 和 Observable.Transformer。如果選擇是根據使用的便捷性和你想避免什么樣的模板代碼:
– 如果自定義的操作函數只是現有的操作函數的組合,則使用 compose 比較自然
– 如果自定義從操作函數需要從數據流中獲取數據,并做一些處理后再次發射數據到數據流,則使用 lift 比較好。