RXJava實例解析
要點
- 響應式編程是一種處理異步數據流的規范
- 響應式為數據流的轉換和聚合以及數據流的控制管理提供了工具支持
- 彈珠交互圖(Marble Diagram)以可交互的方式可視化響應式的結構
- 響應式編程風格看起來跟Java Streams API有點相似,不過本質上是不一樣的
- 如何連接到動態流處理異步數據源
在高并發編程范式的發展過程中,我們使用過很多工具,比如java.util.concurrent包、Akka Streams框架、CompletableFuture類以及Netty框架。響應式編程近來大受歡迎,這要得益于它強大的功能和健壯的工具包。
響應式編程是一種處理異步數據流的規范,它為數據流的轉換和聚合以及數據流的控制管理提供了工具支持,它讓考量程序整體設計的工作變得簡單。
但它使用起來并不簡單,它的學習曲線也并不平坦。對于我們當中的那些數學家來說,學習響應式就好比當初他們從學習標準代數的無向量過渡到學習線性代數的向量、矩陣和張量,它們實際上是被單元化的數據流。傳統的編程模式以對象為基礎,而響應式以事件流為基礎。事件可能以多種形式出現,比如對象、數據源、鼠標移動信息或者異常。在傳統的編程范式里,“異常”這個詞描述的是對意外情況的處理,因為在這個背景下,沒有按照預想發生的情況都算異常。而在響應式編程范式里,異常卻是一等公民。因為數據流一般是異步的,所以拋出異常是沒有意義的,任何一個異常都會被當成數據流里的一個事件。
在這篇文章里,我們會探討響應式編程的基本原理,以一種教與學的方式來強化一些重要的概念。
首先要記住的是,響應式里所有的東西都是流。Observable封裝了流,是最基本的單元。流可以包含零個或多個事件,有未完成和已完成兩種狀態,可以正常結束也可以發生錯誤。如果一個流正常完成或者發生錯誤,說明處理結束了,雖然有些工具可以對錯誤進行重試或者使用不同的流替換發生錯誤的流。
在運行我們給出的例子之前,需要把RxJava的依賴加入到項目里。可以在Maven里加入這個依賴:
<dependency>
<groupId>io.reactivex.rxjava</groupId>
<artifactId>rxjava</artifactId>
<version>1.1.10</version>
</dependency>
Observable類有幾個靜態工廠方法和實例方法,它們被用來生成各種新的Observable對象,或者把Observable對象添加到感興趣的處理流程里。Observable是可變的,所以針對它們的操作總是會生成新的Observable對象。為了更好地理解我們的例子,我們先來溫習一下Observable的基本操作,因為在后面的例子里會用到它們。
Observable.just方法生成一個簡單對象,然后返回。例如:
Observable.just("Howdy!")
這行代碼生成一個新的Observable對象,在結束之前觸發一個單獨的事件,生成字符串“Howdy!”。
可以把新生成的Observable對象賦給一個Observable變量:
Observable<String> hello = Observable.just("Howdy!");
不過知道這個還遠遠不夠。就像那個著名的哲學問題一樣,森林里的一顆樹倒下來,如果周圍沒有人聽見,那么就等于說樹的倒下是無聲無息的。一個Observable對象必須要有一個訂閱者來處理它所生成的事件。所幸的是,現在Java支持Lambda表達式,我們就可以使用簡潔的聲明式風格來表示訂閱操作:
Observable<String> howdy = Observable.just("Howdy!");
howdy.subscribe(System.out::println);
這段代碼仍然會生成字符串“Howdy!”。
跟Observable的其它方法一樣,just方法可以被重載:
Observable.just("Hello", "World").subscribe(System.out::println);
這行代碼會輸出
Hello
World
just方法可以被重載,最多可以接收10個參數。這里要注意,輸出的結果分成兩行顯示,說明它們是兩個獨立的事件。
讓我們來看看如果使用列表會發生什么情況:
List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dog"
);
Observable.just(words).subscribe(word->System.out.println(word));</code></pre>
這段代碼輸出一個很平常的結果:
[the, quick, brown, fox, jumped, over, the, lazy, dog]
我們本以為每個單詞會是一個單獨的事件,但實際上整個列表被當成了一個事件。為了達到我們想要的結果,我們引入from方法:
Observable.from(words).subscribe(System.out::println);
這行代碼把數組或者列表轉換成一系列事件,每個元素就是一個事件。
執行這行代碼會得到我們想要的多行輸出:
the
quick
brown
fox
jumped
over
the
lazy
dog
為了能從中獲取編號,我們要在Observable上多做一些工作。
不過在寫代碼之前,我們先來看看另外兩個操作,range和zip。range(i,n)會創建一個包含n個數的流,它的第一個數是從i開始的。如果我們有辦法把這種區間流跟上面的單詞流組合在一起,就可以解決編號的問題。
RX Marbles 這個網站對我們學習響應式編程很有幫助。這個網站使用JavaScript渲染大部分響應式操作,而且是可交互的。每個響應式操作使用“彈珠”來描述一個或多個源流(source stream)以及由操作生成的結果流(result stream)。時間從左到右,事件用彈珠表示。單擊或者拖動彈珠,可以看到它們是如何影響結果的。
執行一個zip操作就跟遵照醫囑一樣簡單。讓我們用 彈珠交互圖 來解釋一下這個過程:

