RxJava 教程第三部分:馴服數據流之 避免 monad
monad 是函數式編程中的抽象概念,是一種高度的數學抽象,關于 monad 的詳細介紹請看這里: Functors, Applicatives, And Monads In Pictures ,不要百度搜索其他的資料, 關于 monad 的介紹,在網上有 90% 都是錯誤的,誤導人的。
在 www.introtorx.com 中也有一個簡短的定義:
Monad 是一種在模型域對象中封裝了計算邏輯而不是數據的一種抽象數據構造類型。Monads are a kind of abstract data type constructor that encapsulate program logic instead of data in the domain model.
Observable 就是一個 monad。Rx 代碼定義了需要完成的任務,但是實際執行任務的過程確在 Rx 執行代碼之外執行。本節中的 monad 我們只是指代 Observable。
為什么要避免 monad
主要有兩個原因:第一個原因是 Rx 新手還是習慣傳統的編碼方式。使用另外一種方式(paradigm )來計算部分結果或許可以讓你獲取到正確的結果,但是你依然在嘗試搞明白 Rx 是如何工作的。第二個原因是 我們使用的第三方庫和組件并沒有按照 Rx 的方法來設計。當 重構現有的代碼使用 Rx, 讓 Rx 繼續使用阻塞的方式工作也許是最好的選擇。
BlockingObservable
使用 BlockingObservable 可以把 Observable 中的數據通過阻塞的方式發射出來。任何一個 Observable 都可以使用下面兩種方式來轉換為阻塞的 Observable。
public final BlockingObservable<T> toBlocking()
public static <T> BlockingObservable<T> from(Observable<? extends T> o)
BlockingObservable 并沒有繼承 Observable,所以無法使用常用的操作函數。他自己實現了一部分功能,可以通過阻塞的方式來從中獲取數據。里面有很多我們已經見到過的函數的阻塞實現。
forEach
Observable 有個函數叫做 forEach。 forEach 為 subscribe 的一個沒有返回Subscription 的別名。例如下面的例子:
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values
.take(5)
.forEach(v -> System.out.println(v));
System.out.println("Subscribed");
結果:
Subscribed
0
1
2
3
4
通過 forEach 可以處理 Observable 每個發射出來的數據。由于是非阻塞執行的,所以結果先答應出來 Subscribed,然后是每個發射的數字。
BlockingObservable 沒有 subscribe 函數,但是有這個 forEach 函數。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values
.take(5)
.toBlocking()
.forEach(v -> System.out.println(v));
System.out.println("Subscribed");
結果:
0
1
2
3
4
Subscribed
這里由于使用的是阻塞的 Observable,所以當 forEach 執行完后,才會執行后面的打印 Subscribed 的代碼。同時 阻塞的 Observable 也沒有 onError 和 onCompleted 函數。當執行完成的時候,就執行完了;當錯誤發生的時候,異常就直接就地拋出了;
Observable<Long> values = Observable.error(new Exception("Oops"));
try {
values
.take(5)
.toBlocking()
.forEach(v -> System.out.println(v));
}
catch (Exception e) {
System.out.println("Caught: " + e.getMessage());
}
System.out.println("Subscribed");
結果:
Caught: java.lang.Exception: Oops
Subscribed
first, last, single
BlockingObservable 還有這3個函數,以及帶有默認值的另外三個函數:firstOrDefault, lastOrDefault 和 singleOrDefault.
這些函數會阻塞當前的線程直到有數據發射出來并返回符合結果的數據:
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
long value = values
.take(5)
.toBlocking()
.first(i -> i>2);
System.out.println(value);
結果:
| 3 |
first 會一直阻塞,直到有數據發射并返回符合條件的數據。
和 forEach 一樣,錯誤發生了也是就地拋出:
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
try {
long value = values
.take(5)
.toBlocking()
.single(i -> i>2);
System.out.println(value);
}
catch (Exception e) {
System.out.println("Caught: " + e);
}
結果:
Caught: java.lang.IllegalArgumentException: Sequencecontainstoomanyelements
To Iterable
還可以使用 BlockingObservable 上的一些方法把 Observable 轉換為 iterables ,然后可以傳統的 Java 方式來遍歷這些集合。當需要處理數據的時候,就調用 Iterator 的 next() 函數,如果有數據 next() 就直接返回;如果沒有數據 next() 函數就阻塞直到有數據產生。
有多種方式把 BlockingObservable 轉換為 Iterable ,每種方式都有不同的區別。
toIterable
public java.lang.Iterable<T> toIterable()

這種實現方式,把 Observable 所發射的所有數據給收集起來并緩存到一個集合中。由于緩存的存在,所以不會丟失數據。一單有下一個數據 next() 函數就返回。否則的話就阻塞到數據可用。注意 上圖畫的有點問題,看起來好像等 Observable 發射完后來返回集合。
Observable<Long> values = Observable.interval(500, TimeUnit.MILLISECONDS);
Iterable<Long> iterable = values.take(5).toBlocking().toIterable();
for (long l : iterable) {
System.out.println(l);
}
結果:
0
1
2
3
4
注意: iterable 的 hasNext() 或者 next() 函數都會阻塞直到有數據可用。如果 Observable 完成了, hasNext 返回 false, next 拋出異常:java.util.NoSuchElementException。
next
public java.lang.Iterable<T> next()

