探索 RxJS - 核心概念
Table of Contents generated withDocToc
- 探索 RxJS - Core Concept
探索 RxJS - Core Concept
Steam in ReactiveX
ReactiveX,又稱 Reactive Extensions(響應式擴展),其中的 X 代表各種語言。因為它實質上只是一個事件流處理庫,不需要什么其他依賴。
講一講 ReactiveX 中的 “流”:
我的理解是,Rx 通過 “流” 的概念,將事件串聯成一個個事件流,各個事件流之間還可進行 "并聯" 的作用。當某個流上的事件被調用時,就可以觸發我們設定好的監聽回調。
那么什么樣的事件可以成為流呢?答案是任何事件。無論是異步非阻塞事件(setTimeOut、網絡請求等),還是同步可阻塞事件(點擊事件、對迭代器的遍歷等),一切都是流。此時不得不祭出一張神棍圖:

事件的串聯是流。比方說,用戶對一個按鈕進行了猛烈的點擊,所有的點擊事件就是一個流;再或者,并發多個網絡請求,它們也是一個流。而 Rx 的主要作用,就是為流的處理提供了一整套的解決方案:將不同的流進行組合,或者監聽事件的觸發及時給予響應等等。
Quick Intro
RxJS 的作者曾在 The introduction to Reactive Programming you've been missing 一文中詳細講解了 RxJS 的初步使用和流的概念,我們先僅看文中的一幅圖來理解 Rx 的概念:

這是一個多次點擊事件形成的事件流,在從左往右的時間線上有很多個點擊事件,而每個點擊事件的時間間隔各不相同。通過 Rx 我們可以對流上的各個事件進行篩選,并獲取到在某一段時間內的連續點擊次數。
我們在頁面上的點擊事件就可以組成流。比如一個記錄每次點擊時坐標的流。隨機點擊頁面多次之后,可能會產生這樣一個流:

而我們可以通過 RxJS 中的方法對這個流內的各個事件進行篩選,比如選出橫坐標 x < 250 的點擊:
filter( (event) -> event.x < 250 )

也就是說,我們可以像對待 JavaScript 中可遍歷對象一樣,對流上的各個事件進行遍歷,選出符合條件的事件。這就是 Rx 的魅力所在。
主要運用場景
既然 Rx 是為了流而生的,那么最佳運用場景當然是面對一系列較復雜的事件流時了。
含有異步請求和事件觸發的混合流
比如,用戶在一個 input 內輸入文字。每次 keyup 的時候就會根據輸入內容,請求 Wikipedia 的 API 進行搜索:
var input = document.getElementById('input');
// 通過 fromEvent 以及 input keyup 事件創建一個流
var dictionarySuggest = Rx.Observable.fromEvent(input, 'keyup')
// 獲取到每次 keyup 時的input value,并通過 filter 保證其合法性
.map(function () { return input.value; })
.filter(function (text) { return !!text; })
.distinctUntilChanged()
.debounce(250)
// searchWikipedia 為異步請求方法
.flatMapLatest(searchWikipedia)
.subscribe(
// onNext
function (results) {
list = [];
list.concat(results.map(createItem));
},
// onError
function (err) {
logError(err);
}
);
我們創建了一個流來處理從用戶 keyup ,到 searchWikipedia ,再到處理網絡請求結果這一系列事件,并且在其中對事件進行了篩選判斷:
- filter 剔除掉不合法的值
- distinctUntilChanged 當用戶按下例如 左、右 這種按鈕時,不會改變 input 的值,但也會觸發 keyup 事件。這種時候就完全沒有必要重復發送異步請求了。 distinctUntilChanged 會剔除流中有著相同的值的元素
- debounce 在過了一段指定的時間還沒觸發事件時才觸發下一個事件。也就是說,在打字過程中,如果用戶在指定事件間隔(250ms)內沒有再打字,則觸發下一個事件(searchWikipedia);否則我們認為用戶在連續打字,所以不會頻繁的發送網絡請求
- flatMapLatest
- 首先,它是一個 flatMap 方法。它用一個指定的函數(searchWikipedia)對原始流中的每一項數據執行變換操作,并返回一個 Observable , flatMap 將所有的返回值組成一個新的流。
- 其次, flatMapLatest 擁有 flatMap 的全部特性。但不同的是,在 flatMapLatest 的遍歷調用過程中,如果一個事件 A 還沒有觸發完畢獲取到返回值,就觸發了下一個事件 B,則將忽略 A 返回的值。這樣,我們就可以 避免 A 異步的返回值因為返回較慢,反而覆蓋了之后 B 異步的返回值 。用圖解釋如下:

- subscribe 創建對流的監聽,并提供了成功和失敗的回調
而在傳統的編寫方法里,我們可能會創建 input 的 keyup 監聽事件,并緩存上一次的值;每次 keyup 時,要判斷當前值是否合法,并且與上一次的值不一樣。除此以外,還要創建一個定時器,每隔一段時間就用合法的值去請求 searchWikipedia 方法 --- 即便這樣,也無法保證不在用戶連續打字時發送請求。
可以看到,在我們把事件串成流并進行處理之后,要比傳統的編寫方式方便很多。
處理一系列的異步請求隊列
假設我們要讀取一個 4GB 的大文件,將其加密后寫入到一個新文件里。直接將整個文件讀到內存里再加密、寫入肯定是不行的,反之,我們依賴 RxJS 的流,創建多個讀取、加密、寫入事件,形成三個流出來:
- 文件讀取流:每次調用方法時異步讀取 64k 的文件
- 加密流:對讀取的文件進行加密
- 寫入流:將加密好的內容異步寫入新文件
- 最后對整個 observable 進行監聽
var fs = require('fs');
var Rx = require('rx');
// Read/write from stream implementation
function readAsync(fd, chunkSize) { /* impl */ }
function appendAsync(fd, buffer) { /* impl */ }
function encrypt(buffer) { /* impl */}
// 打開一個 4GB 的文件,每次只讀取 64k
var inFile = fs.openSync('4GBfile.txt', 'r+');
var outFile = fs.openSync('Encrypted.txt', 'w+');
readAsync(inFile, 2 << 15)
.map(encrypt)
.flatMap(function (data) {
return appendAsync(outFile, data);
})
.subscribe(
// onNext
function () { },
// onError
function (err) {
console.log('An error occurred while encrypting the file: %s', err.message);
fs.closeSync(inFile);
fs.closeSync(outFile);
},
// onCompleted
function () {
console.log('Successfully encrypted the file.');
fs.closeSync(inFile);
fs.closeSync(outFile);
}
);
由此可以看出,在應對較復雜的事件流或者處理多個異步事件的時候,使用 RxJS 會有一定優勢;但如果復雜度沒有這么高的時候則沒有太大的使用必要。
目前為止,本文基本介紹了 RxJS 的核心概念 --- 針對事件流的管理與掌控。
來自:https://github.com/ecmadao/Coding-Guide/blob/master/Notes/RxJS/探索RxJS-CoreConcept.md