那些年我們錯過的響應式編程

jopen 9年前發布 | 87K 次閱讀 響應式 前端技術

  • 原文鏈接 : The introduction to Reactive Programming you've been missing
  • 作者 : @andrestaltz
  • 譯者 : yaoqinwei
  • 校對者: bboyfeiyuchaossss
  • 狀態 : 完成
  • </ul> </blockquote>

    相信你們在學習響應式編程這個新技術的時候都會充滿了好奇,特別是它的一些變體,例如:Rx系列、Bacon.js、RAC等等……

    在缺乏優秀資料的前提下,響應式編程的學習過程將滿是荊棘。起初,我試圖尋找一些教程,卻只找到少量的實踐指南,而且它們講的都非常淺顯,從來沒人接受圍繞響應式編程建立一個完整知識體系的挑戰。此外,官方文檔通常也不能很好地幫助你理解某些函數,因為它們通常看起來很繞,不信請看這里:

    Rx.Observable.prototype.flatMapLatest(selector, [thisArg])

    根據元素下標,將可觀察序列中每個元素一一映射到一個新的可觀察序列當中,然后...%…………%&¥#@@……&**(暈了)

    天吶,這簡直太繞了!

    我讀過兩本相關的書,一本只是在給你描繪響應式編程的偉大景象,而另一本卻只是深入到如何使用響應式庫而已。我在不斷的構建項目過程中把響應式編程了解的透徹了一些,最后以這種艱難的方式學完了響應式編程。在我工作公司的一個實際項目中我會用到它,當我遇到問題時,還可以得到同事的支持。

    學習過程中最難的部分是如何以響應式的方式來思考,更多的意味著要摒棄那些老舊的命令式和狀態式的典型編程習慣,并且強迫自己的大腦以不同的范式來運作。我還沒有在網絡上找到任何一個教程是從這個層面來剖析的,我覺得這個世界非常值得擁有一個優秀的實踐教程來教你如何以響應式編程的方式來思考,方便引導你開始學習響應式編程。然后看各種庫文檔才可以給你更多的指引。希望這篇文章能夠幫助你快速地進入響應式編程的世界。

    "什是響應式編程?"

    網絡上有一大堆糟糕的解釋和定義,如Wikipedia上通常都是些非常籠統和理論性的解釋,而Stackoverflow上的一些規范的回答顯然也不適合新手來參考,Reactive Manifesto看起來也只像是拿給你的PM或者老板看的東西,微軟的Rx術語"Rx = Observables + LINQ + Schedulers" 也顯得太過沉重,而且充滿了太多微軟式的東西,反而給我們帶來更多疑惑。相對于你使用的MV*框架以及你鐘愛的編程語言,"Reactive" 和"Propagation of change"這樣的術語并沒有傳達任何有意義的概念。當然,我的view框架能夠從model做出反應,我的改變當然也會傳播,如果沒有這些,我的界面根本就沒有東西可渲染。

    所以,不要再扯這些廢話了。

    響應式編程就是與異步數據流交互的編程范式

    一方面,這已經不是什么新事物了。事件總線(Event Buses)或一些典型的點擊事件本質上就是一個異步事件流(asynchronous event stream),這樣你就可以觀察它的變化并使其做出一些反應(do some side effects)。響應式是這樣的一個思路:除了點擊和懸停(hover)的事件外,你可以給任何事物創建數據流。數據流無處不在,任何東西都可以成為一個數據流,例如變量、用戶輸入、屬性、緩存、數據結構等等。舉個栗子,你可以把你的微博訂閱功能想象成跟點擊事件一樣的數據流,你可以監聽這樣的數據流,并做出相應的反應。

    最重要的是,你會擁有一些令人驚艷的函數去結合、創建和過濾任何一組數據流。 這就是"函數式編程"的魔力所在。一個數據流可以作為另一個數據流的輸入,甚至多個數據流也可以作為另一個數據流的輸入。你可以合并兩個數據流,也可以過濾一個數據流得到另一個只包含你感興趣的事件的數據流,還可以映射一個數據流的值到一個新的數據流里。

    數據流是整個響應式編程體系中的核心,要想學習響應式編程,當然要先走進數據流一探究竟了。那現在就讓我們先從熟悉的"點擊一個按鈕"的事件流開始

    Click event stream

    一個數據流是一個按時間排序的即將發生的事件(Ongoing events ordered in time)的序列。如上圖,它可以發出3種不同的事件(上一句已經把它們叫做事件):一個某種類型的值事件,一個錯誤事件和一個完成事件。當一個完成事件發生時,在某些情況下,我們可能會做這樣的操作:關閉包含那個按鈕的窗口或者視圖組件。

    我們只能異步捕捉被發出的事件,使得我們可以在發出一個值事件時執行一個函數,發出錯誤事件時執行一個函數,發出完成事件時執行另一個函數。有時候你可以忽略后兩個事件,只需聚焦于如何定義和設計在發出值事件時要執行的函數,監聽這個事件流的過程叫做訂閱,我們定義的函數叫做觀察者,而事件流就可以叫做被觀察的主題(或者叫被觀察者)。你應該察覺到了,對的,它就是觀察者模式

    上面的示意圖我們也可以用ASCII碼的形式重新畫一遍,請注意,下面的部分教程中我們會繼續使用這幅圖:

    --a---b-c---d---X---|->
    
    a, b, c, d 是值事件
    X 是錯誤事件
    | 是完成事件
    ---> 是時間線(軸)

    現在你對響應式編程事件流應該非常熟悉了,為了不讓你感到無聊,讓我們來做一些新的嘗試吧:我們將創建一個由原始點擊事件流演變而來的一種新的點擊事件流。

    首先,讓我們來創建一個記錄按鈕點擊次數的事件流。在常用的響應式庫中,每個事件流都會附有一些函數,例如map,filter,scan等,當你調用這其中的一個方法時,比如clickStream.map(f),它會返回基于點擊事件流的一個新事件流。它不會對原來的點擊事件流做任何的修改。這種特性叫做不可變性(immutability),而且它可以和響應式事件流搭配在一起使用,就像豆漿和油條一樣完美的搭配。這樣我們可以用鏈式函數的方式來調用,例如:clickStream.map(f).scan(g):

    clickStream: ---c----c--c----c------c-->
                   vvvvv map(c becomes 1) vvvv
                   ---1----1--1----1------1-->
                   vvvvvvvvv scan(+) vvvvvvvvv
    counterStream: ---1----2--3----4------5-->

    map(f)函數會根據你提供的f函數把原事件流中每一個返回值分別映射到新的事件流中。在上圖的例子中,我們把每一次點擊事件都映射成數字1,scan(g)函數則把之前映射的值聚集起來,然后根據x = g(accumulated, current)算法來作相應的處理,而本例的g函數其實就是簡單的加法函數。然后,當一個點擊事件發生時,counterStream函數則上報當前點擊事件總數。

    為了展示響應式編程真正的魅力,我們假設你有一個"雙擊"事件流,為了讓它更有趣,我們假設這個事件流同時處理"三次點擊"或者"多次點擊"事件,然后深吸一口氣想想如何用傳統的命令式和狀態式的方式來處理,我敢打賭,這么做會相當的討厭,其中還要涉及到一些變量來保存狀態,并且還得做一些時間間隔的調整。

    而用響應式編程的方式處理會非常的簡潔,實際上,邏輯處理部分只需要四行代碼。但是,當前階段讓我們現忽略代碼的部分,無論你是新手還是專家,看著圖表思考來理解和建立事件流將是一個非常棒的方法。

    多次點擊事件流

    圖中,灰色盒子表示將上面的事件流轉換下面的事件流的函數過程,首先根據250毫秒的間隔時間(event silence, 譯者注:無事件發生的時間段,上一個事件發生到下一個事件發生的間隔時間)把點擊事件流一段一隔開,再將每一段的一個或多個點擊事件添加到列表中(這就是這個函數:buffer(stream.throttle(250ms))所做的事情,當前我們先不要急著去理解細節,我們只需專注響應式的部分先)。現在我們得到的是多個含有事件流的列表,然后我們使用了map()中的函數來算出每一個列表長度的整數數值映射到下一個事件流當中。最后我們使用了過濾filter(x >= 2)函數忽略掉了小于1的整數。就這樣,我們用了3步操作生成了我們想要的事件流,接下來,我們就可以訂閱("監聽")這個事件并作出我們想要的操作了。

    我希望你能感受到這個示例的優雅之處。當然了,這個示例也只是響應式編程魔力的冰山一角而已,你同樣可以將這3步操作應用到不同種類的事件流中去,例如,一串API響應的事件流。另一方面,你還有非常多的函數可以使用。

    "我為什么要采用響應式編程?"

    響應式編程可以加深你代碼抽象的程度,讓你可以更專注于定義與事件相互依賴的業務邏輯,而不是把大量精力放在實現細節上,同時,使用響應式編程還能讓你的代碼變得更加簡潔。

    特別對于現在流行的webapps和mobile apps,它們的 UI 事件與數據頻繁地產生交互,在開發這些應用時使用響應式編程的優點將更加明顯。十年前,web頁面的交互是通過提交一個很長的表單數據到后端,然后再做一些簡單的前端渲染操作。而現在的Apps則演變的更具有實時性:僅僅修改一個單獨的表單域就能自動的觸發保存到后端的代碼,就像某個用戶對一些內容點了贊,就能夠實時反映到其他已連接的用戶一樣,等等。

    當今的Apps都含有豐富的實時事件來保證一個高效的用戶體驗,我們就需要采用一個合適的工具來處理,那么響應式編程就正好是我們想要的答案。

    以響應式編程方式思考的例子

    讓我們深入到一些真實的例子,一個能夠一步一步教你如何以響應式編程的方式思考的例子,沒有虛構的示例,沒有一知半解的概念。在這個教程的末尾我們將產生一些真實的函數代碼,并能夠知曉每一步為什么那樣做的原因(知其然,知其所以然)。

    我選了JavaScript和RxJS來作為本教程的編程語言,原因是:JavaScript是目前最多人熟悉的語言,而Rx系列的庫對于很多語言和平臺的運用是非常廣泛的,例如(.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy等等。所以,無論你用的是什么語言、庫、工具,你都能從下面這個教程中學到東西(從中受益)。

    實現一個推薦關注(Who to follow)的功能

    在推ter里有一個UI元素向你推薦你可以關注的用戶,如下圖:

    推ter Who to follow suggestions box

    我們將聚焦于模仿它的主要功能,它們是:

    • 開始階段,從API加載推薦關注的用戶賬戶數據,然后顯示三個推薦用戶
    • 點擊刷新,加載另外三個推薦用戶到當前的三行中顯示
    • 點擊每一行的推薦用戶上的'x'按鈕,清楚當前被點擊的用戶,并顯示新的一個用戶到當前行
    • 每一行顯示一個用戶的頭像并且在點擊之后可以鏈接到他們的主頁。

    我們可以先不管其他的功能和按鈕,因為它們是次要的。因為推ter最近關閉了未經授權的公共API調用,我們將用Github獲取用戶的API代替,并且以此來構建我們的UI。

    如果你想先看一下最終效果,這里有完成后的代碼

    Request和Response

    在Rx中是怎么處理這個問題呢?,在開始之前,我們要明白,(幾乎)一切都可以成為一個事件流,這就是Rx的準則(mantra)。讓我們從最簡單的功能開始:"開始階段,從API加載推薦關注的用戶賬戶數據,然后顯示三個推薦用戶"。其實這個功能沒什么特殊的,簡單的步驟分為: (1)發出一個請求,(2)獲取響應數據,(3)渲染響應數據。ok,讓我們把請求作為一個事件流,一開始你可能會覺得這樣做有些夸張,但別急,我們也得從最基本的開始,不是嗎?

    開始時我們只需做一次請求,如果我們把它作為一個數據流的話,它只能成為一個僅僅返回一個值的事件流而已。一會兒我們還會有很多請求要做,但當前,只有一個。

    --a------|->
    
    a就是字符串:'https://api.github.com/users'

    這是一個我們要請求的URL事件流。每當發生一個請求時,它將告訴我們兩件事:什么時候和做了什么事(when and what)。什么時候請求被執行,什么時候事件就被發出。而做了什么就是請求了什么,也就是請求的URL字符串。

    在Rx中,創建返回一個值的事件流是非常簡單的。其實事件流在Rx里的術語是叫"被觀察者",也就是說它是可以被觀察的,但是我發現這名字比較傻,所以我更喜歡把它叫做事件流。

    var requestStream = Rx.Observable.just('https://api.github.com/users');

    但現在,這只是一個字符串的事件流而已,并沒有做其他操作,所以我們需要在發出這個值的時候做一些我們要做的操作,可以通過訂閱(subscribing)這個事件來實現。

    requestStream.subscribe(function(requestUrl) { // execute the request jQuery.getJSON(requestUrl, function(responseData) { // ... });
    }

    注意到我們這里使用的是JQuery的AJAX回調方法(我們假設你已經很了解JQuery和AJAX了)來的處理這個異步的請求操作。但是,請稍等一下,Rx就是用來處理異步數據流的,難道它就不能處理來自請求(request)在未來某個時間響應(response)的數據流嗎?好吧,理論上是可以的,讓我們嘗試一下。

    requestStream.subscribe(function(requestUrl) { // execute the request var responseStream = Rx.Observable.create(function (observer) {
        jQuery.getJSON(requestUrl)
        .done(function(response) { observer.onNext(response); })
        .fail(function(jqXHR, status, error) { observer.onError(error); })
        .always(function() { observer.onCompleted(); });
      });
    
      responseStream.subscribe(function(response) { // do something with the response });
    }

    Rx.Observable.create()操作就是在創建自己定制的事件流,且對于數據事件(onNext())和錯誤事件(onError())都會顯示的通知該事件每一個觀察者(或訂閱者)。我們做的只是小小的封裝一下jQuery Ajax Promise而已。等等,這是否意味者jQuery Ajax Promise本質上就是一個被觀察者呢(Observable)?

    Amazed

    是的。

    Promise++就是被觀察者(Observable),在Rx里你可以使用這樣的操作:var stream = Rx.Observable.fromPromise(promise),就可以很輕松的將Promise轉換成一個被觀察者(Observable),非常簡單的操作就能讓我們現在就開始使用它。不同的是,這些被觀察者都不能兼容Promises/A+,但理論上并不沖突。一個Promise就是一個只有一個返回值的簡單的被觀察者,而Rx就遠超于Promise,它允許多個值返回。

    這樣更好,這樣更突出被觀察者至少比Promise強大,所以如果你相信Promise宣傳的東西,那么也請留意一下響應式編程能勝任些什么。

    現在回到示例當中,你應該能快速發現,我們在subscribe()方法的內部再次調用了subscribe()方法,這有點類似于回調地獄(callback hell),而且responseStream的創建也是依賴于requestStream的。在之前我們說過,在Rx里,有很多很簡單的機制來從其他事件流的轉化并創建出一些新的事件流,那么,我們也應該這樣做試試。

    現在你需要了解的一個最基本的函數是map(f),它可以從事件流A中取出每一個值,并對每一個值執行f()函數,然后將產生的新值填充到事件流B。如果將它應用到我們的請求和響應事件流當中,那我們就可以將請求的URL映射到一個響應Promises上了(偽裝成數據流)。

    var responseMetastream = requestStream
      .map(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
      });

    然后,我們創造了一個叫做"metastream"的怪獸:一個裝載了事件流的事件流。先別驚慌,metastream就是每一個發出的值都是另一個事件流的事件流,你看把它想象成一個[指針(pointers)]((https://en.wikipedia.org /wiki/Pointer_(computer_programming))數組:每一個單獨發出的值就是一個_指針_,它指向另一個事件流。在我們的示例里,每一個請求URL都映射到一個指向包含響應數據的promise數據流。

    Response metastream

    一個響應的metastream,看起來確實讓人容易困惑,看樣子對我們一點幫助也沒有。我們只想要一個簡單的響應數據流,每一個發出的值是一個簡單的JSON對象就行,而不是一個'Promise' 的JSON對象。ok,讓我們來見識一下另一個函數:Flatmap,它是map()函數的另一個版本,它比metastream更扁平。一切在"主軀干"事件流發出的事件都將在"分支"事件流中發出。Flatmap并不是metastreams的修復版,metastreams也不是一個bug。它倆在Rx中都是處理異步響應事件的好工具、好幫手。

    var responseStream = requestStream
      .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
      });

    Response stream

    很贊,因為我們的響應事件流是根據請求事件流定義的,如果我們以后有更多事件發生在請求事件流的話,我們也將會在相應的響應事件流收到響應事件,就如所期待的那樣:

    requestStream:  --a-----b--c------------|->
    responseStream: -----A--------B-----C---|->
    
    (小寫的是請求事件流, 大寫的是響應事件流)

    現在,我們終于有響應的事件流了,并且可以用我們收到的數據來渲染了:

    responseStream.subscribe(function(response) { // render `response` to the DOM however you wish });

    讓我們把所有代碼合起來,看一下:

    var requestStream = Rx.Observable.just('https://api.github.com/users'); var responseStream = requestStream
      .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
      });
    
    responseStream.subscribe(function(response) { // render `response` to the DOM however you wish });

    刷新按鈕

    我還沒提到本次響應的JSON數據是含有100個用戶數據的list,這個API只允許指定頁面偏移量(page offset),而不能指定每頁大小(page size),我們只用到了3個用戶數據而浪費了其他97個,現在可以先忽略這個問題,稍后我們將學習如何緩存響應的數據。

    每當刷新按鈕被點擊,請求事件流就會發出一個新的URL值,這樣我們就可以獲取新的響應數據。這里我們需要兩個東西:點擊刷新按鈕的事件流(準則:一切都能作為事件流),我們需要將點擊刷新按鈕的事件流作為請求事件流的依賴(即點擊刷新事件流會引起請求事件流)。幸運的是,RxJS已經有了可以從事件監聽者轉換成被觀察者的方法了。

    var refreshButton = document.querySelector('.refresh'); var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

    因為刷新按鈕點擊事件不會攜帶將要請求的API的URL,我們需要將每次的點擊映射到一個實際的URL上,現在我們將請求事件流轉換成了一個點擊事件流,并將每次的點擊映射成一個隨機的頁面偏移量(offset)參數來組成API的URL。

    var requestStream = refreshClickStream
      .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset;
      });

    因為我比較笨而且也沒有使用自動化測試,所以我剛把之前做好的一個功能搞爛了。這樣,請求在一開始的時候就不會執行,而只有在點擊事件發生時才會執行。我們需要的是兩種情況都要執行:剛開始打開網頁和點擊刷新按鈕都會執行的請求。

    我們知道如何為每一種情況做一個單獨的事件流:

    var requestOnRefreshStream = refreshClickStream
      .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset;
      }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

    但是我們是否可以將這兩個合并成一個呢?沒錯,是可以的,我們可以使用merge()方法來實現。下圖可以解釋merge()函數的用處:

    stream A: ---a--------e-----o----->
    stream B: -----B---C-----D-------->
              vvvvvvvvv merge vvvvvvvvv
              ---a-B---C--e--D--o----->

    現在做起來應該很簡單:

    var requestOnRefreshStream = refreshClickStream
      .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset;
      }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users'); var requestStream = Rx.Observable.merge(
      requestOnRefreshStream, startupRequestStream
    );

    還有一個更干凈的寫法,省去了中間事件流變量:

    var requestStream = refreshClickStream
      .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset;
      })
      .merge(Rx.Observable.just('https://api.github.com/users'));

    甚至可以更簡短,更具有可讀性:

    var requestStream = refreshClickStream
      .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset;
      })
      .startWith('https://api.github.com/users');

    startWith()函數做的事和你預期的完全一樣。無論你的輸入事件流是怎樣的,使用startWith(x)函數處理過后輸出的事件流一定是一個x開頭的結果。但是我沒有總是重復代碼( DRY),我只是在重復API的URL字符串,改進的方法是將startWith()函數挪到refreshClickStream那里,這樣就可以在啟動時,模擬一個刷新按鈕的點擊事件了。

    var requestStream = refreshClickStream.startWith('startup click')
      .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset;
      });

    不錯,如果你倒回到"搞爛了的自動測試"的地方,然后再對比這兩個地方,你會發現我僅僅是加了一個startWith()函數而已。

    用事件流將3個推薦的用戶數據模型化

    直到現在,在響應事件流(responseStream)的訂閱(subscribe())函數發生的渲染步驟里,我們只是稍微提及了一下推薦關注的UI。現在有了刷新按鈕,我們就會出現一個問題:當你點擊了刷新按鈕,當前的三個推薦關注用戶沒有被清楚,而只要響應的數據達到后我們就拿到了新的推薦關注的用戶數據,為了讓UI看起來更漂亮,我們需要在點擊刷新按鈕的事件發生的時候清楚當前的三個推薦關注的用戶。

    refreshClickStream.subscribe(function() { // clear the 3 suggestion DOM elements  });

    不,老兄,還沒那么快。我們又出現了新的問題,因為我們現在有兩個訂閱者在影響著推薦關注的UI DOM元素(另一個是responseStream.subscribe()),這看起來并不符合關注分離(Separation of concerns)原則,還記得響應式編程的原則么?

    Mantra

    現在,讓我們把推薦關注的用戶數據模型化成事件流形式,每個被發出的值是一個包含了推薦關注用戶數據的JSON對象。我們將把這三個用戶數據分開處理,下面是推薦關注的1號用戶數據的事件流:

    var suggestion1Stream = responseStream
      .map(function(listUsers) { // get one random user from the list return listUsers[Math.floor(Math.random()*listUsers.length)];
      });

    其他的,如推薦關注的2號用戶數據的事件流suggestion2Stream和推薦關注的3號用戶數據的事件流suggestion3Stream都可以方便的從suggestion1Stream復制粘貼就好。這里并不是重復代碼,只是為讓我們的示例更加簡單,而且我認為這是一個思考如何避免重復代碼的好案例。

    Instead of having the rendering happen in responseStream's subscribe(), we do that here:

    suggestion1Stream.subscribe(function(suggestion) { // render the 1st suggestion to the DOM });

    我們不在responseStream的subscribe()中處理渲染了,我們這樣處理:

    suggestion1Stream.subscribe(function(suggestion) { // render the 1st suggestion to the DOM });

    回到"當刷新時,清楚掉當前的推薦關注的用戶",我們可以很簡單的把刷新點擊映射為沒有推薦數據(nullsuggestion data),并且在suggestion1Stream中包含進來,如下:

    var suggestion1Stream = responseStream
      .map(function(listUsers) { // get one random user from the list return listUsers[Math.floor(Math.random()*listUsers.length)];
      })
      .merge(
        refreshClickStream.map(function(){ return null; })
      );

    當渲染時,我們將null解釋為"沒有數據",然后把UI元素隱藏起來。

    suggestion1Stream.subscribe(function(suggestion) {
      if (suggestion === null) {
        // hide the first suggestion DOM element
      }
      else {
        // show the first suggestion DOM element
        // and render the data
      }
    });


    現在我們大概的示意圖如下:

    refreshClickStream: ----------o--------o---->
         requestStream: -r--------r--------r---->
        responseStream: ----R---------R------R-->   
     suggestion1Stream: ----s-----N---s----N-s-->
     suggestion2Stream: ----q-----N---q----N-q-->
     suggestion3Stream: ----t-----N---t----N-t-->

    N代表null

    作為一種補充,我們可以在一開始的時候就渲染空的推薦內容。這通過把startWith(null)添加到推薦關注的事件流就可以了:

    var suggestion1Stream = responseStream
      .map(function(listUsers) { // get one random user from the list return listUsers[Math.floor(Math.random()*listUsers.length)];
      })
      .merge(
        refreshClickStream.map(function(){ return null; })
      )
      .startWith(null);

    結果是這樣的:

    refreshClickStream: ----------o---------o---->
         requestStream: -r--------r---------r---->
        responseStream: ----R----------R------R-->   
     suggestion1Stream: -N--s-----N----s----N-s-->
     suggestion2Stream: -N--q-----N----q----N-q-->
     suggestion3Stream: -N--t-----N----t----N-t-->

    推薦關注的關閉和使用已緩存的響應數據(responses)

    只剩這一個功能沒有實現了,每個推薦關注的用戶UI會有一個'x'按鈕來關閉自己,然后在當前的用戶數據UI中加載另一個推薦關注的用戶。最初的想法是:點擊任何關閉按鈕時都需要發起一個新的請求:

    var close1Button = document.querySelector('.close1');
    var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
    // and the same for close2Button and close3Button
    
    var requestStream = refreshClickStream.startWith('startup click')
      .merge(close1ClickStream) // we added this
      .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
      });


    這樣沒什么效果,這樣會關閉和重新加載全部的推薦關注用戶,而不僅僅是處理我們點擊的那一個。這里有幾種方式來解決這個問題,并且讓它變得有趣,我們將重用之前的請求數據來解決這個問題。這個API響應的每頁數據大小是100個用戶數據,而我們只使用了其中三個,所以還有一大堆未使用的數據可以拿來用,不用去請求更多數據了。

    ok,再來,我們繼續用事件流的方式來思考。當'close1'點擊事件發生時,我們想要使用最近發出的響應數據,并執行responseStream函數來從響應列表里隨機的抽出一個用戶數據來,就像下面這樣:

    requestStream: --r--------------->
       responseStream: ------R----------->
    close1ClickStream: ------------c----->
    suggestion1Stream: ------s-----s----->

    在Rx中一個組合函數叫做combineLatest,應該是我們需要的。這個函數會把數據流A和數據流B作為輸入,并且無論哪一個數據流發出一個值了,combineLatest函數就會將從兩個數據流最近發出的值a和b作為f函數的輸入,計算后返回一個輸出值(c = f(x,y)),下面的圖表會讓這個函數的過程看起來會更加清晰:

    stream A: --a-----------e--------i-------->
    stream B: -----b----c--------d-------q---->
              vvvvvvvv combineLatest(f) vvvvvvv
              ----AB---AC--EC---ED--ID--IQ---->
    
    f是轉換成大寫的函數

    這樣,我們就可以把combineLatest()函數用在close1ClickStream和responseStream上了,只要關閉按鈕被點擊,我們就可以獲得最近的響應數據,并在suggestion1Stream上產生出一個新值。另一方面,combineLatest()函數也是相對的:每當在responseStream上發出一個新的響應,它將會結合一次新的點擊關閉按鈕事件來產生一個新的推薦關注的用戶數據,這非常有趣,因為它可以給我們的suggestion1Stream簡化代碼:

    var suggestion1Stream = close1ClickStream
      .combineLatest(responseStream, function(click, listUsers) { return listUsers[Math.floor(Math.random()*listUsers.length)];
        }
      )
      .merge(
        refreshClickStream.map(function(){ return null; })
      )
      .startWith(null);

    現在,我們的拼圖還缺一小塊地方。combineLatest()函數使用了最近的兩個數據源,但是如果某一個數據源還沒有發出任何東西,combineLatest()函數就不能在輸出流上產生一個數據事件。如果你看了上面的ASCII圖表(文章中第一個圖表),你會明白當第一個數據流發出一個值a時并沒有任何的輸出,只有當第二個數據流發出一個值b的時候才會產生一個輸出值。

    這里有很多種方法來解決這個問題,我們使用最簡單的一種,也就是在啟動的時候模擬'close 1'的點擊事件:

    var suggestion1Stream = close1ClickStream.startWith('startup click') // we added this
      .combineLatest(responseStream,             
        function(click, listUsers) {l
          return listUsers[Math.floor(Math.random()*listUsers.length)];
        }
      )
      .merge(
        refreshClickStream.map(function(){ return null; })
      )
      .startWith(null);


    封裝起來

    我們完成了,下面是封裝好的完整示例代碼:

    var refreshButton = document.querySelector('.refresh');
    var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
    
    var closeButton1 = document.querySelector('.close1');
    var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
    // and the same logic for close2 and close3
    
    var requestStream = refreshClickStream.startWith('startup click')
      .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
      });
    
    var responseStream = requestStream
      .flatMap(function (requestUrl) {
        return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
      });
    
    var suggestion1Stream = close1ClickStream.startWith('startup click')
      .combineLatest(responseStream,             
        function(click, listUsers) {
          return listUsers[Math.floor(Math.random()*listUsers.length)];
        }
      )
      .merge(
        refreshClickStream.map(function(){ return null; })
      )
      .startWith(null);
    // and the same logic for suggestion2Stream and suggestion3Stream
    
    suggestion1Stream.subscribe(function(suggestion) {
      if (suggestion === null) {
        // hide the first suggestion DOM element
      }
      else {
        // show the first suggestion DOM element
        // and render the data
      }
    });


    你可以在這里看到可演示的示例工程

    以上的代碼片段雖小但做到很多事:它適當的使用關注分離(separation of concerns)原則的實現了對多個事件流的管理,甚至做到了響應數據的緩存。這種函數式的風格使得代碼看起來更像是聲明式編程而非命令式編程:我們并不是在給一組指令去執行,只是定義了事件流之間關系來告訴它這是什么。例如,我們用Rx來告訴計算機suggestion1Stream是'close 1'事件結合從最新的響應數據中拿到的一個用戶數據的數據流,除此之外,當刷新事件發生時和程序啟動時,它就是null。

    留意一下代碼中并未出現例如if,for,while等流程控制語句,或者像JavaScript那樣典型的基于回調(callback-based)的流程控制。如果可以的話(稍候會給你留一些實現細節來作為練習),你甚至可以在subscribe()上使用filter()函數來擺脫if和else。在Rx里,我們有例如:map,filter,scan,merge,combineLatest,startWith等數據流的函數,還有很多函數可以用來控制事件驅動編程(event-driven program)的流程。這些函數的集合可以讓你使用更少的代碼實現更強大的功能。

    接下來

    如果你認為Rx將會成為你首選的響應式編程庫,接下來就需要花一些時間來熟悉一大批的函數用來變形、聯合和創建被觀察者。如果你想在事件流的圖表當中熟悉這些函數,那就來看一下這個:RxJava's very useful documentation with marble diagrams。請記住,無論何時你遇到問題,可以畫一下這些圖,思考一下,看一看這一大串函數,然后繼續思考。以我個人經驗,這樣效果很有效。

    一旦你開始使用了Rx編程,請記住,理解Cold vs Hot Observables的概念是非常必要的,如果你忽視了這一點,它就會反彈回來并殘忍的反咬你一口。我這里已經警告你了,學習函數式編程可以提高你的技能,熟悉一些常見問題,例如Rx會帶來的副作用

    但是響應式編程庫并不僅僅是Rx,還有相對容易理解的,沒有Rx那些怪癖的Bacon.jsElm Language則以它自己的方式支持響應式編程:它是一門會編譯成Javascript + HTML + CSS的響應式編程語言,并有一個time travelling debugger功能,很棒吧。

    而Rx對于像前端和App這樣需要處理大量的編程效果是非常棒的。但是它不只是可以用在客戶端,還可以用在后端或者接近數據庫的地方。事實上,RxJava就是Netflix服務端API用來處理并行的組件。Rx并不是局限于某種應用程序或者編程語言的框架,它真的是你編寫任何事件驅動程序,可以遵循的一個非常棒的編程范式。

    如果這篇教程對你有幫助, 那么就請來轉發一下吧(tweet it forward).

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!