深入理解 Node Stream 內部機制
相信很多人對 Node 的 Stream 已經不陌生了,不論是請求流、響應流、文件流還是 socket 流,這些流的底層都是使用 stream 模塊封裝的,甚至我們平時用的最多的 console.log 打印日志也使用了它,不信你打開 Node runtime 的源碼,看看 lib/console.js :
function write(ignoreErrors, stream, string, errorhandler) {
// ...
stream.once('error', noop);
stream.write(string, errorhandler);
//...
}
Console.prototype.log = function log(...args) {
write(this._ignoreErrors,
this._stdout,
`${util.format.apply(null, args)}\n`,
this._stdoutErrorHandler);
};
Stream 模塊做了很多事情,了解了 Stream,那么 Node 中其他很多模塊理解起來就順暢多了。
本文代碼和圖片可以在這里取用: https://github.com/barretlee/dive-into-node-stream 。
stream 模塊
如果你了解 生產者和消費者問題 的解法,那理解 stream 就基本沒有壓力了,它不僅僅是資料的起點和落點,還包含了一系列狀態控制,可以說一個 stream 就是一個狀態管理單元。了解內部機制的最佳方式除了看 Node 官方文檔 ,還可以去看看 Node 的 源碼 :
- lib/module.js
- lib/_stream_readable.js
- lib/_stream_writable.js
- lib/_stream_tranform.js
- lib/_stream_duplex.js
把 Readable 和 Writable 看明白,Tranform 和 Duplex 就不難理解了。
Readable Stream
Readable Stream 存在兩種模式,一種是叫做 Flowing Mode ,流動模式,在 Stream 上綁定 ondata 方法就會自動觸發這個模式,比如:
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
這個模式的流程圖如下:
資源的數據流并不是直接流向消費者,而是先 push 到緩存池,緩存池有一個水位標記 highWatermark ,超過這個標記閾值,push 的時候會返回 false ,什么場景下會出現這種情況呢?
- 消費者主動執行了 .pause()
- 消費速度比數據 push 到緩存池的生產速度慢
有個專有名詞來形成這種情況,叫做「背壓」,Writable Stream 也存在類似的情況。
流動模式,這個名詞還是很形象的,緩存池就像一個水桶,消費者通過管口接水,同時,資源池就像一個水泵,不斷地往水桶中泵水,而 highWaterMark 是水桶的浮標,達到閾值就停止蓄水。
下面是一個簡單的 Demo:
const Readable = require('stream').Readable;
// Stream 實現
class MyReadable extends Readable {
constructor(dataSource, options) {
super(options);
this.dataSource = dataSource;
}
// 繼承了 Readable 的類必須實現這個函數
// 觸發系統底層對流的讀取
_read() {
const data = this.dataSource.makeData();
this.push(data);
}
}
// 模擬資源池
const dataSource = {
data: new Array(10).fill('-'),
// 每次讀取時 pop 一個數據
makeData() {
if (!dataSource.data.length) return null;
return dataSource.data.pop();
}
};
const myReadable = new MyReadable(dataSource);
myReadable.setEncoding('utf8');
myReadable.on('data', (chunk) => {
console.log(chunk);
});
另外一種模式是 Non-Flowing Mode ,沒流動,也就是暫停模式,這是 Stream 的預設模式,Stream 實例的 _readableState.flow 有三個狀態,分別是:
- _readableState.flow = null ,暫時沒有消費者過來
- _readableState.flow = false ,主動觸發了 .pause()
- _readableState.flow = true ,流動模式
當我們監聽了 onreadable 事件后,會進入這種模式,比如:
const myReadable = new MyReadable(dataSource);
myReadable.setEncoding('utf8');
myReadable.on('readable', () => {});
監聽 readable 的回調函數第一個參數不會傳遞內容,需要我們通過 myReadable.read() 主動讀取,為啥呢,可以看看下面這張圖:
資源池會不斷地往緩存池輸送數據,直到 highWaterMark 閾值,消費者監聽了 readable 事件并不會消費數據,需要主動調用 .read([size]) 函數才會從緩存池取出,并且可以帶上 size 參數,用多少就取多少:
const myReadable = new MyReadable(dataSource);
myReadable.setEncoding('utf8');
myReadable.on('readable', () => {
let chunk;
while (null !== (chunk = myReadable.read())) {
console.log(`Received ${chunk.length} bytes of data.`);
}
});
這里需要注意一點,只要數據達到緩存池都會觸發一次 readable 事件,有可能出現「消費者正在消費數據的時候,又觸發了一次 readable 事件,那么下次回調中 read 到的數據可能為空」的情況。我們可以通過 _readableState.buffer 來查看緩存池到底緩存了多少資源:
let once = false;
myReadable.on('readable', (chunk) => {
console.log(myReadable._readableState.buffer.length);
if (once) return;
once = true;
console.log(myReadable.read());
});
上面的代碼我們只消費一次緩存池的數據,那么在消費后,緩存池又收到了一次資源池的 push 操作,此時還會觸發一次 readable 事件,我們可以看看這次存了多大的 buffer。
需要注意的是,buffer 大小也是有上限的,默認設置為 16kb,也就是 16384 個字節長度,它最大可設置為 8Mb,沒記錯的話,這個值好像是 Node 的 new space memory 的大小。
上面介紹了 Readable Stream 大概的機制,還有很多細節部分沒有提到,比如 Flowing Mode 在不同 Node 版本中的 Stream 實現不太一樣,實際上,它有三個版本,上面提到的是第 2 和 第 3 個版本的實現;再比如 Mixins Mode 模式,一般我們只推薦(允許)使用 ondata 和 onreadable 的一種來處理 Readable Stream,但是如果要求在 Non-Flowing Mode 的情況下使用 ondata 如何實現呢?那么就可以考慮 Mixins Mode 了。
Writable Stream
原理與 Readable Stream 是比較相似的,數據流過來的時候,會直接寫入到資源池,當寫入速度比較緩慢或者寫入暫停時,數據流會進入隊列池緩存起來,如下圖所示:
當生產者寫入速度過快,把隊列池裝滿了之后,就會出現「背壓」,這個時候是需要告訴生產者暫停生產的,當隊列釋放之后,Writable Stream 會給生產者發送一個 drain 消息,讓它恢復生產。下面是一個寫入一百萬條數據的 Demo:
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 10000;
write();
function write() {
let ok = true;
while(i-- > 0 && ok) {
// 寫入結束時回調
ok = writer.write(data, encoding, i === 0 ? callback : null);
}
if (i > 0) {
// 這里提前停下了,'drain' 事件觸發后才可以繼續寫入
console.log('drain', i);
writer.once('drain', write);
}
}
}
我們構造一個 Writable Stream,在寫入到資源池的時候,我們稍作處理,讓它效率低一點:
const Writable = require('stream').Writable;
const writer = new Writable({
write(chunk, encoding, callback) {
// 比 process.nextTick() 稍慢
setTimeout(() => {
callback && callback();
});
}
});
writeOneMillionTimes(writer, 'simple', 'utf8', () => {
console.log('end');
});
最后執行的結果是:
drain 7268
drain 4536
drain 1804
end
說明程序遇到了三次「背壓」,如果我們沒有在上面綁定 writer.once('drain') ,那么最后的結果就是 Stream 將第一次獲取的數據消耗完變結束了程序。
pipe
了解了 Readable 和 Writable,pipe 這個常用的函數應該就很好理解了,
readable.pipe(writable);
這句代碼的語意性很強,readable 通過 pipe(管道)傳輸給 writable,pipe 的實現大致如下(偽代碼):
Readable.prototype.pipe = function(writable, options) {
this.on('data', (chunk) => {
let ok = writable.write(chunk);
true// 背壓,暫停
!ok && this.pause();
});
writable.on('drain', () => {
// 恢復
this.resume();
});
// 告訴 writable 有流要導入
writable.emit('pipe', this);
// 支持鏈式調用
return writable;
};
上面做了五件事情:
- emit(pipe) ,通知寫入
- .write() ,新數據過來,寫入
- .pause() ,消費者消費速度慢,暫停寫入
- .resume() ,消費者完成消費,繼續寫入
- return writable ,支持鏈式調用
當然,上面只是最簡單的邏輯,還有很多異常和臨界判斷沒有加入,具體可以去看看 Node 的代碼( /lib/_stream_readable.js )。
Duplex Stream
Duplex,雙工的意思,它的輸入和輸出可以沒有任何關系,
Duplex Stream 實現特別簡單,不到一百行代碼,它繼承了 Readable Stream,并擁有 Writable Stream 的方法( 源碼地址 ):
const util = require('util');
const Readable = require('_stream_readable');
const Writable = require('_stream_writable');
util.inherits(Duplex, Readable);
var keys = Object.keys(Writable.prototype);
for (var v = 0; v < keys.length; v++) {
var method = keys[v];
if (!Duplex.prototype[method])
Duplex.prototype[method] = Writable.prototype[method];
}
我們可以通過 options 參數來配置它為只可讀、只可寫或者半工模式,一個簡單的 Demo:
var Duplex = require('stream').Duplex
const duplex = Duplex();
// readable
let i = 2;
duplex._read = function () {
this.push(i-- ? 'read ' + i : null);
};
duplex.on('data', data => console.log(data.toString()));
// writable
duplex._write = function (chunk, encoding, callback) {
console.log(chunk.toString());
callback();
};
duplex.write('write');
輸出的結果為:
write
read 1
read 0
可以看出,兩個管道是相互之間不干擾的。
Transform Stream
Transform Stream 集成了 Duplex Stream,它同樣具備 Readable 和 Writable 的能力,只不過它的輸入和輸出是存在相互關聯的,中間做了一次轉換處理。常見的處理有 Gzip 壓縮、解壓等。
Transform 的處理就是通過 _transform 函數將 Duplex 的 Readable 連接到 Writable,由于 Readable 的生產效率與 Writable 的消費效率是一樣的,所以這里 Transform 內部不存在「背壓」問題,背壓問題的源頭是外部的生產者和消費者速度差造成的。
關于 Transfrom Stream,我寫了一個簡單的 Demo:
const Transform = require('stream').Transform;
const MAP = {
'Barret': '靖',
'Lee': '李'
};
class Translate extends Transform {
constructor(dataSource, options) {
super(options);
}
_transform(buf, enc, next) {
const key = buf.toString();
const data = MAP[key];
this.push(data);
next();
}
}
var transform = new Translate();
transform.on('data', data => console.log(data.toString()));
transform.write('Lee');
transform.write('Barret');
transform.end();
小結
本文主要參考和查閱 Node 官網的文檔和源碼,細節問題都是從源碼中找到的答案,如有理解不準確之處,還請斧正。關于 Stream,這篇文章只是講述了基礎的原理,還有很多細節之處沒有講到,要真正理解它,還是需要多讀讀文檔,寫寫代碼。
了解了這些 Stream 的內部機制,對我們后續深入理解上層代碼有很大的促進作用,特別希望初學 Node 的同學花點時間進來看看。
另外,本文代碼和圖片可以在這里取用: https://github.com/barretlee/dive-into-node-stream 。
來自:http://web.jobbole.com/91439/