RxJava 教程第一部分:入門之 生命周期管理
Rx 背后的理念是:無法知道事件流何時發射數據、也不知何時結束發射,但是你需要控制何時開始和結束接受事件。訂閱者可能使用了一些資源,這些資源需要在停止接收事件的時候釋放。 通過 subscription 可以實現生命周期管理。
Subscribing
Observable.subscribe 有好幾個重載函數,每個函數都是某種情況的簡化形式。
Subscription subscribe() Subscription subscribe(Action1<? super T> onNext) Subscription subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError) Subscription subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0onComplete) Subscription subscribe(Observer<? super T> observer) Subscription subscribe(Subscriber<? super T> subscriber)
第一個 subscribe() 函數只是訂閱事件,但是不去處理這些事件。帶有一個或者多個 Action 參數的,使用這些參數來構造一個 Subscriber 對象。這些參數分別對應 onNext、onError和 onComplete 這三種類型的事件,如果沒有提供則代表不處理這個類型的事件。
下面的示例演示處理 error 的情況:
Subject<Integer, Integer> s = ReplaySubject.create(); s.subscribe( v -> System.out.println(v), e -> System.err.println(e)); s.onNext(0); s.onError(new Exception("Oops"));
結果:
0 java.lang.Exception: Oops
如果你不處理 error,則 在發生錯誤的時候,會拋出 OnErrorNotImplementedException 異常。該異常發生在生產者這邊,上面的示例 生產者和消費者位于同一線程,所以你可以直接 try- catch 住,但是在實際應用中,生產者和消費者通常不再同一個線程,所以最好還是提供一個 錯誤處理函數,否則你不知道錯誤發生了并導致事件流結束了。
Unsubscribing
在事件流結束發射之前,你可以主動停止接收事件。每個 subscribe 函數都會返回一個 Subscription 示例,該示例有兩個函數:
boolean isUnsubscribed() void unsubscribe()
只需要調用 unsubscribe 函數就可以停止接收數據了。
Subject<Integer, Integer> values = ReplaySubject.create(); Subscriptionsubscription = values.subscribe( v -> System.out.println(v), e -> System.err.println(e), () -> System.out.println("Done") ); values.onNext(0); values.onNext(1); subscription.unsubscribe(); values.onNext(2);
結果:
0 1
一個 observer 調用 unsubscribe 取消監聽并不妨礙同一個 observable 上的其他 Observer 對象。
Subject<Integer, Integer> values = ReplaySubject.create(); Subscriptionsubscription1 = values.subscribe( v -> System.out.println("First: " + v) ); Subscriptionsubscription2 = values.subscribe( v -> System.out.println("Second: " + v) ); values.onNext(0); values.onNext(1); subscription1.unsubscribe(); System.out.println("Unsubscribed first"); values.onNext(2);
結果:
First: 0 Second: 0 First: 1 Second: 1 Unsubscribedfirst Second: 2
onError 和 onCompleted
onError 和 onCompleted 意味著結束事件流。observable 需要遵守該規范,在 onError 或者 onCompleted 發生后就不應該再發射事件了。
Subject<Integer, Integer> values = ReplaySubject.create(); Subscriptionsubscription1 = values.subscribe( v -> System.out.println("First: " + v), e -> System.out.println("First: " + e), () -> System.out.println("Completed") ); values.onNext(0); values.onNext(1); values.onCompleted(); values.onNext(2);
結果:
First: 0 First: 1 Completed
釋放資源
Subscription 和其使用的資源綁定在一起。所以你應該記得釋放 Subscription 來釋放資源。使用 Subscriptions 的工廠函數可以把 Subscription 和需要的資源綁定在一起,然后可以使用 unsubscribe 來釋放綁定的資源。
Subscription s = Subscriptions.create(() -> System.out.println("Clean")); s.unsubscribe();
結果:
Clean
Subscriptions.create 函數需要一個 Action 接口類型參數,在 unsubscribe 調用的時候會執行該接口來釋放資源。 也有其他一些函數可以簡化開發:
– Subscriptions.empty() 返回一個當 unsubscribe 的時候 啥也不做的Subscription 。當要求你返回一個 Subscription ,但是你確沒有資源需要釋放,則可以返回這個空的 Subscription。
– Subscriptions.from(Subscription… subscriptions),返回的 Subscription 釋放的時候,會調用所有參數 Subscription 的 unsubscribe 函數。
– Subscriptions.unsubscribed() 返回一個已經釋放過的 Subscription。
Subscription 也有一些標準的實現:
– BooleanSubscription
– CompositeSubscription
– MultipleAssignmentSubscription
– RefCountSubscription
– SafeSubscriber
– Scheduler.Worker
– SerializedSubscriber
– SerialSubscription
– Subscriber
– TestSubscriber
在后面將會看到他們的使用方式。這里注意 Subscriber 同時也實現了 Subscription。所以我們也可以直接用 Subscriber 來取消監聽。