使用 RxJS 掌控異步

3093279013 7年前發布 | 22K 次閱讀 RxJS

教你使用 RxJS 在 200 行代碼內優雅的實現文件分片斷點續傳。

本文是一系列介紹 RxJS 文章的第三篇,這一系列的文章將從一個小的例子開始,逐漸深入的講解 RxJS 在各種場景下的應用。對應的,也會有對 RxJS 各種操作符的講解。這篇文章將接著第二篇用 RxJS 連接世界 中的例子,實現一個文件分片斷點續傳的例子。在例子中,會使用更多操作符(RxJS Operator) 來處理我們的業務,后續的文章中將會詳細的講解這些操作符的作用和使用場景。

Intro

ben lesh 經常在他的各種 talking 中將 RxJS 比作 Lodash for Async 用來彰顯 RxJS 的強大異步控制能力,而 RxJS 對于異步而言確實有著媲美 lodash 之于 Array 的強大功能。與 lodash 的優秀性能類似,RxJS 在操作異步任務的時的性能也是非常優秀的,并不會因為高等級的抽象而犧牲過多的性能。本篇文章將會以一個相對復雜的異步任務為例子,逐步介紹 RxJS 如何簡潔優雅的進行復雜的異步控制。

準備工作

learning-rxjs clone 項目所需的 seed,并基于 article3-seed checkout 一個你的 article3 分支。本文中所有涉及到 RxJS 的代碼將全部使用 TypeScript 編寫。

這篇文章中,我們將使用 RxJS 實現以下功能:

article3-seed 分支附帶了一個簡單的文件上傳的 server,它的作用是實現一個簡單的文件分片上傳 API。

一共有三個 API 提供調用:

  • post /api/upload/chunk

    用來獲取文件的分片信息,上傳文件大小,文件 md5,文件修改時間和文件名

    服務端返回文件分片數量,每個分片大小和文件唯一的 fileKey

    • Request body:

    {
      fileSize: string // 文件大小
      fileMD5: string // 文件 md5
      lastUpdated: ISOString // 文件上次修改時間
      fileName: string // 文件名
    }
    • Response:

    {
      chunkSize: number // 每個分片大小
      chunks: number  // 分片數量
      fileKey: string // 唯一文件 id
    }
  • post /api/upload/chunk/:fileKey?chunk=:chunk&chunks=:chunks

    用來上傳文件分片

    • Request header: 'Content-Type': 'application/octet-stream'

    • Request body: blob

    • Response: 'ok' or error message

  • post /api/upload/chunk/:fileKey

    結算文件分片,后端會將各分片拼接成一個完整的文件并返回

    Response: 'ok' or error message

在這幾個 API 的基礎上,我們將在這篇文章中實現以下的功能

  1. 在 add 按鈕左邊增加一個按鈕,用來選擇一個文件 & (暫停 & 恢復) 文件的上傳

  2. 在增加文件后:

    • 計算文件 md5 ,文件名,文件上次修改時間,文件大小等信息

    • 調用 post /api/upload/chunk 接口,獲取文件分片信息

    • 根據獲取的分片信息對文件分片,并且調用 post /api/upload/chunk/:fileKey?chunk=:chunk&chunks=:chunks 上傳文件分片,上傳過程保證每次只有三個分片同時上傳

    • 上傳完所有的分片后,調用 post /api/upload/chunk/:fileKey 結算文件

  3. 上傳的過程中,input 框下有一個進度條顯示上傳進度

  4. 在上傳開始后,選擇文件的按鈕變成暫停按鈕,點擊則暫停上傳

  5. 點擊暫停上傳按鈕后,按鈕變成繼續上傳按鈕,點擊則在暫停的地方繼續上傳

為了實現上面的功能,并且將這些功能與之前的 todo app 區分開來,我們在 src 文件夾下新建一個 FileUploader.ts 文件并在這個文件中實現這些需求:

// FileUploader.ts
import { Observable } from 'rxjs'

// @warn memory leak
const $attachment = document.querySelector('.attachment')

export class FileUploader {

