RxJava 教程第三部分:馴服數據流之 hot & cold Observable

c4r79936 8年前發布 | 13K 次閱讀 RxJava Java開發

 

Observable 數據流有兩種類型:hot 和 cold。這兩種類型有很大的不同。本節介紹他們的區別,以及作為 Rx 開發者應該如何正確的使用他們。

Cold observables

只有當有訂閱者訂閱的時候, Cold Observable 才開始執行發射數據流的代碼。并且每個訂閱者訂閱的時候都獨立的執行一遍數據流代碼。 Observable.interval 就是一個 Cold Observable。每一個訂閱者都會獨立的收到他們的數據流。

Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS);
 
cold.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(500);
cold.subscribe(i -> System.out.println("Second: " + i));
 

結果:

First: 0
First: 1
First: 2
Second: 0
First: 3
Second: 1
First: 4
Second: 2
...
 

雖然這兩個 Subscriber 訂閱到同一個Observable 上,只是訂閱的時間不同,他們都收到同樣的數據流,但是同一時刻收到的數據是不同的。

在本教程中之前所見到的 Observable 都是 Cold Observable。 Observable.create 創建的也是 Cold Observable,而 just, range, timer 和 from 這些創建的同樣是 Cold Observable。

Hot observables

Hot observable 不管有沒有訂閱者訂閱,他們創建后就開發發射數據流。 一個比較好的示例就是 鼠標事件。 不管系統有沒有訂閱者監聽鼠標事件,鼠標事件一直在發生,當有訂閱者訂閱后,從訂閱后的事件開始發送給這個訂閱者,之前的事件這個訂閱者是接受不到的;如果訂閱者取消訂閱了,鼠標事件依然繼續發射。

Publish

Cold Observable 和 Hot Observable 之間可以相互轉化。使用 publish 操作函數可以把 Cold Observable 轉化為 Hot Observable。

public final ConnectableObservable<T> publish()
 

publish 返回一個 ConnectableObservable 對象,這個對象是 Observable 的之類,多了三個函數:

public final Subscriptionconnect()
public abstract void connect(Action1<? super Subscription> connection)
public Observable<T> refCount()
 

另外還有一個重載函數,可以在發射數據之前對數據做些處理:

public final <R> Observable<R> publish(Func1<? super Observable<T>,? extends Observable<R>> selector)
 

之前介紹的所有對 Observable 的操作都可以在 selector 中使用。你可以通過 selector 參數創建一個 Subscription ,后來的訂閱者都訂閱到這一個 Subscription 上,這樣可以確保所有的訂閱者都在同一時刻收到同樣的數據。

這個重載函數返回的是 Observable 而不是 ConnectableObservable , 所以下面討論的操作函數無法在這個重載函數返回值上使用。

connect

ConnectableObservable 如果不調用 connect 函數則不會觸發數據流的執行。當調用 connect 函數以后,會創建一個新的 subscription 并訂閱到源 Observable (調用 publish 的那個 Observable)。這個 subscription 開始接收數據并把它接收到的數據轉發給所有的訂閱者。這樣,所有的訂閱者在同一時刻都可以收到同樣的數據。

ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
cold.connect();
 
cold.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(500);
cold.subscribe(i -> System.out.println("Second: " + i));    
 

結果:

First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3
First: 4
Second: 4
First: 5
Second: 5
 

Disconnecting

connect 函數返回的是一個 Subscription,和 Observable.subscribe返回的結果一樣。 可以使用這個 Subscription 來取消訂閱到 ConnectableObservable。 如果調用 這個 Subscription 的 unsubscribe 函數,可以停止把數據轉發給 Observer,但是這些 Observer 并沒有從 ConnectableObservable 上取消注冊,只是停止接收數據了。如果再次調用 connect , 則 ConnectableObservable 開始一個新的訂閱,在 ConnectableObservable 上訂閱的 Observer 會再次開始接收數據。

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();
 
connectable.subscribe(i -> System.out.println(i));
 
Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();
 
Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();
 

結果:

0
1
2
3
4
Closingconnection
Reconnecting
0
1
2
...
 

通過調用 connect 來重新開始訂閱,會創建一個新的訂閱。如果源 Observable 為 Cold Observable 則數據流會重新執行一遍。

如果你不想結束數據流,只想從 publish 返回的 Hot Observable 上取消注冊,則可以使用 subscribe 函數返回的 Subscription 對象。

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();
 
Subscriptions1 = connectable.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(500);
Subscriptions2 = connectable.subscribe(i -> System.out.println("Second: " + i));
 
Thread.sleep(500);
System.out.println("Unsubscribing second");
s2.unsubscribe();
 

結果:

First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3
First: 4
Second: 4
Unsubscribingsecond
First: 5
First: 6
 

refCount

ConnectableObservable.refCount 返回一個特殊的 Observable , 這個 Observable 只要有訂閱者就會繼續發射數據。

Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish().refCount();
 
Subscriptions1 = cold.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(500);
Subscriptions2 = cold.subscribe(i -> System.out.println("Second: " + i));
Thread.sleep(500);
System.out.println("Unsubscribe second");
s2.unsubscribe();
Thread.sleep(500);
System.out.println("Unsubscribe first");
s1.unsubscribe();
 
