Node.js Stream - 基礎篇

lebaobei 8年前發布 | 19K 次閱讀 Node.js Node.js 開發

背景

在構建較復雜的系統時,通常將其拆解為功能獨立的若干部分。這些部分的接口遵循一定的規范,通過某種方式相連,以共同完成較復雜的任務。譬如,shell通過管道|連接各部分,其輸入輸出的規范是文本流。

Node.js中,內置的Stream模塊也實現了類似功能,各部分通過.pipe()連接。

鑒于目前國內系統性介紹Stream的文章較少,而越來越多的開源工具都使用了Stream,本系列文章將從以下幾方面來介紹相關內容:

  1. 流的基本類型,以及Stream模塊的基本使用方法
  2. 流式處理與back pressure的工作原理
  3. 如何開發流式程序,包括對GulpBrowserify的剖析,以及一個實戰示例。

本文為系列文章的第一篇。

流的四種類型

Stream提供了以下四種類型的流:

var Stream = require('stream')

var Readable = Stream.Readable var Writable = Stream.Writable var Duplex = Stream.Duplex var Transform = Stream.Transform </code></pre>

使用Stream可實現數據的流式處理,如:

var fs = require('fs')
// `fs.createReadStream`創建一個`Readable`對象以讀取`bigFile`的內容,并輸出到標準輸出
// 如果使用`fs.readFile`則可能由于文件過大而失敗
fs.createReadStream(bigFile).pipe(process.stdout)

Readable

創建可讀流。

實例:流式消耗迭代器中的數據。

'use strict'
const Readable = require('stream').Readable

class ToReadable extends Readable { constructor(iterable) { super() this.iterator = new function () { yield iterable } }

// 子類需要實現該方法 // 這是生產數據的邏輯 _read() { const res = this.iterator.next() if (res.done) { // 數據源已枯竭,調用push(null)通知流 this.push(null) } else { // 通過push方法將數據添加到流中 this.push(res.value + '\n') } } }

module.exports = ToReadable </code></pre>

實際使用時,new ToReadable(iterable)會返回一個可讀流,下游可以流式的消耗迭代器中的數據。

const iterable = function *(limit) {
  while (limit--) {
    yield Math.random()
  }
}(1e10)

const readable = new ToReadable(iterable)

// 監聽data事件,一次獲取一個數據 readable.on('data', data => process.stdout.write(data))

// 所有數據均已讀完 readable.on('end', () => process.stdout.write('DONE')) </code></pre>

執行上述代碼,將會有100億個隨機數源源不斷地寫進標準輸出流。

創建可讀流時,需要繼承Readable,并實現_read方法。

  • _read方法是從底層系統讀取具體數據的邏輯,即生產數據的邏輯。
  • _read方法中,通過調用push(data)將數據放入可讀流中供下游消耗。
  • _read方法中,可以同步調用push(data),也可以異步調用。
  • 當全部數據都生產出來后,必須調用push(null)來結束可讀流。
  • 流一旦結束,便不能再調用push(data)添加數據。

可以通過監聽data事件的方式消耗可讀流。

  • 在首次監聽其data事件后,readable便會持續不斷地調用_read(),通過觸發data事件將數據輸出。
  • 第一次data事件會在下一個tick中觸發,所以,可以安全地將數據輸出前的邏輯放在事件監聽后(同一個tick中)。
  • 當數據全部被消耗時,會觸發end事件。

上面的例子中,process.stdout代表標準輸出流,實際是一個可寫流。下小節中介紹可寫流的用法。

Writable

創建可寫流。

前面通過繼承的方式去創建一類可讀流,這種方法也適用于創建一類可寫流,只是需要實現的是_write(data, enc, next)方法,而不是_read()方法。

有些簡單的情況下不需要創建一類流,而只是一個流對象,可以用如下方式去做:

const Writable = require('stream').Writable

const writable = Writable() // 實現_write方法 // 這是將數據寫入底層的邏輯 writable._write = function (data, enc, next) { // 將流中的數據寫入底層 process.stdout.write(data.toString().toUpperCase()) // 寫入完成時,調用next()方法通知流傳入下一個數據 process.nextTick(next) }

// 所有數據均已寫入底層 writable.on('finish', () => process.stdout.write('DONE'))

// 將一個數據寫入流中 writable.write('a' + '\n') writable.write('b' + '\n') writable.write('c' + '\n')

// 再無數據寫入流時,需要調用end方法 writable.end() </code></pre>