  private file$ = Observable.fromEvent($attachment, 'change')
    .map((r: Event) => (r.target as HTMLInputElement).files[0])
    .filter(f => !!f)

  uploadStream$ = this.file$
}

在 html 中加入 attachment 節點:

// index.html
...
<div class="input-group-btn">
  <label class="btn btn-default btn-file glyphicon glyphicon-paperclip attachment">
    <input type="file" style="display: none;">
  </label>
  <div class="btn btn-default button-add">Add</div>
</div>
...

調整一下樣式:

// style.css
...
.attachment {
  top: 0;
}

然后在 app.ts 中將這個我們將要實現功能的流 merge 到 app$ 中:

...
import { FileUploader } from './FileUploader'
...
const uploader = new FileUploader()

const app$ = toggle$.merge(remove$, search$, uploader.uploadStream$)
  .do(r => {
    console.log(r)
  })

app$.subscribe()

這個時候通過 attachment 按鈕選擇一個文件,就已經可以在控制臺中看到從 app$ 中流出的 file了:

獲取文件分片信息

我們使用 FileReader + spark-md5 計算文件的 md5 信息,其它信息直接可以從 File 對象上拿到。而這里的 FileReader 讀取文件是一個異步的過程,我們將它封裝成 Observable 以便和 uploadStream$ 組合:

import { Observable, Observer } from 'rxjs'
// spark-md5 沒有第三方 .d.ts 文件,這里用 commonjs 風格的 require 它
// 如果未再 tsconfig.json 中設置 noImplicitAny: true 且 TypeScript 版本大于 2.1 則也可以用
// import * as SparkMD5 from 'spark-md5' 的方式引用
const SparkMD5 = require('spark-md5')
const $attachment = document.querySelector('.attachment')

interface FileInfo {
  fileSize: number
  fileMD5: string
  lastUpdated: string
  fileName: string
}

export class FileUploader {

  private file$ = Observable.fromEvent($attachment, 'change')
    .map((r: Event) => (r.target as HTMLInputElement).files[0])
    .filter(f => !!f)

  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)

  private readFileInfo(file: File): Observable<{ file: File, fileinfo: FileInfo }> {
    const reader = new FileReader()
    const spark = new SparkMD5.ArrayBuffer()
    reader.readAsArrayBuffer(file)
    return Observable.create((observer: Observer<{ file: File, fileinfo: FileInfo }>) => {
      reader.onload = (e: Event) => {
        spark.append((e.target as FileReader).result)
        const fileMD5 = spark.end()
        observer.next({
          file, fileinfo: {
            fileMD5, fileSize: file.size,
            lastUpdated: file.lastModifiedDate.toISOString(),
            fileName: file.name
          }
        })
        observer.complete()
      }
      return () => {
        if (!reader.result) {
          console.warn('read file aborted')
          reader.abort()
        }
      }
    })
  }
}

此時已經可以看到文件的 FileInfo可以從 app$ 中流出:

再使用文件信息通過 post /api/upload/chunk 接口獲取文件的分片信息:

...
const apiHost = 'http://127.0.0.1:5000/api'
...

interface ChunkMeta {
  fileSize: number
  chunkSize: number
  chunks: number
  fileKey: string
}
...

export class FileUploader {
  ...
  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
    )
}

分片上傳

獲取分片信息之后,我們首先要做的事情是將文件按照分片信息分片,做一個 slice 方法來將文件分片:

...
export class FileUploader {
  ...
  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
      .map((r) => {
        const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)
        return { blobs, chunkMeta: r.response }
      })
    )

  ...

  private slice(file: File, n: number, chunkSize: number): Blob[] {
    const result: Blob[] = []
    for (let i = 0; i < n; i ++) {
      const startSize = i * chunkSize
      const slice = file.slice(startSize, i === n - 1 ? startSize + (file.size - startSize) : (i + 1) * chunkSize)
      result.push(slice)
    }
    return result
  }
}

這時,我們就能看到分片后的 blobs 和 meta 信息:

將文件切片完成之后,我們需要實現一個上傳分片的方法:

