Node.js Stream - 基礎篇
背景
在構建較復雜的系統時,通常將其拆解為功能獨立的若干部分。這些部分的接口遵循一定的規范,通過某種方式相連,以共同完成較復雜的任務。譬如,shell通過管道|
連接各部分,其輸入輸出的規范是文本流。
在Node.js中,內置的Stream模塊也實現了類似功能,各部分通過.pipe()
連接。
鑒于目前國內系統性介紹Stream的文章較少,而越來越多的開源工具都使用了Stream,本系列文章將從以下幾方面來介紹相關內容:
- 流的基本類型,以及Stream模塊的基本使用方法
- 流式處理與back pressure的工作原理
- 如何開發流式程序,包括對Gulp與Browserify的剖析,以及一個實戰示例。
本文為系列文章的第一篇。
流的四種類型
Stream提供了以下四種類型的流:
var Stream = require('stream')
var Readable = Stream.Readable
var Writable = Stream.Writable
var Duplex = Stream.Duplex
var Transform = Stream.Transform
</code></pre>
使用Stream
可實現數據的流式處理,如:
var fs = require('fs')
// `fs.createReadStream`創建一個`Readable`對象以讀取`bigFile`的內容,并輸出到標準輸出
// 如果使用`fs.readFile`則可能由于文件過大而失敗
fs.createReadStream(bigFile).pipe(process.stdout)
Readable
創建可讀流。
實例:流式消耗迭代器中的數據。
'use strict'
const Readable = require('stream').Readable
class ToReadable extends Readable {
constructor(iterable) {
super()
this.iterator = new function () {
yield iterable
}
}
// 子類需要實現該方法
// 這是生產數據的邏輯
_read() {
const res = this.iterator.next()
if (res.done) {
// 數據源已枯竭,調用push(null)
通知流
this.push(null)
} else {
// 通過push
方法將數據添加到流中
this.push(res.value + '\n')
}
}
}
module.exports = ToReadable
</code></pre>
實際使用時,new ToReadable(iterable)
會返回一個可讀流,下游可以流式的消耗迭代器中的數據。
const iterable = function *(limit) {
while (limit--) {
yield Math.random()
}
}(1e10)
const readable = new ToReadable(iterable)
// 監聽data
事件,一次獲取一個數據
readable.on('data', data => process.stdout.write(data))
// 所有數據均已讀完
readable.on('end', () => process.stdout.write('DONE'))
</code></pre>
執行上述代碼,將會有100億個隨機數源源不斷地寫進標準輸出流。
創建可讀流時,需要繼承Readable
,并實現_read
方法。
_read
方法是從底層系統讀取具體數據的邏輯,即生產數據的邏輯。
- 在
_read
方法中,通過調用push(data)
將數據放入可讀流中供下游消耗。
- 在
_read
方法中,可以同步調用push(data)
,也可以異步調用。
- 當全部數據都生產出來后,必須調用
push(null)
來結束可讀流。
- 流一旦結束,便不能再調用
push(data)
添加數據。
可以通過監聽data
事件的方式消耗可讀流。
- 在首次監聽其
data
事件后,readable
便會持續不斷地調用_read()
,通過觸發data
事件將數據輸出。
- 第一次
data
事件會在下一個tick中觸發,所以,可以安全地將數據輸出前的邏輯放在事件監聽后(同一個tick中)。
- 當數據全部被消耗時,會觸發
end
事件。
上面的例子中,process.stdout
代表標準輸出流,實際是一個可寫流。下小節中介紹可寫流的用法。
Writable
創建可寫流。
前面通過繼承的方式去創建一類可讀流,這種方法也適用于創建一類可寫流,只是需要實現的是_write(data, enc, next)
方法,而不是_read()
方法。
有些簡單的情況下不需要創建一類流,而只是一個流對象,可以用如下方式去做:
const Writable = require('stream').Writable
const writable = Writable()
// 實現_write
方法
// 這是將數據寫入底層的邏輯
writable._write = function (data, enc, next) {
// 將流中的數據寫入底層
process.stdout.write(data.toString().toUpperCase())
// 寫入完成時,調用next()
方法通知流傳入下一個數據
process.nextTick(next)
}
// 所有數據均已寫入底層
writable.on('finish', () => process.stdout.write('DONE'))
// 將一個數據寫入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')
// 再無數據寫入流時,需要調用end
方法
writable.end()
</code></pre>
- 上游通過調用
writable.write(data)
將數據寫入可寫流中。write()
方法會調用_write()
將data
寫入底層。
- 在
_write
中,當數據成功寫入底層后,必須調用next(err)
告訴流開始處理下一個數據。
next
的調用既可以是同步的,也可以是異步的。
- 上游必須調用
writable.end(data)
來結束可寫流,data
是可選的。此后,不能再調用write
新增數據。
- 在
end
方法調用后,當所有底層的寫操作均完成時,會觸發finish
事件。
Duplex
創建可讀可寫流。
Duplex
實際上就是繼承了Readable
和Writable
的一類流。
所以,一個Duplex
對象既可當成可讀流來使用(需要實現_read
方法),也可當成可寫流來使用(需要實現_write
方法)。
var Duplex = require('stream').Duplex
var duplex = Duplex()
// 可讀端底層讀取邏輯
duplex._read = function () {
this._readNum = this._readNum || 0
if (this._readNum > 1) {
this.push(null)
} else {
this.push('' + (this._readNum++))
}
}
// 可寫端底層寫邏輯
duplex._write = function (buf, enc, next) {
// a, b
process.stdout.write('_write ' + buf.toString() + '\n')
next()
}
// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))
duplex.write('a')
duplex.write('b')
duplex.end()
</code></pre>
上面的代碼中實現了_read
方法,所以可以監聽data
事件來消耗Duplex
產生的數據。
同時,又實現了_write
方法,可作為下游去消耗數據。
因為它既可讀又可寫,所以稱它有兩端:可寫端和可讀端。
可寫端的接口與Writable
一致,作為下游來使用;可讀端的接口與Readable
一致,作為上游來使用。
Transform
在上面的例子中,可讀流中的數據(0, 1)與可寫流中的數據('a', 'b')是隔離開的,但在Transform
中可寫端寫入的數據經變換后會自動添加到可讀端。
Tranform
繼承自Duplex
,并已經實現了_read
和_write
方法,同時要求用戶實現一個_transform
方法。
'use strict'
const Transform = require('stream').Transform
class Rotate extends Transform {
constructor(n) {
super()
// 將字母旋轉n
個位置
this.offset = (n || 13) % 26
}
// 將可寫端寫入的數據變換后添加到可讀端
_transform(buf, enc, next) {
var res = buf.toString().split('').map(c => {
var code = c.charCodeAt(0)
if (c >= 'a' && c <= 'z') {
code += this.offset
if (code > 'z'.charCodeAt(0)) {
code -= 26
}
} else if (c >= 'A' && c <= 'Z') {
code += this.offset
if (code > 'Z'.charCodeAt(0)) {
code -= 26
}
}
return String.fromCharCode(code)
}).join('')
// 調用push方法將變換后的數據添加到可讀端
this.push(res)
// 調用next方法準備處理下一個
next()
}
}
var transform = new Rotate(3)
transform.on('data', data => process.stdout.write(data))
transform.write('hello, ')
transform.write('world!')
transform.end()
// khoor, zruog!
</code></pre>
objectMode
前面幾節的例子中,經常看到調用data.toString()
。這個toString()
的調用是必需的嗎?
本節介紹完如何控制流中的數據類型后,自然就有了答案。
在shell中,用管道(|
)連接上下游。上游輸出的是文本流(標準輸出流),下游輸入的也是文本流(標準輸入流)。在本文介紹的流中,默認也是如此。
對于可讀流來說,push(data)
時,data
只能是String
或Buffer
類型,而消耗時data
事件輸出的數據都是Buffer
類型。對于可寫流來說,write(data)
時,data
只能是String
或Buffer
類型,_write(data)
調用時傳進來的data
都是Buffer
類型。
也就是說,流中的數據默認情況下都是Buffer
類型。產生的數據一放入流中,便轉成Buffer
被消耗;寫入的數據在傳給底層寫邏輯時,也被轉成Buffer
類型。
但每個構造函數都接收一個配置對象,有一個objectMode
的選項,一旦設置為true
,就能出現“種瓜得瓜,種豆得豆”的效果。
Readable
未設置objectMode
時:
const Readable = require('stream').Readable
const readable = Readable()
readable.push('a')
readable.push('b')
readable.push(null)
readable.on('data', data => console.log(data))
</code></pre>
輸出:
<Buffer 61>
<Buffer 62>
Readable
設置objectMode
后:
const Readable = require('stream').Readable
const readable = Readable({ objectMode: true })
readable.push('a')
readable.push('b')
readable.push({})
readable.push(null)
readable.on('data', data => console.log(data))
</code></pre>
輸出:
a
b
{}
可見,設置objectMode
后,push(data)
的數據被原樣地輸出了。此時,可以生產任意類型的數據。
系列文章
- 第一部分:《Node.js Stream - 基礎篇》,介紹Stream接口的基本使用。
- 第二部分:《Node.js Stream - 進階篇》,重點剖析Stream底層如何支持流式數據處理,及其back pressure機制。
- 第三部分:《Node.js Stream - 實戰篇》,介紹如何使用Stream進行程序設計。從Browserify和Gulp總結出兩種設計模式,并基于Stream構建一個為Git倉庫自動生成changelog的應用作為示例。
參考文獻
來自:http://tech.meituan.com/stream-basics.html