RxJava 教程第三部分:馴服數據流之 hot & cold Observable
Observable 數據流有兩種類型:hot 和 cold。這兩種類型有很大的不同。本節介紹他們的區別,以及作為 Rx 開發者應該如何正確的使用他們。
Cold observables
只有當有訂閱者訂閱的時候, Cold Observable 才開始執行發射數據流的代碼。并且每個訂閱者訂閱的時候都獨立的執行一遍數據流代碼。 Observable.interval 就是一個 Cold Observable。每一個訂閱者都會獨立的收到他們的數據流。
Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS);
cold.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(500);
cold.subscribe(i -> System.out.println("Second: " + i));
結果:
First: 0
First: 1
First: 2
Second: 0
First: 3
Second: 1
First: 4
Second: 2
...
雖然這兩個 Subscriber 訂閱到同一個Observable 上,只是訂閱的時間不同,他們都收到同樣的數據流,但是同一時刻收到的數據是不同的。
在本教程中之前所見到的 Observable 都是 Cold Observable。 Observable.create 創建的也是 Cold Observable,而 just, range, timer 和 from 這些創建的同樣是 Cold Observable。
Hot observables
Hot observable 不管有沒有訂閱者訂閱,他們創建后就開發發射數據流。 一個比較好的示例就是 鼠標事件。 不管系統有沒有訂閱者監聽鼠標事件,鼠標事件一直在發生,當有訂閱者訂閱后,從訂閱后的事件開始發送給這個訂閱者,之前的事件這個訂閱者是接受不到的;如果訂閱者取消訂閱了,鼠標事件依然繼續發射。
Publish
Cold Observable 和 Hot Observable 之間可以相互轉化。使用 publish 操作函數可以把 Cold Observable 轉化為 Hot Observable。
public final ConnectableObservable<T> publish()
publish 返回一個 ConnectableObservable 對象,這個對象是 Observable 的之類,多了三個函數:
public final Subscriptionconnect()
public abstract void connect(Action1<? super Subscription> connection)
public Observable<T> refCount()
另外還有一個重載函數,可以在發射數據之前對數據做些處理:
public final <R> Observable<R> publish(Func1<? super Observable<T>,? extends Observable<R>> selector)
之前介紹的所有對 Observable 的操作都可以在 selector 中使用。你可以通過 selector 參數創建一個 Subscription ,后來的訂閱者都訂閱到這一個 Subscription 上,這樣可以確保所有的訂閱者都在同一時刻收到同樣的數據。
這個重載函數返回的是 Observable 而不是 ConnectableObservable , 所以下面討論的操作函數無法在這個重載函數返回值上使用。
connect
ConnectableObservable 如果不調用 connect 函數則不會觸發數據流的執行。當調用 connect 函數以后,會創建一個新的 subscription 并訂閱到源 Observable (調用 publish 的那個 Observable)。這個 subscription 開始接收數據并把它接收到的數據轉發給所有的訂閱者。這樣,所有的訂閱者在同一時刻都可以收到同樣的數據。
ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
cold.connect();
cold.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(500);
cold.subscribe(i -> System.out.println("Second: " + i));
結果:
First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3
First: 4
Second: 4
First: 5
Second: 5
Disconnecting
connect 函數返回的是一個 Subscription,和 Observable.subscribe返回的結果一樣。 可以使用這個 Subscription 來取消訂閱到 ConnectableObservable。 如果調用 這個 Subscription 的 unsubscribe 函數,可以停止把數據轉發給 Observer,但是這些 Observer 并沒有從 ConnectableObservable 上取消注冊,只是停止接收數據了。如果再次調用 connect , 則 ConnectableObservable 開始一個新的訂閱,在 ConnectableObservable 上訂閱的 Observer 會再次開始接收數據。
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();
connectable.subscribe(i -> System.out.println(i));
Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();
Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();
結果:
0
1
2
3
4
Closingconnection
Reconnecting
0
1
2
...
通過調用 connect 來重新開始訂閱,會創建一個新的訂閱。如果源 Observable 為 Cold Observable 則數據流會重新執行一遍。
如果你不想結束數據流,只想從 publish 返回的 Hot Observable 上取消注冊,則可以使用 subscribe 函數返回的 Subscription 對象。
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();
Subscriptions1 = connectable.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(500);
Subscriptions2 = connectable.subscribe(i -> System.out.println("Second: " + i));
Thread.sleep(500);
System.out.println("Unsubscribing second");
s2.unsubscribe();
結果:
First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3
First: 4
Second: 4
Unsubscribingsecond
First: 5
First: 6
refCount
ConnectableObservable.refCount 返回一個特殊的 Observable , 這個 Observable 只要有訂閱者就會繼續發射數據。
Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish().refCount();
Subscriptions1 = cold.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(500);
Subscriptions2 = cold.subscribe(i -> System.out.println("Second: " + i));
Thread.sleep(500);
System.out.println("Unsubscribe second");
s2.unsubscribe();
Thread.sleep(500);
System.out.println("Unsubscribe first");
s1.unsubscribe();
System.out.println("First connection again");
Thread.sleep(500);
s1 = cold.subscribe(i -> System.out.println("First: " + i));
結果:
First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3
Unsubscribesecond
First: 4
First: 5
First: 6
Unsubscribefirst
Firstconnectionagain
First: 0
First: 1
First: 2
First: 3
First: 4
如果沒有訂閱者訂閱到 refCount 返回的 Observable,則不會執行數據流的代碼。如果所有的訂閱者都取消訂閱了,則數據流停止。重新訂閱再回重新開始數據流。
replay
public final ConnectableObservable<T> replay()
replay 和 ReplaySubject 類似。當和源 Observable 鏈接后,開始收集數據。當有 Observer 訂閱的時候,就把收集到的數據線發給 Observer。然后和其他 Observer 同時接受數據。
ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).replay();
Subscription s = cold.connect();
System.out.println("Subscribe first");
Subscriptions1 = cold.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(700);
System.out.println("Subscribe second");
Subscriptions2 = cold.subscribe(i -> System.out.println("Second: " + i));
Thread.sleep(500);
結果:
Subscribefirst
First: 0
First: 1
First: 2
Subscribesecond
Second: 0
Second: 1
Second: 2
First: 3
Second: 3
replay 和 publish 一樣也返回一個 ConnectableObservable 。所以我們可以在上面使用 refCount 來創建新的 Observable 也可以取消注冊。
replay 有 8個重載函數:
ConnectableObservable<T> replay()
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize, long time, java.util.concurrent.TimeUnitunit)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, long time, java.util.concurrent.TimeUnitunit)
ConnectableObservable<T> replay(int bufferSize)
ConnectableObservable<T> replay(int bufferSize, long time, java.util.concurrent.TimeUnitunit)
ConnectableObservable<T> replay(long time, java.util.concurrent.TimeUnitunit)
有三個參數 bufferSize、 selector 和 time (以及指定時間單位的 unit)
– bufferSize 用來指定緩存的最大數量。當新的 Observer 訂閱的時候,最多只能收到 bufferSize 個之前緩存的數據。
– time, unit 用來指定一個數據存貨的時間,新訂閱的 Observer 只能收到時間不超過這個參數的數據。
– selector 和 publish(selector) 用來轉換重復的 Observable。
下面是一個 bufferSize 的示例:
ConnectableObservable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS)
.take(5)
.replay(2);
source.connect();
Thread.sleep(4500);
source.subscribe(System.out::println);
結果:
2
3
4
cache
cache 操作函數和 replay 類似,但是隱藏了 ConnectableObservable ,并且不用管理 subscription 了。當第一個 Observer 訂閱的時候,內部的 ConnectableObservable 訂閱到源 Observable。后來的訂閱者會收到之前緩存的數據,但是并不會重新訂閱到源 Observable 上。
public final Observable<T> cache()
public final Observable<T> cache(int capacity)
Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS)
.take(5)
.cache();
Thread.sleep(500);
obs.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(300);
obs.subscribe(i -> System.out.println("Second: " + i));
結果:
First: 0
First: 1
First: 2
Second: 0
Second: 1
Second: 2
First: 3
Second: 3
First: 4
Second: 4
從上面示例中可以看到,只有當有訂閱者訂閱的時候,源 Observable 才開始執行。當第二個訂閱者訂閱的時候,會收到之前緩存的數據。
需要注意的是,如果所有的訂閱者都取消訂閱了 內部的 ConnectableObservable 不會取消訂閱,這點和 refCount 不一樣。只要第一個訂閱者訂閱了,內部的 ConnectableObservable 就鏈接到源 Observable上了并且不會取消訂閱了。 這點非常重要,因為當我們一單訂閱了,就沒法取消源 Observable了, 直到源 Observable 結束或者程序內存溢出。 可以指定緩存個數的重載函數也沒法解決這個問題,緩存限制只是作為一個優化的提示,并不會限制內部的緩存大小。
Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS)
.take(5)
.doOnNext(System.out::println)
.cache()
.doOnSubscribe(() -> System.out.println("Subscribed"))
.doOnUnsubscribe(() -> System.out.println("Unsubscribed"));
Subscriptionsubscription = obs.subscribe();
Thread.sleep(150);
subscription.unsubscribe();
結果:
Subscribed
0
Unsubscribed
1
2
3
4
上面的示例中,doOnNext 打印源 Observable 發射的每個數據。而 doOnSubscribe 和doOnUnsubscribe 打印緩存后的 Observable 的訂閱和取消訂閱事件。可以看到當訂閱者訂閱的時候,數據流開始發射,取消訂閱數據流并不會停止。
Multicast
share 函數是 Observable.publish().refCount() 的別名。可以讓你的訂閱者分享一個 subscription,只要還有訂閱者在,這個 subscription 就繼續工作。