可能是最好的 Rx 初學者教程
想必你對 Reactive Programming 這個新東西很好奇吧,尤其是他的衍生,比如: Rx , Bacon.js , RAC 等等。
講真,如果沒有好資料的話,學習 Reactive Programming 是一件很艱難的事情。還記得剛開始學習的時候,我不停地找教程,后來找到了一個很容易上手的實戰指南,但是它僅僅涉及了表面的東西,并沒有告訴我如何圍繞 Reactive Programming 來構建整個應用的架構。另外,官方的文檔對我的幫助也不是很大,尤其是我想理解某個函數的時候。看看下面的例子你就知道:
Rx.Observable.prototype.flatMapLatest(selector, [thisArg])
Projects each element of an observable sequence into a new sequence of observable sequences by incorporating the element’s index and then transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.
此時我的內心是崩潰的。

我曾經還閱讀過兩本書,一本講得很抽象,而另外一本則是教你如何使用 Reactive 相關的庫。最后,我用了最笨的方法來學習:邊用邊學,把他運用到公司一個實際的項目當中,在遇到問題的時候得到了我同事們的 幫助 。
在我學習的過程中,最艱難的部分是如何 thinking in Reactive 。這需要我們擺脫 imperative and stateful 風格的編程習慣,然后強迫你大腦去思考如何用另外一種方式來解決同樣的問題。我并沒有找到任何關于這個的教程。所以,我覺得要有一個實戰的教程告訴我們如何 thinking in Reactive ,這樣我們才能著手學習 Reactive Programming 。然后,閱讀官方文檔就事半功倍了。因此,我希望這篇教程能幫助到你。
什么是 Reactive Programming
對于什么是 Reactive Programming ,你會在網上看到很多不好的解釋或者定義。Wikipedia 一如既往地萬金油和偏理論化。 Stackoverflow 的這個答案又太規范化,不適合初學者。而, Reactive Manifesto 看起來像是用來忽悠產品經理。另外,微軟的 解釋 Rx = Observables + LINQ + Schedulers 又太 Microsoftish ,看到就覺得好難的樣子。其實,像 reactive 和 propagation of change 等等這些詞條和我們平常在 MV* 或者某些編程語言里看到的沒有什么不同,都是解決同樣的問題。view 要實時響應 model ,也就是當 model 改變時,view 也要做出相應的變化。
我們還是廢話少說。

Reactive Programming 其實就是處理異步數據流
也就是說,他并不是什么新東西。 Event buses 或者 click events 這些不就是異步事件流(Async event streams)嗎?你可以監聽他們,然后做出相應的副作用(side effects)。 Reactive 其實就是一個 idea ,推而廣之的話,不僅僅是 click 或者 hover 事件能夠創建 data stream,所有東西都可以當作一個 stream :比如變量,用戶的輸入,屬性,緩存,數據結構等等。不妨想象一下,你的 推ter feed 其實就是一個 data stream ,同樣地 click 事件也是。你可以監聽他們,然后做出響應。
在此基礎上,你可以使用很多非常棒的函數,比如可以 combine ,create,filter 各種各樣的 stream ,因為 Rx 借鑒了函數式編程。一個 stream 可以當作另一個 stream 的輸入(input)。甚至多個 stream 也能當作另外一個 stream 的輸入。而且,你可以合并(merge)兩個 stream 。你也可以把一個 stream 里你只感興趣的事件 filter 到另外一個 stream 。你還可以把一個 stream 中的數據映射(map)到另外一個 stream 中。
如果 stream 對于 Reactive 這么重要的話,就讓我們來研究研究他。首先,從我們最熟悉的例子開始:「點擊一個按鈕」 。

