RxJava 教程第三部分:馴服數據流之副作用

T-top 8年前發布 | 12K 次閱讀 RxJava Java開發

前面兩部分,我們學習到了如何創建 Observable以及如何從 Observable 中獲取數據。本部分將介紹一些更高級的用法,以及一些在大型項目中的最佳實踐。

Side effects(副作用)

沒有副作用的函數通過參數和返回值來程序中的其他函數相互調用。當一個函數中的操作會影響其他函數中的返回結果時,我們稱該函數有副作用。寫數據到磁盤、記錄日志、打印調試信息都是常見的副作用表現。Java 中在一個函數里面修改另外一個函數使用的對象的值是合法的。

副作用有時候很有用也有必要使用。但是有時候也有很多坑。 Rx 開發者應避免非必要的副作用,如果必須使用副作用的時候,則應該寫下詳細的說明文檔。

副作用的問題

Functional programming 通常避免副作用。帶有副作用的函數(尤其是可以修改參數狀態的)要求開發者了解跟多實現的細節。增加了函數的復雜度并且導致函數被錯誤理解和使用,并且難以維護。副作用有故意的和無意的。可以通過封裝或者使用不可變對象來避免副作用。有一些明智的封裝規則可以顯著的提高你 Rx 代碼的可維護性。

我們使用一個帶有副作用的示例來演示。 Java 中不可以在 Lambda 或者 匿名函數中引用外層的非 final 變量。 但是 Java 中的 final 變量只是保證了該編譯引用的對象地址不變,但是對象本身的狀態還是可以改變的。例如,下面是一個用來計數的一個類:

class Inc {

    private int count = 0;

    public void inc() {

        count++;

    }

    public int getCount() {

        return count;

    }

}


 

即使是一個 final 的 Inc 變量,還是可以通過調用其函數來修改他的狀態。 注意 Java 并沒有強制顯式使用 final ,如果在你 Lambda 表達式中修改外層變量的引用對象地址(把外層變量重新復制為其他對象),則會出錯。

Observable<String> values = Observable.just("請", "不要", "有", "副作用");



Inc index = new Inc();

Observable<String> indexed =

        values.map(w -> {

            index.inc();

            return w;

        });

indexed.subscribe(w -> System.out.println(index.getCount() + ": " + w));

結果:

1: 請

2: 不要

3: 有

4: 副作用

目前還來看不出來問題。但是如果我們在該 Observable 上再次訂閱一個 subscriber,則問題就出來了。

Observable<String> values = Observable.just("請", "不要", "有", "副作用");



Inc index = new Inc();

Observable<String> indexed =

        values.map(w -> {

            index.inc();

            return w;

        });

indexed.subscribe(w -> System.out.println("1st observer: " + index.getCount() + ": " + w));

indexed.subscribe(w -> System.out.println("2nd observer: " + index.getCount() + ": " + w));

結果:

1st observer: 1: 請

1st observer: 2: 不要

1st observer: 3: 有

1st observer: 4: 副作用

2nd observer: 5: 請

2nd observer: 6: 不要

2nd observer: 7: 有

2nd observer: 8: 副作用


第二個 Subscriber 的索引是從 5 開始的。這明顯不是我們想要的結果。這里的副作用很容易發現,但是真實應用中的副作用有些很難發現。

在數據流中組織數據

可以通過 scan 函數來計算每個數據的發射順序:

class Indexed <T> {

    public final int index;

    public final T item;

    public Indexed(int index, T item) {

        this.index = index;

        this.item = item;

    }

}
Observable<String> values = Observable.just("No", "side", "effects", "please");



Observable<Indexed<String>> indexed =

    values.scan(

            new Indexed<String>(0, null),

            (prev,v) -> new Indexed<String>(prev.index+1, v))

        .skip(1);

indexed.subscribe(w -> System.out.println("1st observer: " + w.index + ": " + w.item));

indexed.subscribe(w -> System.out.println("2nd observer: " + w.index + ": " + w.item));

結果:

1st observer: 1: No

1st observer: 2: side

1st observer: 3: effects

1st observer: 4: please

2nd observer: 1: No

2nd observer: 2: side

2nd observer: 3: effects

2nd observer: 4: please

上面的結果為正確的。 我們把兩個 Subscriber 共享的屬性給刪除了,這樣他們就沒法相互影響了。

do

像記錄日志這樣的情況是需要副作用的。subscribe 總是有副作用,否則的話這個函數就沒啥用了。雖然可以在 subscriber 中記錄日志信息,但是這樣做有缺點:
1. 在核心業務代碼中混合了不太重要的日志代碼
2. 如果想記錄數據流中數據的中間狀態,比如 執行某個操作之前和之后,則需要一個額外的 Subscriber 來實現。這樣可能會導致最終 Subscriber 和 日志 Subscriber 看到的狀態是不一樣的。

下面的這些函數讓我們可以更加簡潔的實現需要的功能:

public final Observable<T> doOnCompleted(Action0 onCompleted)

public final Observable<T> doOnEach(Action1<Notification<? super T>> onNotification)

public final Observable<T> doOnEach(Observer<? super T> observer)

