Node.js Stream - 進階篇

fc1503 8年前發布 | 15K 次閱讀 Node.js Node.js 開發

上篇(基礎篇)主要介紹了Stream的基本概念和用法,本篇將深入剖析背后工作原理,重點是如何實現流式數據處理和 back pressure 機制。

目錄

本篇介紹 stream 是如何實現流式數據處理的。

數據生產和消耗的媒介

為什么使用流取數據

下面是一個讀取文件內容的例子:

const fs = require('fs')
fs.readFile(file, function (err, body) {
  console.log(body)
  console.log(body.toString())
})

但如果文件內容較大,譬如在440M時,執行上述代碼的輸出為:

<Buffer 64 74 09 75 61 09 63 6f 75 6e 74 0a 0a 64 74 09 75 61 09 63 6f 75 6e 74 0a 32 30 31 35 31 32 30 38 09 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 63 6f 6d ... >
buffer.js:382
    throw new Error('toString failed');
    ^

Error: toString failed
    at Buffer.toString (buffer.js:382:11)

報錯的原因是 body 這個 Buffer 對象的長度過大,導致 toString 方法失敗。

可見,這種一次獲取全部內容的做法,不適合操作大文件。

可以考慮使用流來讀取文件內容。

const fs = require('fs')
fs.createReadStream(file).pipe(process.stdout)

fs.createReadStream 創建一個可讀流,連接了源頭(上游,文件)和消耗方(下游,標準輸出)。

執行上面代碼時,流會逐次調用 fs.read ,將文件中的內容分批取出傳給下游。

在文件看來,它的內容被分塊地連續取走了。

在下游看來,它收到的是一個先后到達的數據序列。

如果不需要一次操作全部內容,它可以處理完一個數據便丟掉。

在流看來,任一時刻它都只存儲了文件中的一部分數據,只是內容在變化而已。

這種情況就像是用水管去取池子中的水。

每當用掉一點水,水管便會從池子中再取出一點。

無論水池有多大,都只存儲了與水管容積等量的水。

如何通過流取到數據

Readable 創建對象 readable 后,便得到了一個可讀流。

如果實現 _read 方法,就將流連接到一個底層數據源。

流通過調用 _read 向底層請求數據,底層再調用流的 push 方法將需要的數據傳遞過來。

當 readable 連接了數據源后,下游便可以調用 readable.read(n) 向流請求數據,同時監聽 readable 的 data 事件來接收取到的數據。

這個流程可簡述為:

read

read 方法中的邏輯可用下圖表示,后面幾節將對該圖中各環節加以說明。

push方法

消耗方調用 read(n) 促使流輸出數據,而流通過 _read() 使底層調用 push 方法將數據傳給流。

如果流在流動模式下( state.flowing 為 true )輸出數據,數據會自發地通過 data 事件輸出,不需要消耗方反復調用 read(n) 。

如果調用 push 方法時緩存為空,則當前數據即為下一個需要的數據。

這個數據可能先添加到緩存中,也可能直接輸出。

執行 read 方法時,在調用 _read 后,如果從緩存中取到了數據,就以 data 事件輸出。

所以,如果 _read 異步調用 push 時發現緩存為空,則意味著當前數據是下一個需要的數據,且不會被 read 方法輸出,應當在 push 方法中立即以 data 事件輸出。

因此,上圖中“立即輸出”的條件是:

state.flowing && state.length === 0 && !state.sync

end事件

由于流是分次向底層請求數據的,需要底層顯示地告訴流數據是否取完。

所以,當某次(執行 _read() )取數據時,調用了 push(null) ,就意味著底層數據取完。

此時,流會設置 state.ended 。

state.length 表示緩存中當前的數據量。

只有當 state.length 為 0 ,且 state.ended 為 true ,才意味著所有的數據都被消耗了。

一旦在執行 read(n) 時檢測到這個條件,便會觸發 end 事件。

當然,這個事件只會觸發一次。

readable事件

在調用完 _read() 后, read(n) 會試著從緩存中取數據。

如果 _read() 是異步調用 push 方法的,則此時緩存中的數據量不會增多,容易出現數據量不夠的現象。

如果 read(n) 的返回值為 null ,說明這次未能從緩存中取出所需量的數據。

此時,消耗方需要等待新的數據到達后再次嘗試調用 read 方法。

在數據到達后,流是通過 readable 事件來通知消耗方的。

在此種情況下, push 方法如果立即輸出數據,接收方直接監聽 data 事件即可,否則數據被添加到緩存中,需要觸發 readable 事件。

