用 RxJS 連接世界

baeme 8年前發布 | 19K 次閱讀 RxJS

這篇文章將將更多的異步業務(Http 請求) 接入我們的 Todo App 中。在例子中,會使用更多操作符(RxJS Operator) 來處理我們的業務,后續的文章中會詳細的講解這些操作符的作用和使用場景。

準備工作

首先在 GitHub - teambition/learning-rxjs: Learning RxJS step by step clone 項目所需的 seed,并 基于 article1 分支 checkout 一個你的 article2 分支。本文中所有涉及到 RxJS 的代碼將全部使用 TypeScript 編寫。

這篇文章中,我們將使用 RxJS 實現以下幾個功能:

  1. 按回車或點擊 add button 后發送一個請求,在請求返回結果后再清空輸入框,并將返回的結果變成一個 todo item。如果在請求返回結果前又一次按下回車或 add 按鈕,對比此時輸入框的值和上次發送的值是否相同,如果相同則不進行任何操作,如果不同則取消掉上次的請求并發送新的請求。
  2. 在點擊一個 todo item 的時候發送請求,間隔 300 毫秒內的點擊,只會發出一次請求。
  3. 在輸入框中每次輸入字符,都會在 200 毫秒后發送一個請求,搜索是否匹配到已存在的 todo item,如果已存在則高亮這個 todo item。如果在一次搜索的結果返回前輸入了新的字符,則取消掉前一個請求,再發一個搜索請求。

使用 switchMap 切換 Observable

為了實現需求,首先需要在原有的邏輯中加入請求的邏輯,我們可以在 lib.ts 中找到 mockHttpPost 方法:

export const mockHttpPost = (value: string): Observable<HttpResponse> => {
  return Observable.create((observer: Observer<HttpResponse>) => {
    let status = 'pending'
    const timmer = setTimeout(() => {
      const result = {
        _id: ++dbIndex, value,
        isDone: false
      }
      searchStorage.set(result._id, result)
      status = 'done'
      observer.next(result)
      observer.complete()
    }, random(10, 1000))
    return () => {
      clearTimeout(timmer)
      if (status === 'pending') {
        console.warn('post canceled')
      }
    }
  })
}

這里我并沒有真正的發送一個 http 請求,在真實的業務場景中,將請求轉化成 Observable 的過程應該是這樣的:

Observable.create(observer => {
  request(xxxx, response => {
    // success callback
    observer.next(parse(response))
    observer.complete()
  }, err => {
    // error callback
    observer.error(err)
  })
  // teardown logic
  return () => request.abort()
})

app.ts 中引入 mockHttpPost:

...
import {
  createTodoItem,
  mockHttpPost
} from './lib'

...

const item$ = input$
  .map(() => $input.value)
  .filter(r => r !== '')
  .switchMap(mockHttpPost)
  .map(data => createTodoItem(data.value))
  .do((ele: HTMLLIElement) => {
    $list.appendChild(ele)
    $input.value = ''
  })
  .publishReplay(1)
  .refCount()

修改 createTodoItem helper,讓它支持傳入 HttpResponse 格式的數據:

// lib.ts
export const createTodoItem = (data: HttpResponse) => {
  const result = <HTMLLIElement>document.createElement('LI')
  result.classList.add('list-group-item', `todo-item-${data._id}`)
  result.setAttribute('data-id', `${data._id}`)
  const innerHTML = `
    ${data.value}
    <button type="button" class="btn btn-default button-remove pull-right" aria-label="right Align">
      <span class="glyphicon glyphicon-remove" aria-hidden="true"></span>
    </button>
  `
  result.innerHTML = innerHTML
  return result
}

這樣 $item 部分的代碼可以簡化成:

const item$ = input$
  .map(() => $input.value)
  .filter(r => r !== '')
  .switchMap(mockHttpPost)
  .map(createTodoItem)
  .do((ele: HTMLLIElement) => {
    $list.appendChild(ele)
    $input.value = ''
  })
  .publishReplay(1)
  .refCount()