zip操作通過成對的“zip”映射轉換把源流的元素跟另一個給定流的元素組合起來,其中的映射可以使用Lambda表達式來表示。只要其中的一個流完成操作,整個zip操作也跟著停止,另一個未完成的流剩下的事件就會被忽略。zip可以支持最多9個源流的zip操作。zipWith操作可以把一個指定流合并到一個已存在的流里。
現在回到我們的例子上,我們可以使用range和zipWith操作加入編號,并用String.format做映射轉換:
Observable.from(words)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count)->String.format("%2d. %s", count, string))
.subscribe(System.out::println);
這段代碼會輸出:
1. the
- quick
- brown
- fox
- jumped
- over
- the
- lazy
- dog</code></pre>
看起來很不錯!現在假設我們要列出單詞里的字母而不是單詞本身,這個時候要用到flatMap,flatMap會從Observable里獲取事件源(對象、集合或數組),并把這些元素分別映射成Observable,然后把這些Observable扁平化成一個單獨的Observable。
對于我們的例子來說,我們會先用split方法把每個單詞拆分成一個字母數組,然后用flatMap創建一個新的Observable對象,這個Observable對象包含了組成這些單詞的所有字母:
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);
這段代碼會輸出:
1. t
- h
- e
- q
- u
- i
- c
- k
...
- l
- a
- z
- y
- d
- o
- g</code></pre>
所有單詞的字母都出現在這里。不過這樣太繁瑣了,我們希望相同的字母只出現一次:
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);
這段代碼輸出:
1. t
- h
- e
- q
- u
- i
- c
- k
- b
- r
- o
- w
- n
- f
- x
- j
- m
- p
- d
- v
- l
- a
- z
- y
- g</code></pre>
我們從小被告知“quick brown fox”這個全字母短句包含了英語里所有的字母,不過在這里我們只看到25個,而不是26個。現在讓我們對這些字母進行排序,找出丟失的那個字母:
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);
這段代碼輸出:
1. a
- b
- c
...
- q
- r
- t
- u
- v
- w
- x
- y
- z</code></pre>
看樣子是字母“s”丟掉了。為了得到我們期望的結果,需要對數組做一點修改:
List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dogs"
);
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);
- a
- b
- c
- d
- e
- f
- g
- h
i
- j
- k
- l
- m
- n
- o
- p
- q
- r
- s
- t
- u
- v
- w
- x
- y
z</code></pre>
現在好了!
到目前為止,所有的代碼都跟Java 8里引入的Streams API很相似,不過這種相似只是一種巧合,因為響應式包含的內容遠不止這些。
Java Streams和Lambda表達式為編程語言帶來很大的價值,不過歸根結底,它們只是提供了一種方式來遍歷集合和生成集合。它們的作用很有限,而且缺乏可擴展性和可重用性。盡管Stream的parallel操作可以并行執行任務,但在返回結果前程序無法對整個過程進行干預。相反,響應式引入了執行時間、節流、流量控制等概念,而且它們可以被連接到“永不停止”的處理流程里。響應式產生的結果雖然不是集合,但你可以用任何期望的方式來處理這些結果。
讓我們通過彈珠交互圖更好地理解這些概念。
merge操作可以把最多9個源流合并到一個結果里,而且可以保留它們的順序。無需擔心這里會出現競賽條件,因為所有的事件都被“扁平化”到一個單獨的線程里,包括異常事件和結束事件。
debounce 操作會把在一個時間段內緊挨在一起的幾個事件看成一個單獨事件,這幾個事件里只有最后一個會被觸發:

