用 RxJS 連接世界
這篇文章將將更多的異步業務(Http 請求) 接入我們的 Todo App 中。在例子中,會使用更多操作符(RxJS Operator) 來處理我們的業務,后續的文章中會詳細的講解這些操作符的作用和使用場景。
準備工作
首先在 GitHub - teambition/learning-rxjs: Learning RxJS step by step clone 項目所需的 seed,并 基于 article1 分支 checkout 一個你的 article2 分支。本文中所有涉及到 RxJS 的代碼將全部使用 TypeScript 編寫。
這篇文章中,我們將使用 RxJS 實現以下幾個功能:
- 按回車或點擊 add button 后發送一個請求,在請求返回結果后再清空輸入框,并將返回的結果變成一個 todo item。如果在請求返回結果前又一次按下回車或 add 按鈕,對比此時輸入框的值和上次發送的值是否相同,如果相同則不進行任何操作,如果不同則取消掉上次的請求并發送新的請求。
- 在點擊一個 todo item 的時候發送請求,間隔 300 毫秒內的點擊,只會發出一次請求。
- 在輸入框中每次輸入字符,都會在 200 毫秒后發送一個請求,搜索是否匹配到已存在的 todo item,如果已存在則高亮這個 todo item。如果在一次搜索的結果返回前輸入了新的字符,則取消掉前一個請求,再發一個搜索請求。
使用 switchMap 切換 Observable
為了實現需求,首先需要在原有的邏輯中加入請求的邏輯,我們可以在 lib.ts 中找到 mockHttpPost 方法:
export const mockHttpPost = (value: string): Observable<HttpResponse> => {
return Observable.create((observer: Observer<HttpResponse>) => {
let status = 'pending'
const timmer = setTimeout(() => {
const result = {
_id: ++dbIndex, value,
isDone: false
}
searchStorage.set(result._id, result)
status = 'done'
observer.next(result)
observer.complete()
}, random(10, 1000))
return () => {
clearTimeout(timmer)
if (status === 'pending') {
console.warn('post canceled')
}
}
})
}
這里我并沒有真正的發送一個 http 請求,在真實的業務場景中,將請求轉化成 Observable 的過程應該是這樣的:
Observable.create(observer => {
request(xxxx, response => {
// success callback
observer.next(parse(response))
observer.complete()
}, err => {
// error callback
observer.error(err)
})
// teardown logic
return () => request.abort()
})
在 app.ts 中引入 mockHttpPost:
...
import {
createTodoItem,
mockHttpPost
} from './lib'
...
const item$ = input$
.map(() => $input.value)
.filter(r => r !== '')
.switchMap(mockHttpPost)
.map(data => createTodoItem(data.value))
.do((ele: HTMLLIElement) => {
$list.appendChild(ele)
$input.value = ''
})
.publishReplay(1)
.refCount()
修改 createTodoItem helper,讓它支持傳入 HttpResponse 格式的數據:
// lib.ts
export const createTodoItem = (data: HttpResponse) => {
const result = <HTMLLIElement>document.createElement('LI')
result.classList.add('list-group-item', `todo-item-${data._id}`)
result.setAttribute('data-id', `${data._id}`)
const innerHTML = `
${data.value}
<button type="button" class="btn btn-default button-remove pull-right" aria-label="right Align">
<span class="glyphicon glyphicon-remove" aria-hidden="true"></span>
</button>
`
result.innerHTML = innerHTML
return result
}
這樣 $item 部分的代碼可以簡化成:
const item$ = input$
.map(() => $input.value)
.filter(r => r !== '')
.switchMap(mockHttpPost)
.map(createTodoItem)
.do((ele: HTMLLIElement) => {
$list.appendChild(ele)
$input.value = ''
})
.publishReplay(1)
.refCount()
此時代碼運行的行為是這樣的:
-
直接輸入值并回車,todo item 像以前一樣被創建
-
輸入值,并在 todo item 生成前多次回車,可以看到請求被 cancel 了多次:
這里的 switchMap 其實是 map and switch ,而 switch 操作符的行為是:
如果 Observable 中流動的數據也是 Observable,switch 會將數據流中最新的一個 Observable 訂閱并將它的值傳遞給下一個操作符,然后取消訂閱之前的 Observable。
所以這里的 switchMap 實際是:
const item$ = input$
.map(() => $input.value)
.filter(r => r !== '')
.map(mockHttpPost)
.switch()
.map(createTodoItem)
...
的縮寫。同樣的,之前用到的 mergeMap 也是 map and merge 。
如果你有興趣,可以嘗試下面的代碼觀察 switchMap 行為:
// 你可以在項目目錄下執行: npm i -g ts-node && ts-node example/switchMap.ts 觀察運行結果
import { Observable, Observer } from 'rxjs'
const stream = Observable.create((observer: Observer<number>) => {
let i = 0
const intervalId = setInterval(() => {
observer.next(++i)
}, 1000)
return () => clearInterval(intervalId)
})
function createIntervalObservable(base: number) {
let i = 0
return Observable.create((observer: Observer<string>) => {
const intervalId = setInterval(() => {
observer.next(`base: ${base}, value: ${++i}`)
}, 200)
return () => {
clearInterval(intervalId)
console.log(`unsubscribe base: ${base}`)
}
})
}
stream.switchMap(createIntervalObservable)
.subscribe(result => console.log(result))
使用 distinct* 操作符過濾數據
但這里的邏輯還有一點不足,我們輸入一個值并快速按下多次回車,前幾次的請求被 cancel ,但如果 input 的值不變我們其實不需要 cancel 掉這些請求,只需要忽略后幾次的點擊即可。可以使用 distinct 操作符實現這個需求:
const item$ = input$
.map(() => $input.value)
.filter(r => r !== '')
.distinct()
.switchMap(mockHttpPost)
.map(createTodoItem)
.do((ele: HTMLLIElement) => {
$list.appendChild(ele)
$input.value = ''
})
.publishReplay(1)
.refCount()
此時,如果在請求返回前不停的按下回車,只有在 input value 改變的時候才會 cancel 上一個請求:
使用 Subject 推送數據
此時還存在一個小問題,在生成 todo item 后再輸入與上次同樣的值并按下回車,這次的值會被 distinct 操作符過濾掉。為了解決這個問題,我們可以指定 distinct 操作符的第二個參數 flushes 來清除 distinct 操作符的緩存:
import { Observable, Subject } from 'rxjs'
...
const clearInputSubject$ = new Subject<void>()
const item$ = input$
.map(() => $input.value)
.filter(r => r !== '')
.distinct(null, clearInputSubject$)
.switchMap(mockHttpPost)
.map(createTodoItem)
.do((ele: HTMLLIElement) => {
$list.appendChild(ele)
$input.value = ''
clearInputSubject$.next()
})
.publishReplay(1)
.refCount()
這里出現的 Subject 既有 Observer 的功能,也有 Observable 的功能,但又有一些區別。上一篇講過了 Observable 是 unioncast 的,也就意味著 Observable 中一個值只會發送給一個訂閱者。而 publish/share 操作符可以將它們變成 muticast 的,但它依然是 lazy 的,也就是要有訂閱者它才會執行。而這里的 Subject 與 Observable 相比,不僅是 muticast 的,而且是非 lazy 的,它可以在任意時刻任意地點推送數據,這些數據可以被任意多的訂閱者共享。
根據 Subject 的特性可以看出來,item$ 這個 publish 出來的 Observable 可以改寫成一個 Subject,有興趣的讀者可以自行嘗試(訂閱 input 并在 subscribe 中 next 值)。
使用 debounceTime 過濾重復的操作
我們已經實現了第一個需求,接下來要完成第二個 在點擊一個 todo item 的時候發送請求,在請求返回結果前的點擊都會被忽略。請求的邏輯和上一個一樣:
...
import {
createTodoItem,
mockToggle,
mockHttpPost
} from './lib'
...
const toggle$ = item$.mergeMap($todoItem => {
return Observable.fromEvent<MouseEvent>($todoItem, 'click')
.filter(e => e.target === $todoItem)
.mapTo({
data: {
_id: $todoItem.dataset['id'],
isDone: $todoItem.classList.contains('done')
}, $todoItem
})
})
.switchMap(result => {
return mockToggle(result.data._id, result.data.isDone)
.mapTo(result.$todoItem)
})
...
這里短時間的重復點擊,會讓前一個點擊請求取消掉,但這與我們的需求不符,我們需要的是間隔 300 毫秒內的點擊,只會發出一次請求,debounceTime 操作符可以完成這個工作:
const toggle$ = item$.mergeMap($todoItem => {
return Observable.fromEvent<MouseEvent>($todoItem, 'click')
.debounceTime(300)
.filter(e => e.target === $todoItem)
.mapTo({
data: {
_id: $todoItem.dataset['id'],
isDone: $todoItem.classList.contains('done')
}, $todoItem
})
})
.switchMap(result => {
return mockToggle(result.data._id, result.data.isDone)
.mapTo(result.$todoItem)
})
debounce and switchMap,最小化使用你的資源
最后一個需求,需要同時使用 debounceTime 和 switchMap :
...
import {
createTodoItem,
mockToggle,
mockHttpPost,
search,
HttpResponse
} from './lib'
...
// 后面的 search$ 與 enter 應該時從同一個 Observable 中轉換出來,這里將 input 事件的 Observable publish 成 muticast
const type$ = Observable.fromEvent<KeyboardEvent>($input, 'keydown')
.publish()
.refCount()
const enter$ = type$
.filter(r => r.keyCode === 13)
...
const search$ = type$.debounceTime(200)
.filter(evt => evt.keyCode !== 13)
.map(result => (<HTMLInputElement>result.target).value)
.switchMap(search)
.do((result: HttpResponse | null) => {
const actived = document.querySelectorAll('.active')
Array.prototype.forEach.call(actived, (item: HTMLElement) => {
item.classList.remove('active')
})
if (result) {
const item = document.querySelector(`.todo-item-${result._id}`)
item.classList.add('active')
}
})
const app$ = toggle$.merge(remove$, search$)
.do(r => {
console.log(r)
})
試著用不同的速度輸入一系列的字符串,觀察控制臺的響應。在 200 毫秒內的輸入被忽略,在 response 回來之前的輸入會讓前一個 request abort 掉。如果匹配到相同的 todo item 則會高亮它。
總結
一個簡陋的 Todo App 就此完成(delete 與 toggle 類似,有興趣可以自行實現),它涵蓋了一些 RxJS 擅長的領域:
-
將同步/異步代碼抽象成同樣的形狀,并使用操作符加以組合
-
在需要的時候 cancel ,最大化節約資源
但可以明顯看出,在業務逐漸復雜以后,直接的組合 Observable 與 Observable 已經會讓數據流變得難以預測( hard to reason about ),特別是在它們互相依賴互相派生的情況更加復雜的場景下。而大家知道,Flux/Redux 非常擅長處理這種場景,后面的文章中也會講到如何運用單向數據流的思想管理 Observable,以及如何使用 Redux Observable 將 RxJS 作為 Redux 的 Epics。
來自:https://zhuanlan.zhihu.com/p/23464709