這種實現數據沒有緩存。 iterator 總是等待下一個數據并立刻返回。
Observable<Long> values = Observable.interval(500, TimeUnit.MILLISECONDS);
values.take(5)
.subscribe(v -> System.out.println("Emitted: " + v));
Iterable<Long> iterable = values.take(5).toBlocking().next();
for (long l : iterable) {
System.out.println(l);
Thread.sleep(750);
}
結果:
Emitted: 0
0
Emitted: 1
Emitted: 2
2
Emitted: 3
Emitted: 4
4
這里的示例中, 打印語句(消費者)處理的速度比數據發射的速度慢。所以消費者會錯過一些數據。
latest
public java.lang.Iterable<T> latest()
latest 和 next 類似,區別就是 latest 會緩存一個數據。
Observable<Long> values = Observable.interval(500, TimeUnit.MILLISECONDS);
values.take(5)
.subscribe(v -> System.out.println("Emitted: " + v));
Iterable<Long> iterable = values.take(5).toBlocking().latest();
for (long l : iterable) {
System.out.println(l);
Thread.sleep(750);
}
結果:
Emitted: 0
0
Emitted: 1
1
Emitted: 2
Emitted: 3
3
Emitted: 4
使用 latest 的時候,如果在下一個數據發射之前,當前的數據還沒有被消費者消費,則當前的值就會丟失。如果 消費者比 生產者(Observable)發射的數據快,則 iterator 會阻塞并且等待下一個數據。
上面示例中的最后一個數據 4 并沒有被消費掉。由于 onCompleted 是立刻結束的,導致下一次消費者通過 next 獲取數據的時候,看到的是一個已經結束的 Observable,而 iterator.hasNext() 如果發現是一個已經結束的 Observable 則返回 false,盡管還有一個數據還沒有被消費。
mostRecent
public java.lang.Iterable<T> mostRecent(T initialValue)

mostRecent 返回的 iterator 從來不會阻塞。他會緩存最近一個值,如果消費者比 生產者處理的速度慢,則有數據會丟失。和 latest 不一樣的是, 只要消費者需要數據,則緩存的數據就會直接返回。這樣,如果消費者處理數據的速度快,則消費者就會看到重復的數據。所以為了實現不阻塞的操作,該函數需要一個初始化的值。如果 Observable 還沒有發射數據,消費者這個時候看到的就是這個初始化的值。
Observable<Long> values = Observable.interval(500, TimeUnit.MILLISECONDS);
values.take(5)
.subscribe(v -> System.out.println("Emitted: " + v));
Iterable<Long> iterable = values.take(5).toBlocking().mostRecent(-1L);
for (long l : iterable) {
System.out.println(l);
Thread.sleep(400);
}
結果:
-1
-1
Emitted: 0
0
Emitted: 1
1
Emitted: 2
2
Emitted: 3
3
3
Emitted: 4
Future
使用 toFuture 函數也可以把 BlockingObservable 轉換為一個 Future ,該方法只是創建一個 Future 并返回,不會阻塞。Future 可以讓消費者決定如何處理異步操作。Future 也可以處理異常情況。
Observable<Long> values = Observable.timer(500, TimeUnit.MILLISECONDS);
values.subscribe(v -> System.out.println("Emitted: " + v));
Future<Long> future = values.toBlocking().toFuture();
System.out.println(future.get());
結果:
Emitted: 0
0
通過這種方式創建的 Future,要求 Observable 只發射一個數據,和 single 函數要求的一樣。如果發射了多個數據,則 Future 會拋出 java.lang.IllegalArgumentException.
Locks
Deadlocks
到目前為止我們都選擇忽略可能導致死鎖的情況。 Rx 的非阻塞特性導致很難創建非必要的死鎖。然后本節中我們把 Observable 轉換為 阻塞的操作,這樣又導致死鎖很容易出現了。
例如:
ReplaySubject<Integer> subject = ReplaySubject.create();
subject.toBlocking().forEach(v -> System.out.println(v));
subject.onNext(1);
subject.onNext(2);
subject.onCompleted();
forEach 只有當 Observable 結束發射的時候才返回。而后面的 onNext 和 onCompleted 需要 forEach 返回后才能執行,這樣就導致了死鎖。所以 forEach 會一直等待下去。
沒有結束的Observable
有些阻塞操作(比如 last() )需要 Observable 結束發射數據才能返回。而 有些操作(比如 first() )需要 Observable 需要至少發射一個數據才能返回。所以在 BlockingObservable 上使用這些函數需要注意 ,如果 Observable 不滿足條件則可能會導致該操作永遠阻塞。所以為了避免永遠阻塞的問題,可以指定一個超時時間間隔, 在后面的 Timeshifter 數據流部分會介紹如何做。