消耗方必須監聽這個事件,再調用 read 方法取得數據。

doRead

流中維護了一個緩存,當緩存中的數據足夠多時,調用 read() 不會引起 _read() 的調用,即不需要向底層請求數據。

用 doRead 來表示 read(n) 是否需要向底層取數據,其邏輯為:

var doRead = state.needReadable

if (state.length === 0 || state.length - n < state.highWaterMark) {
  doRead = true
}

if (state.ended || state.reading) {
  doRead = false
}

if (doRead) {
  state.reading = true
  state.sync = true
  if (state.length === 0) {
    state.needReadable = true
  }
  this._read(state.highWaterMark)
  state.sync = false
}

state.reading 標志上次從底層取數據的操作是否已完成。

一旦 push 方法被調用,就會設置為 false ,表示此次 _read() 結束。

state.highWaterMark 是給緩存大小設置的一個上限閾值。

如果取走 n 個數據后,緩存中保有的數據不足這個量,便會從底層取一次數據。

howMuchToRead

調用 read(n) 去取 n 個數據時, m = howMuchToRead(n) 是將從緩存中實際獲取的數據量。

根據以下幾種情況賦值,一旦確定則立即返回:

  • state.length 為0, state.ended 為 true 。
    數據源已枯竭,且緩存為空,無數據可取, m 為0.
  • state.objectMode 為 true 。
    n 為0,則 m 為0;
    否則 m 為1,將緩存的第一個元素輸出。
  • n 是數字。
    若 n <= 0 ,則 m 為0;
    若 n > state.length ,表示緩存中數據量不夠。
    此時如果還有數據可讀( state.ended 為 false ),則 m 為0,同時設置 state.needReadable ,下次執行 read() 時 doRead 會為 true ,將從底層再取數據。
    如果已無數據可讀( state.ended 為 true ),則 m 為 state.length ,將剩下的數據全部輸出。
    若 0 < n <= state.length ,則緩存中數據夠用, m 為 n 。
  • 其它情況。
    state.flowing 為 true (流動模式),則 m 為緩存中第一個元素( Buffer )的長度,實則還是將第一個元素輸出;
    否則 m 為 state.length ,將緩存讀空。

上面的規則中:

  • n 通常是 undefined 或 0 ,即不指定讀取的字節數。
  • read(0) 不會有數據輸出,但從前面對 doRead 的分析可以看出,是有可能從底層讀取數據的。
  • 執行 read() 時,由于流動模式下數據會不斷輸出,所以每次只輸出緩存中第一個元素輸出,而非流動模式則會將緩存讀空。
  • objectMode 為 true 時, m 為 0 或 1 。此時,一次 push() 對應一次 data 事件。

綜上所述:

可讀流是獲取底層數據的工具,消耗方通過調用 read 方法向流請求數據,流再從緩存中將數據返回,或以 data 事件輸出。

如果緩存中數據不夠,便會調用 _read 方法去底層取數據。

該方法在拿到底層數據后,調用 push 方法將數據交由流處理(立即輸出或存入緩存)。

可以結合 readable 事件和 read 方法來將數據全部消耗,這是暫停模式的消耗方法。

但更常見的是在流動模式下消耗數據,具體見后面的章節。

數據的流式消耗

所謂“流式數據”,是指按時間先后到達的數據序列。

數據消耗模式

可以在兩種模式下消耗可讀流中的數據:暫停模式(paused mode)和流動模式(flowing mode)。

流動模式下,數據會源源不斷地生產出來,形成“流動”現象。

監聽流的 data 事件便可進入該模式。

暫停模式下,需要顯示地調用 read() ,觸發 data 事件。

可讀流對象 readable 中有一個維護狀態的對象, readable._readableState ,這里簡稱為 state 。

其中有一個標記, state.flowing , 可用來判別流的模式。

它有三種可能值:

  • true 。流動模式。
  • false 。暫停模式。
  • null 。初始狀態。

調用 readable.resume() 可使流進入流動模式, state.flowing 被設為 true 。

調用 readable.pause() 可使流進入暫停模式, state.flowing 被設為 false 。

暫停模式

在初始狀態下,監聽 data 事件,會使流進入流動模式。

但如果在暫停模式下,監聽 data 事件并不會使它進入流動模式。

為了消耗流,需要顯示調用 read() 方法。

const Readable = require('stream').Readable

// 底層數據
const dataSource = ['a', 'b', 'c']

const readable = Readable()
readable._read = function () {
  if (dataSource.length) {
    this.push(dataSource.shift())
  } else {
    this.push(null)
  }
}