stream 是一序列按時間排序的 正在發生的事件 (A stream is a sequence of ongoing events ordered in time)。他可以 emit 三種不同的東西:值(value),錯誤(error),或者一個 completed 的標志。舉個例子,當點擊窗口的關閉按鈕時,對應的 completed 事件就會觸發。
我們只能 異步地 捕獲已經 emit 的事件:當一個值 emit 的時候就調用一個事先定義好的回調函數,同樣地,當 error 或者 completed 時調用其對應的回調函數。有時候,你可以不用管后面兩個函數,如果只關注值的話。監聽 stream 也就是所謂的 subscribing ;回調函數就是所謂的 observers ;而 stream 也就是所謂的 subject (observable)。以上其實就是 觀察者設計模式 (Observer Desgin Pattern)。
另外,我們也可以使用 ASCII 來描繪我們的 stream 示例圖。
--a---b-c---d---X---|->
a, b, c, d are emmited valus
X is an error
| is the 'completed' signal
---> is the timeline
想必你對上面的東西都很熟悉吧,那么為了讓你不感到無聊,讓我們來弄點新東西:把一個原始的 click event stream 轉換成一個新的 click event stream 。
首先,讓我們創建一個 counter stream ,他表示某個按鈕被點擊了多少次。在常見的 Reactive library 里面,每個 stream 都有很多函數。比如 map , filter , scan 等等。當你調用其中某個時,比如 clickStream.map(f) ,他會返回一個基于 clickStream 的 新的 stream 。他并不改變原來的 clickStream ,這就是所謂的 immutability (不變性),他和 Reactive stream 總是形影不離。這樣,我們可以鏈式地調用 stream 的函數像這樣 clickStream.map(f).scan(g) :
clickStream: ---c----c--c----c------c-->
vvvvv map(c becomes 1) vvvv
---1----1--1----1------1-->
vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5-->
map(f) 函數會根據你傳進來的函數 f ,替換掉 clickStream 每個 emit 的值,到新的 stream 中。在我們的例子中,我們把每個 click 映射成數字 1 。 scan(g) 會累加 stream 的過去的所有的值(例子中的 g 其實就是一個簡單的 add 函數)。接著,無論 click 事件什么時候發生, counterStream 都會 emit 鼠標點擊過的總次數。
為了展示 Reactive 的真正實力,我們不妨假設你有一個「double click」event stream 。為了讓他更加有趣一些,我們想要的新的 stream 可以是 「triple clicks」或者直接「multiple clicks」。那么,現在請深呼吸一下,想象一下你用傳統的 imperative and stateful 編程風格來實現這個效果。我敢打賭,這一定是一件很令人討厭的事情,并且你還需要定義一些變量去保存狀態,以及解決鼠標連續點擊的時間間隔問題。
沒錯,用 Reactive 的話實現的話,是很簡單的。實際上,關于邏輯的代碼只有 4 行 。但是,我們暫時先不看代碼。 Thinking in diagrams (畫圖思考) 是理解和構建 stream 的最好方法,無論你是初學者還是老手。