  • 上游通過調用writable.write(data)將數據寫入可寫流中。write()方法會調用_write()data寫入底層。
  • _write中,當數據成功寫入底層后,必須調用next(err)告訴流開始處理下一個數據。
  • next的調用既可以是同步的,也可以是異步的。
  • 上游必須調用writable.end(data)來結束可寫流,data是可選的。此后,不能再調用write新增數據。
  • end方法調用后,當所有底層的寫操作均完成時,會觸發finish事件。

Duplex

創建可讀可寫流。

Duplex實際上就是繼承了ReadableWritable的一類流。
所以,一個Duplex對象既可當成可讀流來使用(需要實現_read方法),也可當成可寫流來使用(需要實現_write方法)。

var Duplex = require('stream').Duplex

var duplex = Duplex()

// 可讀端底層讀取邏輯 duplex._read = function () { this._readNum = this._readNum || 0 if (this._readNum > 1) { this.push(null) } else { this.push('' + (this._readNum++)) } }

// 可寫端底層寫邏輯 duplex._write = function (buf, enc, next) { // a, b process.stdout.write('_write ' + buf.toString() + '\n') next() }

// 0, 1 duplex.on('data', data => console.log('ondata', data.toString()))

duplex.write('a') duplex.write('b')

duplex.end() </code></pre>

上面的代碼中實現了_read方法,所以可以監聽data事件來消耗Duplex產生的數據。
同時,又實現了_write方法,可作為下游去消耗數據。

因為它既可讀又可寫,所以稱它有兩端:可寫端和可讀端。
可寫端的接口與Writable一致,作為下游來使用;可讀端的接口與Readable一致,作為上游來使用。

Transform

在上面的例子中,可讀流中的數據(0, 1)與可寫流中的數據('a', 'b')是隔離開的,但在Transform中可寫端寫入的數據經變換后會自動添加到可讀端。
Tranform繼承自Duplex,并已經實現了_read_write方法,同時要求用戶實現一個_transform方法。

'use strict'

const Transform = require('stream').Transform

class Rotate extends Transform { constructor(n) { super() // 將字母旋轉n個位置 this.offset = (n || 13) % 26 }

// 將可寫端寫入的數據變換后添加到可讀端 _transform(buf, enc, next) { var res = buf.toString().split('').map(c => { var code = c.charCodeAt(0) if (c >= 'a' && c <= 'z') { code += this.offset if (code > 'z'.charCodeAt(0)) { code -= 26 } } else if (c >= 'A' && c <= 'Z') { code += this.offset if (code > 'Z'.charCodeAt(0)) { code -= 26 } } return String.fromCharCode(code) }).join('')

// 調用push方法將變換后的數據添加到可讀端
this.push(res)
// 調用next方法準備處理下一個
next()

}

}

var transform = new Rotate(3) transform.on('data', data => process.stdout.write(data)) transform.write('hello, ') transform.write('world!') transform.end()

// khoor, zruog! </code></pre>

objectMode

前面幾節的例子中,經常看到調用data.toString()。這個toString()的調用是必需的嗎?
本節介紹完如何控制流中的數據類型后,自然就有了答案。

在shell中,用管道(|)連接上下游。上游輸出的是文本流(標準輸出流),下游輸入的也是文本流(標準輸入流)。在本文介紹的流中,默認也是如此。

對于可讀流來說,push(data)時,data只能是StringBuffer類型,而消耗時data事件輸出的數據都是Buffer類型。對于可寫流來說,write(data)時,data只能是StringBuffer類型,_write(data)調用時傳進來的data都是Buffer類型。

也就是說,流中的數據默認情況下都是Buffer類型。產生的數據一放入流中,便轉成Buffer被消耗;寫入的數據在傳給底層寫邏輯時,也被轉成Buffer類型。

但每個構造函數都接收一個配置對象,有一個objectMode的選項,一旦設置為true,就能出現“種瓜得瓜,種豆得豆”的效果。

Readable未設置objectMode時:

const Readable = require('stream').Readable

const readable = Readable()

readable.push('a') readable.push('b') readable.push(null)

readable.on('data', data => console.log(data)) </code></pre>

輸出:

<Buffer 61>
<Buffer 62>

Readable設置objectMode后:

const Readable = require('stream').Readable

const readable = Readable({ objectMode: true })

readable.push('a') readable.push('b') readable.push({}) readable.push(null)

readable.on('data', data => console.log(data)) </code></pre>

輸出:

a
b
{}

可見,設置objectMode后,push(data)的數據被原樣地輸出了。此時,可以生產任意類型的數據。

系列文章

 

參考文獻

來自:http://tech.meituan.com/stream-basics.html

 

 本文由用戶 lebaobei 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!