// 進入暫停模式
readable.pause()
readable.on('data', data => process.stdout.write('\ndata: ' + data))

var data = readable.read()
while (data !== null) {
  process.stdout.write('\nread: ' + data)
  data = readable.read()
}

執行上面的腳本,輸出如下:

data: a
read: a
data: b
read: b
data: c
read: c

可見,在暫停模式下,調用一次 read 方法便讀取一次數據。

執行 read() 時,如果緩存中數據不夠,會調用 _read() 去底層取。

_read 方法中可以同步或異步地調用 push(data) 來將底層數據交給流處理。

在上面的例子中,由于是同步調用 push 方法,數據會添加到緩存中。

read 方法在執行完 _read 方法后,便從緩存中取數據,再返回,且以 data 事件輸出。

如果改成異步調用 push 方法,則由于 _read() 執行完后,數據來不及放入緩存,

將出現 read() 返回 null 的現象。

見下面的示例:

const Readable = require('stream').Readable

// 底層數據
const dataSource = ['a', 'b', 'c']

const readable = Readable()
readable._read = function () {
  process.nextTick(() => {
    if (dataSource.length) {
      this.push(dataSource.shift())
    } else {
      this.push(null)
    }
  })
}

readable.pause()
readable.on('data', data => process.stdout.write('\ndata: ' + data))

while (null !== readable.read()) ;

執行上述腳本,可以發現沒有任何數據輸出。

此時,需要使用 readable 事件:

const Readable = require('stream').Readable

// 底層數據
const dataSource = ['a', 'b', 'c']

const readable = Readable()
readable._read = function () {
  process.nextTick(() => {
    if (dataSource.length) {
      this.push(dataSource.shift())
    } else {
      this.push(null)
    }
  })
}

readable.pause()
readable.on('data', data => process.stdout.write('\ndata: ' + data))

readable.on('readable', function () {
  while (null !== readable.read()) ;;
})

輸出:

data: a
data: b
data: c

當 read() 返回 null 時,意味著當前緩存數據不夠,而且底層數據還沒加進來(異步調用 push() )。

此種情況下 state.needReadable 會被設置為 true 。

push 方法被調用時,由于是暫停模式,不會立即輸出數據,而是將數據放入緩存,并觸發一次 readable 事件。

所以,一旦 read 被調用,上面的例子中就會形成一個循環: readable 事件導致 read 方法調用, read 方法又觸發 readable 事件。

首次監聽 readable 事件時,還會觸發一次 read(0) 的調用,從而引起 _read 和 push 方法的調用,從而啟動循環。

總之,在暫停模式下需要使用 readable 事件和 read 方法來消耗流。

流動模式

流動模式使用起來更簡單一些。

一般創建流后,監聽 data 事件,或者通過 pipe 方法將數據導向另一個可寫流,即可進入流動模式開始消耗數據。

尤其是 pipe 方法中還提供了 back pressure 機制,所以使用 pipe 進入流動模式的情況非常普遍。

本節解釋 data 事件如何能觸發流動模式。

先看一下 Readable 是如何處理 data 事件的監聽的:

Readable.prototype.on = function (ev, fn) {
  var res = Stream.prototype.on.call(this, ev, fn)
  if (ev === 'data' && false !== this._readableState.flowing) {
    this.resume()
  }

  // 處理readable事件的監聽
  // 省略

  return res
}

Stream 繼承自 EventEmitter ,且是 Readable 的父類。

從上面的邏輯可以看出,在將 fn 加入事件隊列后,如果發現處于非暫停模式,則會調用 this.resume() ,開始流動模式。

resume() 方法先將 state.flowing 設為 true ,

然后會在下一個tick中執行 flow ,試圖將緩存讀空:

if (state.flowing) do {
  var chunk = stream.read()
} while (null !== chunk && state.flowing)

flow 中每次 read() 都可能觸發 push() 的調用,

而 push() 中又可能觸發 flow() 或 read() 的調用,

這樣就形成了數據生生不息的流動。

其關系可簡述為:

下面再詳細看一下 push() 的兩個分支:

if (state.flowing && state.length === 0 && !state.sync) {
  stream.emit('data', chunk)
  stream.read(0)
} else {
  state.length += state.objectMode ? 1 : chunk.length
  state.buffer.push(chunk)

  if (state.needReadable)
    emitReadable(stream)
}

稱第一個分支為立即輸出。