上圖中,灰色的矩形是把一個 stream 轉換成另一個 stream 的函數。我們會每隔 250ms 把所有 click stream 都緩沖在一個數組里面,這是 buffer(stream.throttle(250ms)) 所要做的事情(如果你現在不了解細節的話不要在意,因為我們現在只是初探一下 Reactive 而已)。于是,我們得到的是一個包含多個數組的 stream,接著調用 map() 函數,把每個數組都映射成一個整數(數組的長度)。隨后,我們調用 filter(x >= 2) 來過濾掉那些長度為 1 的數組。綜上,我們只需要3次操作就能得到我們想要的 stream 。最后,我們調用 subscribe() 來監聽,響應我們想要做的事情。
我希望你能夠欣賞這種很優美的方法。上面的例子其實只是冰山一角:你可以在不同類型的 stream 中調用相同的 operator (例如, map , filter 等等)。此外,還有很多有用的函數供你使用。
Why Reactive Programming(RP)
Reactive Programming 提高了你代碼的抽象級別,因此你可以專注寫業務邏輯(business logic),而不是不停地去折騰一大堆的實現細節,所以 RP 的代碼看起來簡潔很多。
RP 的優勢在現代的 webapp 和 mobile app 中更加明顯,因為他們需要和眾多的 UI 事件(與數據事件相關)進行高度的交互。十年前,和 web 頁面交互僅僅只是提交一個表單給后臺,然后返回重新渲染好頁面給前端。而如今的應用就需要更加實時(real-time)了:修改一個單獨的表單域就會自動保存到后臺,比如給某些內容的「點贊」就能夠實時地反映給當前在線的其他用戶。
為了提高用戶體驗,現代的應用都需要大量的實時的事件。我們需要工具來正確地解決這些問題,而 Reactive Programming 正是我們想要的答案。
實戰 Thinking in RP
讓我們回到現實世界吧,用一個真實的例子來說明如何一步一步地 thinking in RP 。不是偽代碼,沒有講一半不講另一半的概念性的東西。在教程的最后,我們的代碼不僅可以跑起來,還知道每一步為什么要這樣做。
我選擇 JavaScript 和 RxJS 作為我們的工具,是因為:JavaScript 是如今最流行的語言,雖然 Rx* library family 已經被大量應用到需要的語言和平臺中(.NET,Java,Scala,Clojure,JavaScript,Ruby,Phtyhon,C++,Object-C/Cocoa,Groovy 等等)。無論你選擇哪個,你都可以從這篇教程中學到東西。
實現一個「 Who to follow 」
推ter 有一個推薦關注用戶的 UI 組件:

下面,我們就來實現他的主要功能 :
- 在 App 啟動時,從 API 中加載用戶數據,并顯示 3 個推薦
- 點擊「刷新」按鈕時,重新加載另外 3 個推薦用戶
- 點擊每行(每個推薦)的「 x 」(關閉按鈕)時,移除掉當前的推薦,加載新的
- 每行都有用戶的頭像和主頁的鏈接
我們先不理其他比較小的功能。由于 推ter 關閉了公用 API ,所以我們就轉用 GitHub 獲取用戶的 API 。
請求和響應(Request & Response)
你怎么用 Rx 解決 API 請求和響應的問題?首先記住, (most) everything is a stream ,這是 施展 Rx 魔法的咒語。現在我們先實現最簡單的功能:「在 App 啟動時,從 API 中加載用戶數據,并顯示 3 個推薦」。這里沒有什么特別的,就和往常一樣:(1)發請求,(2)獲取后臺的響應,(3)渲染響應。接下來,我們把請求看作一個 stream 。雖然這看起來有點 overkill,但是我們需要從基本的東西開始,不是嗎?
App 啟動時我們只需要發一個請求,因此我們可以把他看作一個 data stream ,他只 emit 一個值。(以后我們會有多個請求,但現在我們只有一個)。
--a------|->
where a is the string 'https://api.github.com/users'
這就是我們想要發請求的 URL stream 。無論該請求事件何時發生,他都會告訴我們兩件事: when and what 。「 when 」是說當事件 emit 時,請求才被執行。而「 what 」則表示請求的就是 emit 的值,即是這個 URL 字符串 。
在 Rx* 中創建只有單獨一個值的 stream 是很簡單的。stream 的官方術語是「 Observable 」,因為他可以被觀察(observe)。但是我發現這是一個很蠢的名字,所以我通常都叫他「 stream 」。
var requestStream = Rx.Observable.just('https://api.github.com/users');
但現在,這只是一個 string stream ,并沒有其他的操作,所以我們要想辦法在他 emit 值的時候干些事情。這個時候就需要 subscribe (訂閱) 他。
requestStream.subscribe(function(requestUrl) {
// execute the request
jQuery.getJSON(requestUrl, function(responseData) {
// ...
});
}
注意到現在我們用 jQuery Ajax 回調函數來處理請求后的異步操作。 但你不是說 Rx 就是用來處理異步數據流的嗎 !難道這個請求的響應不能是一個包含數據,并且會在未來某個時間點到達的 stream ?理論上看起來是行的,讓我們試試吧。
requestStream.subscribe(function(requestUrl) {
// execute the request
var responseStream = Rx.Observable.create(function (observer) {
jQuery.getJSON(requestUrl)
.done(function(response) { observer.onNext(response); })
.fail(function(jqXHR, status, error) { observer.onError(error); })
.always(function() { observer.onCompleted(); });
});
responseStream.subscribe(function(response) {
// do something with the response
});
});
Rx.Observable.create() 可以自定義我們自己的 stream,通過定義一個 observer( onNext() , onError )。不難發現,上面我們的工作其實就是封裝一個 jQuery Ajax Promise 而已。慢著,這也就是說, Promise 是一個 Observable(stream) ?
Yes. 是的!(這都被你發現了!!)
Observable 其實就是 Promise++ 。在 Rx 中,你可以很簡單地把一個 Promise 轉換成一個 Observable,只需要: var stream = Rx.Observable.fromPromise(promise) ,接下來我們會使用他。Observable 和 Promise++ 的唯一區別是前者不兼容 Promise/A+ ,但是理論上來講是沒有沖突的。Promise 其實就是只有單獨一個值 的 Observable ,但后者更勝一籌的是允許多個返回值(多次 emit)。
這其實是一件很棒的事情,Promise 能做的事情,Observable 也能做。Promise 不能做的事情,Observable 還是能做。因此,如果你是 Promise 粉絲的話,那么你也應該嘗試一下 Rx 的 Observable 。
回到我們的例子中,你會看到,我們的 subscribe() 函數嵌套著另一個 subscribe() ,這很快就會形成 callback hell 。并且, responseStream 的創建依賴于 requestStream 。如果你在前面有仔細閱讀的話,我們說過 Rx 可以很簡單地讓不同 stream 之間變換和創建,現在我們要把他應用到我們的例子中。
你首先要了解的最基本的函數是 map(f) ,他會把 stream A 的每個值,傳到 f() ,然后產生新的值傳給 stream B 。那么,應用到我們例子的話:
var responseMetastream = requestStream
.map(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
以上,我們創建了一個叫「 metastream 」的怪獸:stream 嵌套著 stream (a stream of streams)。不用緊張,metastream 其實不過是一個 emit value 為 stream 的 stream 。你可以把他想象成一個 指針 :每個 emit 的值就是一個指向另一個 stream 的指針。

顯然,返回一個 metastream 對我們一點用都沒有,我們只想要一個 emit value 為 JSON 對象(而不是一個包含 JSON 對象的「 Promise 」)的 stream 。現在,來認識一下我們的新朋友 Mr.flatMap :他是特殊的 map ,可以 flatten 上面講到的 「 metastream 」。他通過 emit 主干(trunk stream) 的值,間接 emit 了分支(branch stream)的值。需要注意的是 flatMap 并不是一個「 fix 」,而 metastreams 更不是一個「 bug 」,他們都各自的用途。
var responseStream = requestStream
.flatMap(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});

漂亮~ 現在我們的 response stream 是基于 request stream 而創建的。request stream 每次 emit 一個值,在 response stream 都會有相對應的值。就像這樣:
requestStream: --a-----b--c------------|->
responseStream: -----A--------B-----C---|->
(lowercase is a request, uppercase is its response)
終于,我們搞定了 response stream ,那么就可以渲染我們得到的數據了:
responseStream.subscribe(function(response) {
// render `response` to the DOM however you wish
});
整理我們以上的所有代碼,有:
var requestStream = Rx.Observable.just('https://api.github.com/users');
var responseStream = requestStream
.flatMap(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
responseStream.subscribe(function(response) {
// render `response` to the DOM however you wish
});
刷新按鈕
我還沒告訴你 GitHub 這個 API 返回的 JSON 對象包含了 100 用戶。他只允許我們設置 page offset 而不能設置 page size ,但是我們只需要 3 個所以浪費了剩下的 97 個。我們可以暫時先不管這個,因為后面我們會講到如何緩存 API 返回的響應。
每次點擊刷新按鈕的時候,request stream 都應該 emit 一個新的 URL ,這樣我們才能得到新的 response 。那么,我們現在需要兩樣東西:點擊刷新按鈕的 refresh click stream (咒語:anything can be a stream ),以及依賴于 refresh click stream 的 request stream 。幸運的是,RxJS 可以很簡單地創建監聽事件的 Observables 。
var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
顯然, refreshClickStream 并沒有包含任何的 API URL ,所以我們需要把它們映射(map)到一個真正的 URL :
var requestStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
因為我沒做自動化測試,所以之前的功能在加了新功能之后跑不起來了:在 App 啟動時并沒有發送我們的請求,只有在點擊刷新按鈕的時候發送。但是,這兩個情景我都想實現。
根據我們現在的知識,可以分別為每個情景定義一個 stream :
var requestOnRefreshStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
可以把兩個 stream 合并成一個嗎?答案是 merge() 。用圖來解釋的話:
stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
vvvvvvvvv merge vvvvvvvvv
---a-B---C--e--D--o----->
因此我們現在可以:
var requestOnRefreshStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
var requestStream = Rx.Observable.merge(
requestOnRefreshStream, startupRequestStream
);
然而我們還有另外一種更加簡潔的寫法,
var requestStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
})
.merge(Rx.Observable.just('https://api.github.com/users'));
甚至還可以更加簡短和可讀:
var requestStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
})
.startWith('https://api.github.com/users');
startWith 顧名思義,不管 input stream 是怎樣的,output stream 的開頭都會有一個值 x ,因為我們設置了 startWith(x) 。但是我沒有遵循 DRY (Dont Repeat Youself) ,因為我重復寫了 API 兩次。如果要 fix 這個問題的話,我們可以為 refreshClickStream 設置 startWith ,他「模擬」了在應用啟動時點擊了刷新按鈕:
var requestStream = refreshClickStream.startWith('startup click')
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
太棒了!你可以看到我們只多加了 startWith() ,和「因為我沒有做自動化測試,所以我弄壞了…」那個時候的代碼比較的話。
「三個關注用戶推薦」UI
在此之前,我們只在 responseStream 的 subscribe 函數里面渲染我們的「用戶推薦 UI 」。但現在我們有了「刷新按鈕」,就產生了一個新的問題:當你點擊了刷新按鈕,當前的三個用戶推薦不會被清除掉,而當一個新的 response 到達時,新的推薦會緊跟在之前的推薦后面渲染。因此,如果我們點擊了刷新按鈕的話,我們需要移除掉當前的推薦。
refreshClickStream.subscribe(function() {
// clear the 3 suggestion DOM elements
});
很顯然,這個的做法是不對的,而且很糟糕,因為我們現在有 兩個 subscriber 是可以修改「推薦界面」的 DOM 結構(另一個是之前的 responseStream.subscribe() ),并且這一點也不 Separation of concerns 。還記得 Reactive 的咒語?