可以看到,上下兩個圖中的“1”之間有一個指定的時間間隔,而2、3、4、5之間的時間間隔都小于這個間隔,所以它們被看成單個事件。如果把“5”往右挪一點,結果就不一樣了:

另一個有趣的操作是amb,它是一種不確定性的操作。
amb操作會從所有的輸入流中選擇第一個出現的流,然后忽略其它剩下的流。如下圖,第二個流是最先出現的,所以amb操作選擇了這個流。

如果把第一個流里的“20”往左移動,超過第二個流的第一個元素,那么生成的結果又會不一樣:

如果你有一個需要接入到某個數據源的處理流程,比如從消息主題上獲取數據,可能是Bloomberg或者Reuters,你并不關心接入的到底是哪一個,只要從中選擇一個就可以了。在這種情況下,amb操作就會很有用。
Tick Tock
現在,我們可以使用這些工具基于流生成各種有意義的結果。在接下來的這個例子里,我們有一個數據源,它會每秒鐘生成一個事件。不過為了節省CPU,我們讓它在周末時每三秒生成一次。我們使用混合型的“節奏器”按照一定的節奏生成數據。
首先,我們要創建一個返回boolean的方法,它會檢查當前時間是否是周末,如果是就返回true,否則就返回false:
private static boolean isSlowTickTime() {
return LocalDate.now().getDayOfWeek() == DayOfWeek.SATURDAY ||
LocalDate.now().getDayOfWeek() == DayOfWeek.SUNDAY;
}
對于邊讀這篇文章邊在IDE里執行這段代碼的讀者來說,他們可能不想等到下個周末才來驗證這個方法是否可行,所以可以使用下面的替代實現,這個實現會在一個15秒鐘內返回true,在另一個15秒鐘內返回false:
private static long start = System.currentTimeMillis();
public static Boolean isSlowTime() {
return (System.currentTimeMillis() - start) % 30_000 >= 15_000;
}
接下來我們創建兩個Observable對象,fast和slow,然后使用過濾器對它們進行調度,并把它們合并起來。
我們使用Observable.interval操作來安排調度,它會在每個指定的時間間隔內產生一次數據(從0開始計算)。
Observable<Long> fast = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> slow = Observable.interval(3, TimeUnit.SECONDS);
fast每秒生成一個事件,slow每三秒生成一個事件(我們會忽略事件的值,因為我們只對執行時間感興趣)。
現在我們把這兩個Observable合并到一起,通過使用過濾器讓fast流在工作日生成數據(或者在15秒內),slow流在周末生成數據(或者在另一個15秒內)。
Observable<Long> clock = Observable.merge(
slow.filter(tick-> isSlowTickTime()),
fast.filter(tick-> !isSlowTickTime())
);
最后,我們要添加一個打印時間的訂閱動作。在執行這些代碼時,它會根據我們的調度安排打印出系統時間。
clock.subscribe(tick-> System.out.println(new Date()));
為了防止程序中途退出,需要在方法的末尾添加一行代碼(注意要處理InterruptedException異常)。
Thread.sleep(60_000);
運行代碼的結果:
Fri Sep 16 03:08:18 BST 2016
Fri Sep 16 03:08:19 BST 2016
Fri Sep 16 03:08:20 BST 2016
Fri Sep 16 03:08:21 BST 2016
Fri Sep 16 03:08:22 BST 2016
Fri Sep 16 03:08:23 BST 2016
Fri Sep 16 03:08:24 BST 2016
Fri Sep 16 03:08:25 BST 2016
Fri Sep 16 03:08:26 BST 2016
Fri Sep 16 03:08:27 BST 2016
Fri Sep 16 03:08:28 BST 2016
Fri Sep 16 03:08:29 BST 2016
Fri Sep 16 03:08:30 BST 2016
Fri Sep 16 03:08:31 BST 2016
Fri Sep 16 03:08:32 BST 2016
Fri Sep 16 03:08:35 BST 2016
Fri Sep 16 03:08:38 BST 2016
Fri Sep 16 03:08:41 BST 2016
Fri Sep 16 03:08:44 BST 2016
. . .
可以看到,前面15個事件之間的時間間隔都是1秒,后面15秒內的事件之間的時間間隔是3秒,就像我們所期望的那樣。
連接到已存在的數據源
以上方法用于創建能夠生成靜態數據的Observable是沒有問題的。但如何把Observable連接到已有的數據源上,并享受響應式的流量控制和流操作策略為我們帶來的好處呢?
靜態Observable和動態Observable
首先我們先岔開話題,來介紹一下靜態Observable和動態Observable之間的區別。
到目前為止我們討論的都是靜態Observable,它們提供靜態的數據,盡管我們可以在執行時間上做一些調節,不過這遠遠 不夠。靜態Observable只在有訂閱者的情況下才會生成事件,而且訂閱者收到的是歷史數據,不管它們是從何時開始訂閱的。相反,動態Observable不管有多少個訂閱者都會生成數據,而且只生成最新的數據(除非使用了緩存)。可以通過兩個步驟把靜態Observable轉化成動態Observable:
- 調用Observable的publish方法,生成一個新的ConnectableObservable
- 調用ConnectableObservable的connect方法,開始生成數據
要連接到一個已有的數據源上,可以在這個數據源上添加監聽器(如果你喜歡這么做),監聽器會把事件傳播給訂閱者,然后在每個事件發生時調用訂閱者的onNext方法。在實現監聽器的時候要確保每個訂閱者仍然處于訂閱狀態,否則就要停止把事件傳播給它,同時要注意回壓信號。所幸的是,這些工作可以由RxJava的AsyncEmitter來處理。假設我們有一個叫做SomeFeed的數據服務,它會生成報價事件,同時有一個SomeListener監聽這些報價事件以及其它生命周期事件。在GitHub上已經有一個 實現 ,如果你想自己動手運行這些代碼,可以去下載。
我們的數據源監聽器有兩個方法:
public void priceTick(PriceTick event);
public void error(Throwable throwable);
PriceTick類包含了date、instrument和price字段,還有一個isLast方法用來判斷它是否是最后一個事件:
(點擊放大圖像)