在立即輸出的情況下,輸出數據后,執行 read(0) ,進一步引起 _read() 和 push() 的調用,從而使數據源源不斷地輸出。

在非立即輸出的情況下,數據先被添加到緩存中。此時有兩種情況:

  • state.length 為0。
    這時,在調用 _read() 前, state.needReadable 就會被設為 true 。
    因此,一定會調用 emitReadable() 。
    這個方法會在下一個tick中觸發 readable 事件,同時再調用 flow() ,從而形成流動。
  • state.length 不為0。
    由于流動模式下,每次都是從緩存中取第一個元素,所以這時 read() 返回值一定不為 null 。
    故 flow() 中的循環還在繼續。

此外,從 push() 的兩個分支可以看出來,如果 state.flowing 設為 false ,第一個分支便不會再進去,也就不會再調用 read(0) 。

同時第二個分支中引發 flow 的調用后,也不會再調用 read() ,這就完全暫停了底層數據的讀取。

事實上, pause 方法就是這樣使流從流動模式轉換到暫停模式的。

背壓反饋機制

考慮下面的例子:

const fs = require('fs')
fs.createReadStream(file).on('data', doSomething)

監聽 data 事件后文件中的內容便立即開始源源不斷地傳給 doSomething() 。

如果 doSomething 處理數據較慢,就需要緩存來不及處理的數據 data ,占用大量內存。

理想的情況是下游消耗一個數據,上游才生產一個新數據,這樣整體的內存使用就能保持在一個水平。

Readable 提供 pipe 方法,用來實現這個功能。

pipe

用 pipe 方法連接上下游:

const fs = require('fs')
fs.createReadStream(file).pipe(writable)

writable 是一個可寫流 Writable 對象,上游調用其 write 方法將數據寫入其中。

writable 內部維護了一個寫隊列,當這個隊列長度達到某個閾值( state.highWaterMark )時,

執行 write() 時返回 false ,否則返回 true 。

于是上游可以根據 write() 的返回值在流動模式和暫停模式間切換:

readable.on('data', function (data) {
  if (false === writable.write(data)) {
    readable.pause()
  }
})

writable.on('drain', function () {
  readable.resume()
})

上面便是 pipe 方法的核心邏輯。

當 write() 返回 false 時,調用 readable.pause() 使上游進入暫停模式,不再觸發 data 事件。

但是當 writable 將緩存清空時,會觸發一個 drain 事件,再調用 readable.resume() 使上游進入流動模式,繼續觸發 data 事件。

看一個例子:

const stream = require('stream')

var c = 0
const readable = stream.Readable({
  highWaterMark: 2,
  read: function () {
    process.nextTick(() => {
      var data = c < 6 ? String.fromCharCode(c + 65) : null
      console.log('push', ++c, data)
      this.push(data)
    })
  }
})

const writable = stream.Writable({
  highWaterMark: 2,
  write: function (chunk, enc, next) {
    console.log('write', chunk)
  }
})

readable.pipe(writable)

輸出:

push 1 A
write <Buffer 41>
push 2 B
push 3 C
push 4 D

雖然上游一共有6個數據( ABCDEF )可以生產,但實際只生產了4個( ABCD )。

這是因為第一個數據( A )遲遲未能寫完(未調用 next() ),所以后面通過 write 方法添加進來的數據便被緩存起來。

下游的緩存隊列到達2時, write 返回 false ,上游切換至暫停模式。

此時下游保存了 AB 。

由于 Readable 總是緩存 state.highWaterMark 這么多的數據,所以上游保存了 CD 。

從而一共生產出來 ABCD 四個數據。

下面使用 tick-node Readable 的debug信息按tick分組:

? NODE_DEBUG=stream tick-node pipe.js
STREAM 18930: pipe count=1 opts=undefined
STREAM 18930: resume
---------- TICK 1 ----------
STREAM 18930: resume read 0
STREAM 18930: read 0
STREAM 18930: need readable false
STREAM 18930: length less than watermark true
STREAM 18930: do read
STREAM 18930: flow true
STREAM 18930: read undefined
STREAM 18930: need readable true
STREAM 18930: length less than watermark true
STREAM 18930: reading or ended false
---------- TICK 2 ----------
push 1 A
STREAM 18930: ondata
write <Buffer 41>
STREAM 18930: read 0
STREAM 18930: need readable true
STREAM 18930: length less than watermark true
STREAM 18930: do read
---------- TICK 3 ----------
push 2 B
STREAM 18930: ondata
STREAM 18930: call pause flowing=true
STREAM 18930: pause
STREAM 18930: read 0
STREAM 18930: need readable true
STREAM 18930: length less than watermark true
STREAM 18930: do read
---------- TICK 4 ----------
push 3 C
STREAM 18930: emitReadable false
STREAM 18930: emit readable
STREAM 18930: flow false
---------- TICK 5 ----------
STREAM 18930: maybeReadMore read 0
STREAM 18930: read 0
STREAM 18930: need readable false
STREAM 18930: length less than watermark true
STREAM 18930: do read
---------- TICK 6 ----------
push 4 D
---------- TICK 7 ----------
  • TICK 0: readable.resume()
  • TICK 1: readable 在流動模式下開始從底層讀取數據
  • TICK 2: A 被輸出,同時執行 readable.read(0) 。
  • TICK 3: B 被輸出,同時執行 readable.read(0) 。
    writable.write('B') 返回 false 。
    執行 readable.pause() 切換至暫停模式。
  • TICK 4: TICK 3中 read(0) 引起 push('C') 的調用, C 被加到 readable 緩存中。
    此時, writable 中有 A 和 B , readable 中有 C 。
    這時已在暫停模式,但在 readable.push('C') 結束前,發現緩存中只有1個數據,小于設定的 highWaterMark (2),故準備在下一個tick再讀一次數據。
  • TICK 5: 調用 read(0) 從底層取數據。
  • TICK 6: push('D') , D 被加到 readable 緩存中。
    此時, writable 中有 A 和 B , readable 中有 C 和 D 。
    readable 緩存中有2個數據,等于設定的 highWaterMark (2),不再從底層讀取數據。

可以認為,隨著下游緩存隊列的增加,上游寫數據時受到的阻力變大。

這種 back pressure 大到一定程度時上游便停止寫,等到 back pressure 降低時再繼續。

消耗驅動的數據生產

使用 pipe() 時,數據的生產和消耗形成了一個閉環。

通過負反饋調節上游的數據生產節奏,事實上形成了一種所謂的拉式流( pull stream )。

用喝飲料來說明拉式流和普通流的區別的話,普通流就像是將杯子里的飲料往嘴里傾倒,動力來源于上游,數據是被推往下游的;拉式流則是用吸管去喝飲料,動力實際來源于下游,數據是被拉去下游的。

所以,使用拉式流時,是“按需生產”。

如果下游停止消耗,上游便會停止生產。

所有緩存的數據量便是兩者的閾值和。

當使用 Transform 作為下游時,尤其需要注意消耗。

const stream = require('stream')

var c = 0
const readable = stream.Readable({
  highWaterMark: 2,
  read: function () {
    process.nextTick(() => {
      var data = c < 26 ? String.fromCharCode(c++ + 97) : null
      console.log('push', data)
      this.push(data)
    })
  }
})

const transform = stream.Transform({
  highWaterMark: 2,
  transform: function (buf, enc, next) {
    console.log('transform', buf)
    next(null, buf)
  }
})

readable.pipe(transform)

以上代碼執行結果為:

push a
transform <Buffer 61>
push b
transform <Buffer 62>
push c
push d
push e
push f

可見,并沒有將26個字母全生產出來。

Transform 中有兩個緩存:可寫端的緩存和可讀端的緩存。

調用 transform.write() 時,如果可讀端緩存未滿,數據會經過變換后加入到可讀端的緩存中。

當可讀端緩存到達閾值后,再調用 transform.write() 則會將寫操作緩存到可寫端的緩存隊列。

當可寫端的緩存隊列也到達閾值時, transform.write() 返回 false ,上游進入暫停模式,不再繼續 transform.write() 。

所以,上面的 transform 中實際存儲了4個數據, ab 在可讀端(經過了 _transform 的處理), cd 在可寫端(還未經過 _transform 處理)。

此時,由前面一節的分析可知, readable 將緩存 ef ,之后便不再生產數據。

這三個緩存加起來的長度恰好為6,所以一共就生產了6個數據。

要想將26個數據全生產出來,有兩種做法。

第一種是消耗 transform 中可讀端的緩存,以拉動上游的生產:

readable.pipe(transform).pipe(process.stdout)

第二種是,不要將數據存入可讀端中,這樣可讀端的緩存便會一直處于數據不足狀態,上游便會源源不斷地生產數據:

const transform = stream.Transform({
  highWaterMark: 2,
  transform: function (buf, enc, next) {
    next()
  }
})

 

系列文章

 

參考文獻

來自:http://tech.meituan.com/stream-internals.html

 

 本文由用戶 fc1503 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!