因此,我們可以把「推薦」也看作一個 stream ,他 emit 的值是一個包含推薦數據的 JSON 對象。我們可以分別為3個推薦寫一個 stream 。下面是「推薦用戶一」的 stream :
var suggestion1Stream = responseStream
.map(function(listUsers) {
// get one random user from the list
return listUsers[Math.floor(Math.random()*listUsers.length)];
});
剩下的兩個 suggestion2Stream 和 suggestion3Stream 都可以簡單地從 suggestion1Stream 中復制過來。注意到,這一點也不 DRY ,但我不打算重構他,因為我想讓我們的例子簡單一些,并且也是一個好機會讓你思考如何才能做到 DRY 。
那么,我們現在就不用在 responseStream 的 subscribe 里面渲染「推薦界面」,而是:
suggestion1Stream.subscribe(function(suggestion) {
// render the 1st suggestion to the DOM
});
回到前面所說的「點擊刷新按鈕,移除掉當前的推薦」(即是本部分的開頭),現在我們可以把「刷新按鈕點擊」映射為一個 null 的推薦數據,然后把他加進 suggestion1Stream 里面,就像這樣:
var suggestion1Stream = responseStream
.map(function(listUsers) {
// get one random user from the list
return listUsers[Math.floor(Math.random()*listUsers.length)];
})
.merge(
refreshClickStream.map(function(){ return null; })
);
當渲染的時候,我們可以把 null 解讀為「沒有數據」,所以就隱藏了他的 UI 元素。
suggestion1Stream.subscribe(function(suggestion) {
if (suggestion === null) {
// hide the first suggestion DOM element
}
else {
// show the first suggestion DOM element
// and render the data
}
});
來看看我們現在所有的 stream 圖示:
refreshClickStream: ----------o--------o---->
requestStream: -r--------r--------r---->
responseStream: ----R---------R------R-->
suggestion1Stream: ----s-----N---s----N-s-->
suggestion2Stream: ----q-----N---q----N-q-->
suggestion3Stream: ----t-----N---t----N-t-->
上面的 N 表示 null 。
我們還可以在啟動時渲染一個空的推薦,需要在 suggestion stream 上添加一個 startWith(null) :
var suggestion1Stream = responseStream
.map(function(listUsers) {
// get one random user from the list
return listUsers[Math.floor(Math.random()*listUsers.length)];
})
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
結果就是這樣:
refreshClickStream: ----------o---------o---->
requestStream: -r--------r---------r---->
responseStream: ----R----------R------R-->
suggestion1Stream: -N--s-----N----s----N-s-->
suggestion2Stream: -N--q-----N----q----N-q-->
suggestion3Stream: -N--t-----N----t----N-t-->
關閉一個推薦 & 緩存 response
我們還需要實現一個功能:每個推薦都應該有一個「x」按鈕去關閉它,然后加載一個新的推薦。一開始我們的想法可能會這樣:當點擊關閉按鈕時,發一個新請求就可以啦:
var close1Button = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
// and the same for close2Button and close3Button
var requestStream = refreshClickStream.startWith('startup click')
.merge(close1ClickStream) // we added this
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
但這樣做是不行的,因為他會刷新所有的推薦而不是我們點擊的那個。其實有很多種方法可以解決這個問題,但是為了有趣一些,我們決定重用之前的 response stream 。還記得 API 返回的 page size 是 100 個用戶,但我們只用了 3 個,因此我們還有新的可用的數據,不需要再請求一遍。
再說一遍,讓我們 think in streams 。當「 close1 」的 click 事件觸發后,我們想要的做的是:在 responseStream 最近(the most recently)emit 的值里面,隨機一個出來:
requestStream: --r--------------->
responseStream: ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->
在 Rx* 中,有一個叫 combineLatest 的 combinator function ,他可以把 stream A 和 stream B 作為輸入,無論何時,只要其中一個 emit 了一個值, combineLatest 都會把兩個 stream 最近 emit 的值 a 和 b 組合在一起,然后輸出一個值 c = f(x, y) ( f 是一個你定義好的函數)。用圖示的話會更好理解:
stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
vvvvvvvv combineLatest(f) vvvvvvv
----AB---AC--EC---ED--ID--IQ---->
where f is the uppercase function
我們可以把 combineLatest() 應用到 close1ClickStream 和 responseStream 上,所以無論何時點擊了「關閉按鈕一」,我們都得到最近的 emit 的值,然后返回給 suggestion1Stream 。另一個方面, combineLatest() 是對稱的:無論何時 responseStream emit 了一個值,他都會組合 close1ClickStream 最近 emit 的值,然后返回給 responseStream 。這就好了,我們可以簡化之前 suggestion1Stream 的代碼:
var suggestion1Stream = close1ClickStream
.combineLatest(responseStream,
function(click, listUsers) {
return listUsers[Math.floor(Math.random()*listUsers.length)];
}
)
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
我們還差最后一塊拼圖。 combineLatest() 需要 2 個 source 的最近的值,但是如果其中一個 source 尚未 emit 任何值呢?這樣的話, combineLatest 不會產生任何的值。如果你注意到上面的圖示,你會發現:當第一個 stream emit 了 a 之后,output stream 沒有產生任何值,直到第二個 stream emit 了 b ,output stream 才有值 AB 。
同樣地,這個問題有多種方法可以解決。但是我們使用最簡單的一種:在啟動時模擬點擊「關閉按鈕一」:
var suggestion1Stream = close1ClickStream.startWith('startup click') // we added this
.combineLatest(responseStream,
function(click, listUsers) {l
return listUsers[Math.floor(Math.random()*listUsers.length)];
}
)
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
總結
終于,大功告成~ 以下是上面涉及到的所有代碼:
var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
var closeButton1 = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
// and the same logic for close2 and close3
var requestStream = refreshClickStream.startWith('startup click')
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
var responseStream = requestStream
.flatMap(function (requestUrl) {
return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
});
var suggestion1Stream = close1ClickStream.startWith('startup click')
.combineLatest(responseStream,
function(click, listUsers) {
return listUsers[Math.floor(Math.random()*listUsers.length)];
}
)
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
// and the same logic for suggestion2Stream and suggestion3Stream
suggestion1Stream.subscribe(function(suggestion) {
if (suggestion === null) {
// hide the first suggestion DOM element
}
else {
// show the first suggestion DOM element
// and render the data
}
});
雖然我們的代碼很簡短,但是也實現了不少的功能:他對多個事件的管理可以做到 separation of concerns ,甚至還緩存了 responses 。函數式風格讓代碼更加 declarative(聲明式),而不是 imperative(命令式):我們沒有給出一序列的指令去執行,而是在告訴某些東西(如何定義 stream 之間的關系)。比如,Rx 告訴計算機, suggestion1Stream 是 close1ClickStream 組合 response stream 最近的一個值。并且,當點擊刷新按鈕或者啟動時, suggestion1Stream 的值為 null 。
容易注意到,我們代碼都沒有使用像 if , for , while 和 callback-based 等常用的控制流程語句。你甚至可以在 subscribe 函數里面使用 filter() ,這樣一來你也不需要 if-else 了(至于如何實現是我留給你們的練習)。在 Rx 里,我們有很多 stream 函數,比如 map , filter , scan , merge , combineLatest , startWith 等等 event-driven 應用經常用到的控制流程函數。這些函數可以讓你 write less,run more power 。
未完待續
如果你認為 Rx* 適合你用來進行 Reactive Programming 的話,可以花點時間去熟悉那些可以變換,組合,創建 Observable 的 函數 。如果你想用圖示的方式來了解這些函數的話,可以去看看 RxJava 的有圖示的文檔 。當你遇到困難的時候,可以畫圖,想一想,然后看一看文檔對函數的定義,然后再想一想。這個 workflow 在我的學習經歷中起到了很大的作用。
如果你想開始學習 Rx ,那么你必須要理解: Cold vs Hot Observables 。如果你忽略了這個,你會后悔的。記住我已經警告過你了。如果想要更深入的話,就需要學習真正的 functional programming ,以及熟悉那些會影響到 Rx 的一些 issue ,比如 side effects 。
然而,Reactive Programming 并不只是 Rx 。還有其他比如 Bacon.js ,他沒有 Rx 有時會遇到的一些怪異行為。還有 Elm 語言 :他是一種能夠編譯成 JavaScript + HTML + CSS 的 Functional Reactive Programming 語言,并且還可以 time travelling debug ,很厲害吧。
Rx 的應用場景是 event-heavy 的前端應用。但是,他不僅僅是前端的東西,同時他也能夠勝任后臺甚至數據庫。實際上,RxJava 已經成為了 Netflix 處理后臺 API 并發問題的利刃 。Rx 并不是局限于某種類型的應用或者語言,他是一種范式(paradigm),總之你可以用它來開發 event-driven 的軟件。
來自:http://qianduan.guru/2017/03/02/The-Intro-to-Reactive-Programming/