讓我們來看看如何使用AsyncEmitter把Observable連接到一個實時的數據源上:
SomeFeed<PriceTick> feed = new SomeFeed<>();
Observable<PriceTick> obs =
Observable.fromEmitter((AsyncEmitter<PriceTick> emitter) ->
{
SomeListener listener = new SomeListener() {
@Override
public void priceTick(PriceTick event) {
emitter.onNext(event);
if (event.isLast()) {
emitter.onCompleted();
}
}
@Override
public void error(Throwable e) {
emitter.onError(e);
}
};
feed.register(listener);
}, AsyncEmitter.BackpressureMode.BUFFER);
這段代碼幾乎是逐字逐句地從Observable類的 Javadoc 里摘抄出來的。AsyncEmitter封裝了監聽器(第5行)的創建過程,并把它注冊到數據源上(第19行)。Observable直接讓訂閱者對自己進行了訂閱。數據源生成的事件被委托給了AsyncEmitter(第8行)。第20行告訴觀察者要緩沖所有的事件通知,直到它們被訂閱者消費。除了緩沖,還有其它幾種回壓策略:
BackpressureMode.NONE不使用回壓。如果流的速度無法保持同步,可能會拋出MissingBackpressureException或IllegalStateException。
BackpressureMode.ERROR會在下游跟不上速度時拋出MissingBackpressureException。
BackpressureMode.DROP會在下游跟不上速度時把onNext的值丟棄。
BackpressureMode.LATEST會一直保留最新的onNext的值,直到被下游消費掉。
這樣生成的是靜態Observable。靜態Observable在沒有訂閱者的時候不會生成數據,而且所有訂閱者收到的是同樣的歷史數據,而這不是我們想要的。
為了把它轉化成動態Observable,讓所有訂閱者可以實時地接收事件通知,我們必須調用publish和connect方法,就像之前提到的那樣:
ConnectableObservable<PriceTick> hotObservable = obs.publish();
hotObservable.connect();
最后,我們可以對它進行訂閱并顯示報價:
hotObservable.subscribe((priceTick) ->
System.out.printf("%s %4s %6.2f%n", priceTick.getDate(),
priceTick.getInstrument(), priceTick.getPrice()));
來自:http://www.infoq.com/cn/articles/rxjava-by-example