此時代碼運行的行為是這樣的:

  1. 直接輸入值并回車,todo item 像以前一樣被創建

  2. 輸入值,并在 todo item 生成前多次回車,可以看到請求被 cancel 了多次:

這里的 switchMap 其實是 map and switch ,而 switch 操作符的行為是:

如果 Observable 中流動的數據也是 Observable,switch 會將數據流中最新的一個 Observable 訂閱并將它的值傳遞給下一個操作符,然后取消訂閱之前的 Observable。

所以這里的 switchMap 實際是:

const item$ = input$
  .map(() => $input.value)
  .filter(r => r !== '')
  .map(mockHttpPost)
  .switch()
  .map(createTodoItem)
...

的縮寫。同樣的,之前用到的 mergeMap 也是 map and merge

如果你有興趣,可以嘗試下面的代碼觀察 switchMap 行為:

// 你可以在項目目錄下執行: npm i -g ts-node && ts-node example/switchMap.ts 觀察運行結果
import { Observable, Observer } from 'rxjs'

const stream = Observable.create((observer: Observer<number>) => {
  let i = 0
  const intervalId = setInterval(() => {
    observer.next(++i)
  }, 1000)
  return () => clearInterval(intervalId)
})

function createIntervalObservable(base: number) {
  let i = 0
  return Observable.create((observer: Observer<string>) => {
    const intervalId = setInterval(() => {
      observer.next(`base: ${base}, value: ${++i}`)
    }, 200)
    return () => {
      clearInterval(intervalId)
      console.log(`unsubscribe base: ${base}`)
    }
  })
}

stream.switchMap(createIntervalObservable)
  .subscribe(result => console.log(result))

使用 distinct* 操作符過濾數據

但這里的邏輯還有一點不足,我們輸入一個值并快速按下多次回車,前幾次的請求被 cancel ,但如果 input 的值不變我們其實不需要 cancel 掉這些請求,只需要忽略后幾次的點擊即可。可以使用 distinct 操作符實現這個需求:

const item$ = input$
  .map(() => $input.value)
  .filter(r => r !== '')
  .distinct()
  .switchMap(mockHttpPost)
  .map(createTodoItem)
  .do((ele: HTMLLIElement) => {
    $list.appendChild(ele)
    $input.value = ''
  })
  .publishReplay(1)
  .refCount()

此時,如果在請求返回前不停的按下回車,只有在 input value 改變的時候才會 cancel 上一個請求:

使用 Subject 推送數據

此時還存在一個小問題,在生成 todo item 后再輸入與上次同樣的值并按下回車,這次的值會被 distinct 操作符過濾掉。為了解決這個問題,我們可以指定 distinct 操作符的第二個參數 flushes 來清除 distinct 操作符的緩存:

import { Observable, Subject } from 'rxjs'
...

const clearInputSubject$ = new Subject<void>()

const item$ = input$
  .map(() => $input.value)
  .filter(r => r !== '')
  .distinct(null, clearInputSubject$)
  .switchMap(mockHttpPost)
  .map(createTodoItem)
  .do((ele: HTMLLIElement) => {
    $list.appendChild(ele)
    $input.value = ''
    clearInputSubject$.next()
  })
  .publishReplay(1)
  .refCount()

這里出現的 Subject 既有 Observer 的功能,也有 Observable 的功能,但又有一些區別。上一篇講過了 Observable 是 unioncast 的,也就意味著 Observable 中一個值只會發送給一個訂閱者。而 publish/share 操作符可以將它們變成 muticast 的,但它依然是 lazy 的,也就是要有訂閱者它才會執行。而這里的 Subject 與 Observable 相比,不僅是 muticast 的,而且是非 lazy 的,它可以在任意時刻任意地點推送數據,這些數據可以被任意多的訂閱者共享。

根據 Subject 的特性可以看出來,item$ 這個 publish 出來的 Observable 可以改寫成一個 Subject,有興趣的讀者可以自行嘗試(訂閱 input 并在 subscribe 中 next 值)。

