RxJava 教程第三部分:馴服數據流之副作用
前面兩部分,我們學習到了如何創建 Observable以及如何從 Observable 中獲取數據。本部分將介紹一些更高級的用法,以及一些在大型項目中的最佳實踐。
Side effects(副作用)
沒有副作用的函數通過參數和返回值來程序中的其他函數相互調用。當一個函數中的操作會影響其他函數中的返回結果時,我們稱該函數有副作用。寫數據到磁盤、記錄日志、打印調試信息都是常見的副作用表現。Java 中在一個函數里面修改另外一個函數使用的對象的值是合法的。
副作用有時候很有用也有必要使用。但是有時候也有很多坑。 Rx 開發者應避免非必要的副作用,如果必須使用副作用的時候,則應該寫下詳細的說明文檔。
副作用的問題
Functional programming 通常避免副作用。帶有副作用的函數(尤其是可以修改參數狀態的)要求開發者了解跟多實現的細節。增加了函數的復雜度并且導致函數被錯誤理解和使用,并且難以維護。副作用有故意的和無意的。可以通過封裝或者使用不可變對象來避免副作用。有一些明智的封裝規則可以顯著的提高你 Rx 代碼的可維護性。
我們使用一個帶有副作用的示例來演示。 Java 中不可以在 Lambda 或者 匿名函數中引用外層的非 final 變量。 但是 Java 中的 final 變量只是保證了該編譯引用的對象地址不變,但是對象本身的狀態還是可以改變的。例如,下面是一個用來計數的一個類:
class Inc {
private int count = 0;
public void inc() {
count++;
}
public int getCount() {
return count;
}
}
即使是一個 final 的 Inc 變量,還是可以通過調用其函數來修改他的狀態。 注意 Java 并沒有強制顯式使用 final ,如果在你 Lambda 表達式中修改外層變量的引用對象地址(把外層變量重新復制為其他對象),則會出錯。
Observable<String> values = Observable.just("請", "不要", "有", "副作用");
Inc index = new Inc();
Observable<String> indexed =
values.map(w -> {
index.inc();
return w;
});
indexed.subscribe(w -> System.out.println(index.getCount() + ": " + w));
結果:
1: 請
2: 不要
3: 有
4: 副作用
目前還來看不出來問題。但是如果我們在該 Observable 上再次訂閱一個 subscriber,則問題就出來了。
Observable<String> values = Observable.just("請", "不要", "有", "副作用");
Inc index = new Inc();
Observable<String> indexed =
values.map(w -> {
index.inc();
return w;
});
indexed.subscribe(w -> System.out.println("1st observer: " + index.getCount() + ": " + w));
indexed.subscribe(w -> System.out.println("2nd observer: " + index.getCount() + ": " + w));
結果:
1st observer: 1: 請
1st observer: 2: 不要
1st observer: 3: 有
1st observer: 4: 副作用
2nd observer: 5: 請
2nd observer: 6: 不要
2nd observer: 7: 有
2nd observer: 8: 副作用
第二個 Subscriber 的索引是從 5 開始的。這明顯不是我們想要的結果。這里的副作用很容易發現,但是真實應用中的副作用有些很難發現。
在數據流中組織數據
可以通過 scan 函數來計算每個數據的發射順序:
class Indexed <T> {
public final int index;
public final T item;
public Indexed(int index, T item) {
this.index = index;
this.item = item;
}
}
Observable<String> values = Observable.just("No", "side", "effects", "please");
Observable<Indexed<String>> indexed =
values.scan(
new Indexed<String>(0, null),
(prev,v) -> new Indexed<String>(prev.index+1, v))
.skip(1);
indexed.subscribe(w -> System.out.println("1st observer: " + w.index + ": " + w.item));
indexed.subscribe(w -> System.out.println("2nd observer: " + w.index + ": " + w.item));
結果:
1st observer: 1: No
1st observer: 2: side
1st observer: 3: effects
1st observer: 4: please
2nd observer: 1: No
2nd observer: 2: side
2nd observer: 3: effects
2nd observer: 4: please
上面的結果為正確的。 我們把兩個 Subscriber 共享的屬性給刪除了,這樣他們就沒法相互影響了。
do
像記錄日志這樣的情況是需要副作用的。subscribe 總是有副作用,否則的話這個函數就沒啥用了。雖然可以在 subscriber 中記錄日志信息,但是這樣做有缺點:
1. 在核心業務代碼中混合了不太重要的日志代碼
2. 如果想記錄數據流中數據的中間狀態,比如 執行某個操作之前和之后,則需要一個額外的 Subscriber 來實現。這樣可能會導致最終 Subscriber 和 日志 Subscriber 看到的狀態是不一樣的。
下面的這些函數讓我們可以更加簡潔的實現需要的功能:
public final Observable<T> doOnCompleted(Action0 onCompleted)
public final Observable<T> doOnEach(Action1<Notification<? super T>> onNotification)
public final Observable<T> doOnEach(Observer<? super T> observer)
public final Observable<T> doOnError(Action1<java.lang.Throwable> onError)
public final Observable<T> doOnNext(Action1<? super T> onNext)
public final Observable<T> doOnTerminate(Action0 onTerminate)
這些函數在 Observable 每次事件發生的時候執行,并且返回 Observable。 這些函數明確的表明了他們有副作用,使用起來更加不易混淆:
Observable<String> values = Observable.just("side", "effects");
values
.doOnEach(new PrintSubscriber("Log"))
.map(s -> s.toUpperCase())
.subscribe(new PrintSubscriber("Process"));
結果:
Log: side
Process: SIDE
Log: effects
Process: EFFECTS
Log: Completed
Process: Completed
這里使用了上一章使用的幫助類 PrintSubscriber 。這些 do 開頭的函數并不影響最終的 Subscriber。 例如:
static Observable<String> service() {
return Observable.just("First", "Second", "Third")
.doOnEach(new PrintSubscriber("Log"));
}
可以這樣使用該函數:
service()
.map(s -> s.toUpperCase())
.filter(s -> s.length() > 5)
.subscribe(new PrintSubscriber("Process"));
結果:
Log: First
Log: Second
Process: SECOND
Log: Third
Log: Completed
Process: Completed
即便最終使用的時候過濾了一些數據,但是我們記錄了服務器返回的所有結果。
這些函數中 doOnTerminate 在 Observable 結束發射數據之前發生。不管是因為 onCompleted 還是 onError 導致數據流結束。 另外還有一個 finallyDo 函數在 Observable 結束發射之后發生。
doOnSubscribe, doOnUnsubscribe
public final Observable<T> doOnSubscribe(Action0 subscribe)
public final Observable<T> doOnUnsubscribe(Action0 unsubscribe)
Subscription 和 unsubscription 并不是 Observable 發射的事件。而是 該 Observable 被 Observer 訂閱和取消訂閱的事件。
ReplaySubject<Integer> subject = ReplaySubject.create();
Observable<Integer> values = subject
.doOnSubscribe(() -> System.out.println("New subscription"))
.doOnUnsubscribe(() -> System.out.println("Subscription over"));
Subscription s1 = values.subscribe(new PrintSubscriber("1st"));
subject.onNext(0);
Subscription s2 = values.subscribe(new PrintSubscriber("2st"));
subject.onNext(1);
s1.unsubscribe();
subject.onNext(2);
subject.onNext(3);
subject.onCompleted();
結果:
New subscription
1st: 0
New subscription
2st: 0
1st: 1
2st: 1
Subscription over
2st: 2
2st: 3
2st: Completed
Subscription over
使用 AsObservable 函數來封裝
Rx 使用面向對象的 Java 語言來實現 functional programming 風格編碼。 需要注意 面向對象中的問題。 例如下面一個天真版的返回 observable 的服務:
public class BrakeableService {
public BehaviorSubject<String> items = BehaviorSubject.create("Greet");
public void play() {
items.onNext("Hello");
items.onNext("and");
items.onNext("goodbye");
}
}
上面的實現中, 調用者可以自己修改 items 引用的對象,也可以修改 Observable 發射的數據。所以需要對調用者隱藏 Subject 接口,只暴露 Observable 接口:
public class BrakeableService {
private final BehaviorSubject<String> items = BehaviorSubject.create("Greet");
public Observable<String> getValues() {
return items;
}
public void play() {
items.onNext("Hello");
items.onNext("and");
items.onNext("goodbye");
}
}
上面的改進版本,看起來我們返回的是一個 Observable,但該返回的對象是不安全的,返回的其實是一個 Subject。
asObservable
由于 Observable 是不可變的,所以 asObservable 函數是為了把一個 Observable 對象包裝起來并安全的分享給其他人使用。
public Observable<String> getValues() {
return items.asObservable();
}
這樣的話,我們的 Subject 對象就被合理的保護起來了。這樣其他惡意人員也無法修改你的 Observable 返回的數據了,在使用的過程中也可以避免出現錯誤了。
無法保護可變對象
在 RxJava 中, Rx 傳遞的是對象引用 而不是 對象的副本。在一個 地方修改了對象,在傳遞路徑的其他地方上也是可見的。例如下面一個可變的對象:
class Data {
public int id;
public String name;
public Data(int id, String name) {
this.id = id;
this.name = name;
}
}
使用該對象的一個 Observable 和兩個 Subscriber:
Observable<Data> data = Observable.just(
new Data(1, "Microsoft"),
new Data(2, "Netflix")
);
data.subscribe(d -> d.name = "Garbage");
data.subscribe(d -> System.out.println(d.id + ": " + d.name));
結果:
1: Garbage
2: Garbage
第一個 Subscriber 先處理每個數據,在第一個 Subscriber 完成后第二個 Subscriber 開始處理數據,由于 Observable 在傳遞路徑中使用的是對象引用,所以 第一個 Subscriber 中對對象做的修改,第二個 Subscriber 也會看到。
來自:http://blog.chengyunfeng.com/?p=968