...
export class FileUploader {
  ...

  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
      .map((r) => {
        const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)
        return { blobs, chunkMeta: r.response }
      })
    )
    .switchMap(({ blobs, chunkMeta }) => {
      const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob))
      const uploadStream = Observable.from(dists)
        .mergeAll(this.concurrency)

      return Observable.forkJoin(uploadStream)
        .mapTo(chunkMeta)
    })

  constructor(
    private concurrency = 3
  ) { }
  ...
  private uploadChunk(meta: ChunkMeta, index: number, blob: Blob) {
    const host = `${apiHost}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}`
    return Observable.ajax({
      url: host,
      body: blob,
      method: 'post',
      crossDomain: true,
      headers: { 'Content-Type': 'application/octet-stream' }
    })
  }
}

這里的 uploadChunk 是上傳單個文件分片的方法,uploadStream$ 中最后面一個 switchMap 中的邏輯是使用 mergeAll 操作符將所有上傳的流 merge 成一個 Observable,行為就是并發的上傳所有的分片。而下面的 forkJoin 操作符則是等 merge 之后的 uploadStream complete 之后再 emit 一個結果。這里的 mergeAll + forkJoin 的用法其實與 Promise.all 的行為非常類似,這里也可以寫成:

...
const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob))

return Observable.forkJoin(... dists)
  .mapTo(chunkMeta)
...

但我們有一個需求是 上傳過程保證每次只有三個分片同時上傳 , 所以需要使用 mergeAll 方法并傳入 concurrency = 3 來控制并發數量,現在可以選擇一個文件在 Devtool 上觀察上傳的行為。如果程序沒有出問題行為應該是:并發上傳文件分片,并且永遠只有 3 個分片同時上傳,在上傳完所有分片后 app$ 中流出 chunkMeta 數據。

最后,我們只需要結算這些分片,這個文件就算上傳完成了:

...
export class FileUploader {
  ...
  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
      .map((r) => {
        const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)
        return { blobs, chunkMeta: r.response }
      })
    )
    .switchMap(({ blobs, chunkMeta }) => {
      const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob))
      const uploadStream = Observable.from(dists)
        .mergeAll(this.concurrency)

      return Observable.forkJoin(uploadStream)
        .mapTo(chunkMeta)
    })
    .switchMap((r: ChunkMeta) => Observable.ajax.post(`${apiHost}/upload/chunk/${r.fileKey}`)
      .mapTo({
        action: 'UPLOAD_SUCCESS',
        payload: r
      })
    )
}

這時,選擇一個文件后,可以看到它被分片上傳,并且在結算后在 $app 中發送了一條數據:

{
  "action": "UPLOAD_SUCCESS",
  "payload": {
    "chunkSize": 1048576,
    "chunks": 26,
    "fileKey": "00a12bdc10449d8ec93883a7d45292a30c",
    "fileSize": 26621938
  }
}

并且在項目的 chunks 文件夾下面可以找到這個被結算的文件。

進度條

為了實現在界面中實時顯示進度條,我們先要在 index.html 中加入進度條標簽:

// index.html
...
 <div class="progress">
   <div class="progress-bar progress-bar-success" role="progressbar" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100" style="width: 0%">
     <span>0%</span>
   </div>
</div>
...

調整一下樣式中文字的顏色:

// style.css
...
.progress-bar > span {
  color: black;
}

這個時候界面看起來應該是這樣的:

要獲取總體的上傳進度,必須先獲取單個分片的上傳進度,Observable.ajax 有一個方法可以獲取 progress:..

...
import { Observable, Observer, Subscriber } from 'rxjs'
...

export class FileUploader {
  ...

  private uploadChunk(meta: ChunkMeta, index: number, blob: Blob): Observable<ProgressEvent> {
    const host = `${apiHost}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}`
    return Observable.create((subscriber: Subscriber<ProgressEvent>) => {
      const ajax$ = Observable.ajax({
        url: host,
        body: blob,
        method: 'post',
        crossDomain: true,
        headers: { 'Content-Type': 'application/octet-stream' },
        progressSubscriber: subscriber
      })
      const subscription = ajax$.subscribe()
      return () => subscription.unsubscribe()
    })
  }
}