使用 debounceTime 過濾重復的操作

我們已經實現了第一個需求,接下來要完成第二個 在點擊一個 todo item 的時候發送請求,在請求返回結果前的點擊都會被忽略。請求的邏輯和上一個一樣:

...
import {
  createTodoItem,
  mockToggle,
  mockHttpPost
} from './lib'
...

const toggle$ = item$.mergeMap($todoItem => {
return Observable.fromEvent<MouseEvent>($todoItem, 'click')
    .filter(e => e.target === $todoItem)
    .mapTo({
      data: {
        _id: $todoItem.dataset['id'],
        isDone: $todoItem.classList.contains('done')
      }, $todoItem
    })
})
  .switchMap(result => {
    return mockToggle(result.data._id, result.data.isDone)
      .mapTo(result.$todoItem)
  })
...

這里短時間的重復點擊,會讓前一個點擊請求取消掉,但這與我們的需求不符,我們需要的是間隔 300 毫秒內的點擊,只會發出一次請求,debounceTime 操作符可以完成這個工作:

const toggle$ = item$.mergeMap($todoItem => {
  return Observable.fromEvent<MouseEvent>($todoItem, 'click')
    .debounceTime(300)
    .filter(e => e.target === $todoItem)
    .mapTo({
      data: {
        _id: $todoItem.dataset['id'],
        isDone: $todoItem.classList.contains('done')
      }, $todoItem
    })
})
  .switchMap(result => {
    return mockToggle(result.data._id, result.data.isDone)
      .mapTo(result.$todoItem)
  })

debounce and switchMap,最小化使用你的資源

最后一個需求,需要同時使用 debounceTime 和 switchMap :

...
import {
  createTodoItem,
  mockToggle,
  mockHttpPost,
  search,
  HttpResponse
} from './lib'

...
// 后面的 search$ 與 enter 應該時從同一個 Observable 中轉換出來,這里將 input 事件的 Observable publish 成 muticast
const type$ = Observable.fromEvent<KeyboardEvent>($input, 'keydown')
  .publish()
  .refCount()

const enter$ = type$
  .filter(r => r.keyCode === 13)

...
const search$ = type$.debounceTime(200)
  .filter(evt => evt.keyCode !== 13)
  .map(result => (<HTMLInputElement>result.target).value)
  .switchMap(search)
  .do((result: HttpResponse | null) => {
    const actived = document.querySelectorAll('.active')
    Array.prototype.forEach.call(actived, (item: HTMLElement) => {
      item.classList.remove('active')
    })
    if (result) {
      const item = document.querySelector(`.todo-item-${result._id}`)
      item.classList.add('active')
    }
  })

const app$ = toggle$.merge(remove$, search$)
  .do(r => {
    console.log(r)
  })

試著用不同的速度輸入一系列的字符串,觀察控制臺的響應。在 200 毫秒內的輸入被忽略,在 response 回來之前的輸入會讓前一個 request abort 掉。如果匹配到相同的 todo item 則會高亮它。

總結

一個簡陋的 Todo App 就此完成(delete 與 toggle 類似,有興趣可以自行實現),它涵蓋了一些 RxJS 擅長的領域:

  1. 將同步/異步代碼抽象成同樣的形狀,并使用操作符加以組合

  2. 在需要的時候 cancel ,最大化節約資源

但可以明顯看出,在業務逐漸復雜以后,直接的組合 Observable 與 Observable 已經會讓數據流變得難以預測( hard to reason about ),特別是在它們互相依賴互相派生的情況更加復雜的場景下。而大家知道,Flux/Redux 非常擅長處理這種場景,后面的文章中也會講到如何運用單向數據流的思想管理 Observable,以及如何使用 Redux Observable 將 RxJS 作為 Redux 的 Epics。

 

來自:https://zhuanlan.zhihu.com/p/23464709

 

 本文由用戶 baeme 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!