public final Observable<T> doOnError(Action1<java.lang.Throwable> onError)

public final Observable<T> doOnNext(Action1<? super T> onNext)

public final Observable<T> doOnTerminate(Action0 onTerminate)

這些函數在 Observable 每次事件發生的時候執行,并且返回 Observable。 這些函數明確的表明了他們有副作用,使用起來更加不易混淆:

Observable<String> values = Observable.just("side", "effects");



values

    .doOnEach(new PrintSubscriber("Log"))

    .map(s -> s.toUpperCase())

    .subscribe(new PrintSubscriber("Process"));

結果:

Log: side

Process: SIDE

Log: effects

Process: EFFECTS

Log: Completed

Process: Completed


 

這里使用了上一章使用的幫助類 PrintSubscriber 。這些 do 開頭的函數并不影響最終的 Subscriber。 例如:

static Observable<String> service() {

    return  Observable.just("First", "Second", "Third")

            .doOnEach(new PrintSubscriber("Log"));

}

可以這樣使用該函數:

service()

    .map(s -> s.toUpperCase())

    .filter(s -> s.length() > 5)

    .subscribe(new PrintSubscriber("Process"));


結果:

Log: First

Log: Second

Process: SECOND

Log: Third

Log: Completed

Process: Completed


即便最終使用的時候過濾了一些數據,但是我們記錄了服務器返回的所有結果。

這些函數中 doOnTerminate 在 Observable 結束發射數據之前發生。不管是因為 onCompleted 還是 onError 導致數據流結束。 另外還有一個 finallyDo 函數在 Observable 結束發射之后發生。

doOnSubscribe, doOnUnsubscribe

public final Observable<T> doOnSubscribe(Action0 subscribe)

public final Observable<T> doOnUnsubscribe(Action0 unsubscribe)


Subscription 和 unsubscription 并不是 Observable 發射的事件。而是 該 Observable 被 Observer 訂閱和取消訂閱的事件。

ReplaySubject<Integer> subject = ReplaySubject.create();

Observable<Integer> values = subject

    .doOnSubscribe(() -> System.out.println("New subscription"))

    .doOnUnsubscribe(() -> System.out.println("Subscription over"));



Subscription s1 = values.subscribe(new PrintSubscriber("1st"));

subject.onNext(0);

Subscription s2 = values.subscribe(new PrintSubscriber("2st"));

subject.onNext(1);

s1.unsubscribe();

subject.onNext(2);

subject.onNext(3);

subject.onCompleted();

結果:

New subscription

1st: 0

New subscription

2st: 0

1st: 1

2st: 1

Subscription over

2st: 2

2st: 3

2st: Completed

Subscription over

使用 AsObservable 函數來封裝

Rx 使用面向對象的 Java 語言來實現 functional programming 風格編碼。 需要注意 面向對象中的問題。 例如下面一個天真版的返回 observable 的服務:

public class BrakeableService {

    public BehaviorSubject<String> items = BehaviorSubject.create("Greet");

    public void play() {

        items.onNext("Hello");

        items.onNext("and");

        items.onNext("goodbye");

    }

}

上面的實現中, 調用者可以自己修改 items 引用的對象,也可以修改 Observable 發射的數據。所以需要對調用者隱藏 Subject 接口,只暴露 Observable 接口:

public class BrakeableService {

    private final BehaviorSubject<String> items = BehaviorSubject.create("Greet");



    public Observable<String> getValues() {

        return items;

    }



    public void play() {

        items.onNext("Hello");

        items.onNext("and");

        items.onNext("goodbye");

    }

}

上面的改進版本,看起來我們返回的是一個 Observable,但該返回的對象是不安全的,返回的其實是一個 Subject。

asObservable

由于 Observable 是不可變的,所以 asObservable 函數是為了把一個 Observable 對象包裝起來并安全的分享給其他人使用。

public Observable<String> getValues() {

    return items.asObservable();

}

這樣的話,我們的 Subject 對象就被合理的保護起來了。這樣其他惡意人員也無法修改你的 Observable 返回的數據了,在使用的過程中也可以避免出現錯誤了。

無法保護可變對象

在 RxJava 中, Rx 傳遞的是對象引用 而不是 對象的副本。在一個 地方修改了對象,在傳遞路徑的其他地方上也是可見的。例如下面一個可變的對象:

class Data {

    public int id;

    public String name;

    public Data(int id, String name) {

        this.id = id;

        this.name = name;

    }

}

使用該對象的一個 Observable 和兩個 Subscriber:

Observable<Data> data = Observable.just(

    new Data(1, "Microsoft"),

    new Data(2, "Netflix")

);



data.subscribe(d -> d.name = "Garbage");

data.subscribe(d -> System.out.println(d.id + ": " + d.name));


結果:

1: Garbage

2: Garbage

第一個 Subscriber 先處理每個數據,在第一個 Subscriber 完成后第二個 Subscriber 開始處理數據,由于 Observable 在傳遞路徑中使用的是對象引用,所以 第一個 Subscriber 中對對象做的修改,第二個 Subscriber 也會看到。



來自:http://blog.chengyunfeng.com/?p=968

 本文由用戶 T-top 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!