RxJava 教程第一部分:入門之 生命周期管理

Miguel07I 8年前發布 | 11K 次閱讀 RxJava Java開發

 

Rx 背后的理念是:無法知道事件流何時發射數據、也不知何時結束發射,但是你需要控制何時開始和結束接受事件。訂閱者可能使用了一些資源,這些資源需要在停止接收事件的時候釋放。 通過 subscription 可以實現生命周期管理。

Subscribing

Observable.subscribe 有好幾個重載函數,每個函數都是某種情況的簡化形式。

Subscription    subscribe()
Subscription    subscribe(Action1<? super T> onNext)
Subscription    subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError)
Subscription    subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0onComplete)
Subscription    subscribe(Observer<? super T> observer)
Subscription    subscribe(Subscriber<? super T> subscriber)
 

第一個 subscribe() 函數只是訂閱事件,但是不去處理這些事件。帶有一個或者多個 Action 參數的,使用這些參數來構造一個 Subscriber 對象。這些參數分別對應 onNext、onError和 onComplete 這三種類型的事件,如果沒有提供則代表不處理這個類型的事件。

下面的示例演示處理 error 的情況:

Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(
    v -> System.out.println(v),
    e -> System.err.println(e));
s.onNext(0);
s.onError(new Exception("Oops"));
 

結果:

0
java.lang.Exception: Oops
 

如果你不處理 error,則 在發生錯誤的時候,會拋出 OnErrorNotImplementedException 異常。該異常發生在生產者這邊,上面的示例 生產者和消費者位于同一線程,所以你可以直接 try- catch 住,但是在實際應用中,生產者和消費者通常不再同一個線程,所以最好還是提供一個 錯誤處理函數,否則你不知道錯誤發生了并導致事件流結束了。

Unsubscribing

在事件流結束發射之前,你可以主動停止接收事件。每個 subscribe 函數都會返回一個 Subscription 示例,該示例有兩個函數:

boolean isUnsubscribed()
void unsubscribe()
 

只需要調用 unsubscribe 函數就可以停止接收數據了。

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscriptionsubscription = values.subscribe(
    v -> System.out.println(v),
    e -> System.err.println(e),
    () -> System.out.println("Done")
);
values.onNext(0);
values.onNext(1);
subscription.unsubscribe();
values.onNext(2);
 

結果:

0
1
 

一個 observer 調用 unsubscribe 取消監聽并不妨礙同一個 observable 上的其他 Observer 對象。

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscriptionsubscription1 = values.subscribe(
    v -> System.out.println("First: " + v)
);
Subscriptionsubscription2 = values.subscribe(
    v -> System.out.println("Second: " + v)
);
values.onNext(0);
values.onNext(1);
subscription1.unsubscribe();
System.out.println("Unsubscribed first");
values.onNext(2);
 

結果:

First: 0
Second: 0
First: 1
Second: 1
Unsubscribedfirst
Second: 2
 

onError 和 onCompleted

onError 和 onCompleted 意味著結束事件流。observable 需要遵守該規范,在 onError 或者 onCompleted 發生后就不應該再發射事件了。

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscriptionsubscription1 = values.subscribe(
    v -> System.out.println("First: " + v),
    e -> System.out.println("First: " + e),
    () -> System.out.println("Completed")
);
values.onNext(0);
values.onNext(1);
values.onCompleted();
values.onNext(2);
 

結果:

First: 0
First: 1
Completed
 

釋放資源

Subscription 和其使用的資源綁定在一起。所以你應該記得釋放 Subscription 來釋放資源。使用 Subscriptions 的工廠函數可以把 Subscription 和需要的資源綁定在一起,然后可以使用 unsubscribe 來釋放綁定的資源。

Subscription s = Subscriptions.create(() -> System.out.println("Clean"));
s.unsubscribe();
 

結果:

Clean
 

Subscriptions.create 函數需要一個 Action 接口類型參數,在 unsubscribe 調用的時候會執行該接口來釋放資源。 也有其他一些函數可以簡化開發:

– Subscriptions.empty() 返回一個當 unsubscribe 的時候 啥也不做的Subscription 。當要求你返回一個 Subscription ,但是你確沒有資源需要釋放,則可以返回這個空的 Subscription。

– Subscriptions.from(Subscription… subscriptions),返回的 Subscription 釋放的時候,會調用所有參數 Subscription 的 unsubscribe 函數。

– Subscriptions.unsubscribed() 返回一個已經釋放過的 Subscription。

Subscription 也有一些標準的實現:

– BooleanSubscription

– CompositeSubscription

– MultipleAssignmentSubscription

– RefCountSubscription

– SafeSubscriber

– Scheduler.Worker

– SerializedSubscriber

– SerialSubscription

– Subscriber

– TestSubscriber

在后面將會看到他們的使用方式。這里注意 Subscriber 同時也實現了 Subscription。所以我們也可以直接用 Subscriber 來取消監聽。

來自: http://blog.chengyunfeng.com/?p=954

 本文由用戶 Miguel07I 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!