System.out.println("First connection again");
Thread.sleep(500);
s1 = cold.subscribe(i -> System.out.println("First: " + i));
 

結果:

First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3
Unsubscribesecond
First: 4
First: 5
First: 6
Unsubscribefirst
Firstconnectionagain
First: 0
First: 1
First: 2
First: 3
First: 4
 

如果沒有訂閱者訂閱到 refCount 返回的 Observable,則不會執行數據流的代碼。如果所有的訂閱者都取消訂閱了,則數據流停止。重新訂閱再回重新開始數據流。

replay

public final ConnectableObservable<T> replay()
 

replay 和 ReplaySubject 類似。當和源 Observable 鏈接后,開始收集數據。當有 Observer 訂閱的時候,就把收集到的數據線發給 Observer。然后和其他 Observer 同時接受數據。

ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).replay();
Subscription s = cold.connect();
 
System.out.println("Subscribe first");
Subscriptions1 = cold.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(700);
System.out.println("Subscribe second");
Subscriptions2 = cold.subscribe(i -> System.out.println("Second: " + i));
Thread.sleep(500);
 

結果:

Subscribefirst
First: 0
First: 1
First: 2
Subscribesecond
Second: 0
Second: 1
Second: 2
First: 3
Second: 3
 

replay 和 publish 一樣也返回一個 ConnectableObservable 。所以我們可以在上面使用 refCount 來創建新的 Observable 也可以取消注冊。

replay 有 8個重載函數:

ConnectableObservable<T> replay()
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize, long time, java.util.concurrent.TimeUnitunit)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, long time, java.util.concurrent.TimeUnitunit)
ConnectableObservable<T> replay(int bufferSize)
ConnectableObservable<T> replay(int bufferSize, long time, java.util.concurrent.TimeUnitunit)
ConnectableObservable<T> replay(long time, java.util.concurrent.TimeUnitunit)
 

有三個參數 bufferSize、 selector 和 time (以及指定時間單位的 unit)

– bufferSize 用來指定緩存的最大數量。當新的 Observer 訂閱的時候,最多只能收到 bufferSize 個之前緩存的數據。

– time, unit 用來指定一個數據存貨的時間,新訂閱的 Observer 只能收到時間不超過這個參數的數據。

– selector 和 publish(selector) 用來轉換重復的 Observable。

下面是一個 bufferSize 的示例:

ConnectableObservable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS)
    .take(5)
    .replay(2);
 
source.connect();
Thread.sleep(4500);
source.subscribe(System.out::println);
 

結果:

2
3
4
 

cache

cache 操作函數和 replay 類似,但是隱藏了 ConnectableObservable ,并且不用管理 subscription 了。當第一個 Observer 訂閱的時候,內部的 ConnectableObservable 訂閱到源 Observable。后來的訂閱者會收到之前緩存的數據,但是并不會重新訂閱到源 Observable 上。

public final Observable<T> cache()
public final Observable<T> cache(int capacity)
 

Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS)
    .take(5)
    .cache();
 
Thread.sleep(500);
obs.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(300);
obs.subscribe(i -> System.out.println("Second: " + i));
 

結果:

First: 0
First: 1
First: 2
Second: 0
Second: 1
Second: 2
First: 3
Second: 3
First: 4
Second: 4
 

從上面示例中可以看到,只有當有訂閱者訂閱的時候,源 Observable 才開始執行。當第二個訂閱者訂閱的時候,會收到之前緩存的數據。

需要注意的是,如果所有的訂閱者都取消訂閱了 內部的 ConnectableObservable 不會取消訂閱,這點和 refCount 不一樣。只要第一個訂閱者訂閱了,內部的 ConnectableObservable 就鏈接到源 Observable上了并且不會取消訂閱了。 這點非常重要,因為當我們一單訂閱了,就沒法取消源 Observable了, 直到源 Observable 結束或者程序內存溢出。 可以指定緩存個數的重載函數也沒法解決這個問題,緩存限制只是作為一個優化的提示,并不會限制內部的緩存大小。

Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS)
    .take(5)
    .doOnNext(System.out::println)
    .cache()
    .doOnSubscribe(() -> System.out.println("Subscribed"))
    .doOnUnsubscribe(() -> System.out.println("Unsubscribed"));
 
Subscriptionsubscription = obs.subscribe();
Thread.sleep(150);
subscription.unsubscribe();
 

結果:

Subscribed
0
Unsubscribed
1
2
3
4
 

上面的示例中,doOnNext 打印源 Observable 發射的每個數據。而 doOnSubscribe 和doOnUnsubscribe 打印緩存后的 Observable 的訂閱和取消訂閱事件。可以看到當訂閱者訂閱的時候,數據流開始發射,取消訂閱數據流并不會停止。

Multicast

share 函數是 Observable.publish().refCount() 的別名。可以讓你的訂閱者分享一個 subscription,只要還有訂閱者在,這個 subscription 就繼續工作。

 

來自: http://blog.chengyunfeng.com/?p=975

 本文由用戶 c4r79936 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!