這樣一來我們就可以在 uploadSteram$ 中計算總體的上傳進度了:

...
export class FileUploader {

  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
      .map((r) => {
        const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)
        return { blobs, chunkMeta: r.response }
      })
    )
    .switchMap(({ blobs, chunkMeta }) => {
      const uploaded: number[] = []
      const dists = blobs.map((blob, index) => {
        let currentLoaded = 0
        return this.uploadChunk(chunkMeta, index, blob)
          .do(r => {
            currentLoaded = r.loaded / chunkMeta.fileSize
            uploaded[index] = currentLoaded
            const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0))
            const p = Math.round(percent * 100)
            $progressBar.style.width = `${p}%`
            $progressBar.firstElementChild.textContent = `${p > 1 ? p - 1 : p} %`
          })
      })

      const uploadStream = Observable.from(dists)
        .mergeAll(this.concurrency)

      return Observable.forkJoin(uploadStream)
        .mapTo(chunkMeta)
    })
    .switchMap((r: ChunkMeta) => Observable.ajax.post(`${apiHost}/upload/chunk/${r.fileKey}`)
      .mapTo({
        action: 'UPLOAD_SUCCESS',
        payload: r
      })
    )
    .do(() => {
      $progressBar.firstElementChild.textContent = '100 %'
    })
}

這個時候我們可以在界面中看到文件分片上傳的進度了。

而一般為了方便使用與調試,我們一般將所有的類似:

{
  action: 'UPLOAD_SUCCESS',
  payload: {
    chunkSize: 1048576,
    chunks: 26,
    fileKey: "00a12bdc10449d8ec93883a7d45292a30c",
    fileSize: 26621938
  }
}

的 local state 放在一個流里面:

import { Observable, Subscriber, Subject } from 'rxjs'
...
type Action = 'pause' | 'resume' | 'progress' | 'complete'
...
export class FileUploader {
  ...
  private action$ = new Subject<{
    name: Action
    payload?: any
  }>()

  private progress$ = this.action$
    .filter(action => action.name === 'progress')
    .map(action => action.payload)
    .do(r => {
      const percent = Math.round(r * 100)
      $progressBar.style.width = `${percent}%`
      $progressBar.firstElementChild.textContent = `${percent > 1 ? percent - 1 : percent} %`
    })
    .map(r => ({ action: 'PROGRESS', payload: r }))


  uploadStream$ = this.file$
    ...

        return this.uploadChunk(chunkMeta, index, blob)
          .do(r => {
            currentLoaded = r.loaded / chunkMeta.fileSize
            uploaded[index] = currentLoaded
            const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0))
            this.action$.next({ name: 'progress', payload: percent })
          })
    ...
    .merge(this.progerss$)
}

這時控制臺會出現更直觀的調試信息:

暫停,續傳

根據需求,我們在選擇文件后,選擇文件的按鈕將會變成一個暫停按鈕,我們可以用 Observable.fromEvent來實現這個需求:

...
export class FileUploader {
  ...

  private click$ = Observable.fromEvent($attachment, 'click')
    .map((e: Event) => e.target)
    .filter((e: HTMLElement) => e === $attachment)
    .scan((acc: number, val: HTMLElement) => {
      if (val.classList.contains('glyphicon-paperclip')) {
        return 1
      }
      if (acc === 2) {
        return 3
      }
      return 2
    }, 3)
    .filter(v => v !== 1)
    .do((v) => {
      if (v === 2) {
        this.action$.next({ name: 'pause' })
        $attachment.classList.remove('glyphicon-pause')
        $attachment.classList.add('glyphicon-play')
      } else {
        this.action$.next({ name: 'resume' })
        this.buildPauseIcon()
      }
    })

  uploadStream$ = this.file$
    .switchMap...
    .switchMap...
    .do(() => this.buildPauseIcon())
    ...
    .do(() => {
      $progressBar.firstElementChild.textContent = '100 %'
      // restore icon
      $attachment.classList.remove('glyphicon-pause')
      $attachment.classList.add('glyphicon-paperclip');
      ($attachment.firstElementChild as HTMLInputElement).disabled = false
    })
    .merge(this.progress$, this.click$)

  // side effect
  private buildPauseIcon() {
    $attachment.classList.remove('glyphicon-paperclip')
    $attachment.classList.add('glyphicon-pause');
    ($attachment.firstElementChild as HTMLInputElement).disabled = true
  }
}

這段代碼用到涉及到的概念比較多,我們一點點來理解:

在 uploadStream$ 的兩個 switchMap 下插入了一個 do 操作符,這段代碼的作用是將文件上傳的圖標變成暫停的圖標。

然后我們新建了一個 click$ 流,為了防止事件冒泡導致的重復推送值,我們用 map + filter 過濾掉了子節點冒泡上來的事件。而為了區分點擊的是 上傳文件按鈕還是 暫停按鈕還是 繼續按鈕,我們用 1,2,3 三個值代表三個不同的點擊事件,并使用 scan操作符不停的生成這三個狀態。scan 的行為與 Array#reduce 非常相似,它接受一個 accumulator 不停的根據當前的值和狀態累加出新的狀態(沒錯,和 Redux 中的 reducer 行為一致)。而在下面的 do 操作符中我們根據不同的狀態改變按鈕的icon 。

這個時候我們觀察上傳的過程中,點擊暫停/繼續,圖標的狀態可以正確切換了。并且在上傳完成后圖標也被恢復成上傳文件的初始狀態了。

為了讓整個文件上傳可以暫停與繼續,我們在 uploadChunk 下使用 takeUntil 與 repeatWhen & retryWhen 操作符:

...
export class FileUploader {
   ...
   private action$ = ...
   private pause$ = this.action$.filter(ac => ac.name === 'pause')
   private resume$ = this.action$.filter(ac => ac.name === 'resume')
   private progress$ = this.action$
     ...
     .distinctUntilChanged((x: number, y: number) => x - y >= 0)
     ...

   ...

   private uploadChunk(meta: ChunkMeta, index: number, blob: Blob): Observable<ProgressEvent> {
     ...
     return Observable.create(
       ...
       const ajax$ = Observable.ajax({
        ...
       })
          .takeUntil(this.pause$)
          .repeatWhen(() => this.resume$)
       const subscription = ajax$.subscribe()
       return () => subscription.unsubscribe()
     )
       .retryWhen(() => this.resume$)
   }
}

takeUntil 操作符接受一個 Observable ,它在這個 Observable 發射值的時候終止上面的 Observable

repeatWhenretryWhen 操作符都是接受一個 projectFunc ,它返回一個 Observable 并在這個 Observable 發射值的時候 重復/重試。

而在暫停恢復的過程中,進度條的數字可能顯示錯誤:上傳了一部分的請求被 abort,它的 progress 已經計算過一次了,重試的時候是重新上傳,則可能會導致進度條后退,這時我們在 progress$ 后面用 distinctUntilChanged 方法即可實現 只有在進度增長的時候發射值 這一效果。

結語

這是一篇超級抽象的文章,并且受限于未使用框架,在程序中使用了大量的副作用操作do,總體看起來并沒有特別優雅。真正優雅的 FRP 應該是將 RxJS 與 Redux + React 這樣的框架結合起來,那時這個文件上傳的組件就可以有更優雅的寫法。當然它的功能并不完備,很多 edge case 例如各個步驟中的異常處理都沒有做,但沒有關系,這里只起到一個示范作用來展示 RxJS 在處理異步上的強大功能,并且讓初學者有機會親手把玩 RxJS 的各種操作符并實現一個復雜的異步場景。在后面的文章中,將會深入前面這三篇文章中涉及到或未涉及到的各種操作符,逐漸撥開 RxJS 的迷霧。

 

來自:https://zhuanlan.zhihu.com/p/25059824

 

 本文由用戶 3093279013 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!