深入理解 Node Stream 內部機制

wyp 7年前發布 | 22K 次閱讀 JavaScript開發 JavaScript

相信很多人對 Node 的 Stream 已經不陌生了,不論是請求流、響應流、文件流還是 socket 流,這些流的底層都是使用 stream 模塊封裝的,甚至我們平時用的最多的 console.log 打印日志也使用了它,不信你打開 Node runtime 的源碼,看看 lib/console.js

function write(ignoreErrors, stream, string, errorhandler){
 // ...
 stream.once('error', noop);
 stream.write(string, errorhandler);
 //...
}

Console.prototype.log = function log(...args){
 write(this._ignoreErrors,
 this._stdout,
 `${util.format.apply(null, args)}\n`,
 this._stdoutErrorHandler);
};

Stream 模塊做了很多事情,了解了 Stream,那么 Node 中其他很多模塊理解起來就順暢多了。

本文代碼和圖片可以在這里取用: https://github.com/barretlee/dive-into-node-stream

stream 模塊

如果你了解 生產者和消費者問題 的解法,那理解 stream 就基本沒有壓力了,它不僅僅是資料的起點和落點,還包含了一系列狀態控制,可以說一個 stream 就是一個狀態管理單元。了解內部機制的最佳方式除了看 Node 官方文檔 ,還可以去看看 Node 的 源碼

  • lib/module.js
  • lib/_stream_readable.js
  • lib/_stream_writable.js
  • lib/_stream_tranform.js
  • lib/_stream_duplex.js

把 Readable 和 Writable 看明白,Tranform 和 Duplex 就不難理解了。

Readable Stream

Readable Stream 存在兩種模式,一種是叫做 Flowing Mode ,流動模式,在 Stream 上綁定 ondata 方法就會自動觸發這個模式,比如:

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
 console.log(`Received ${chunk.length} bytes of data.`);
});

這個模式的流程圖如下:

資源的數據流并不是直接流向消費者,而是先 push 到緩存池,緩存池有一個水位標記 highWatermark ,超過這個標記閾值,push 的時候會返回 false ,什么場景下會出現這種情況呢?

  • 消費者主動執行了 .pause()
  • 消費速度比數據 push 到緩存池的生產速度慢

有個專有名詞來形成這種情況,叫做「背壓」,Writable Stream 也存在類似的情況。

流動模式,這個名詞還是很形象的,緩存池就像一個水桶,消費者通過管口接水,同時,資源池就像一個水泵,不斷地往水桶中泵水,而 highWaterMark 是水桶的浮標,達到閾值就停止蓄水。

下面是一個簡單的 Demo:

const Readable = require('stream').Readable;

// Stream 實現
class MyReadable extends Readable{
 constructor(dataSource, options) {
 super(options);
 this.dataSource = dataSource;
 }
 // 繼承了 Readable 的類必須實現這個函數
 // 觸發系統底層對流的讀取
 _read() {
 const data = this.dataSource.makeData();
 this.push(data);
 }
}

// 模擬資源池
const dataSource = {
 data: new Array(10).fill('-'),
 // 每次讀取時 pop 一個數據
 makeData() {
 if (!dataSource.data.length) return null;
 return dataSource.data.pop();
 }
};

const myReadable = new MyReadable(dataSource);
myReadable.setEncoding('utf8');
myReadable.on('data', (chunk) => {
 console.log(chunk);
});

另外一種模式是 Non-Flowing Mode ,沒流動,也就是暫停模式,這是 Stream 的預設模式,Stream 實例的 _readableState.flow 有三個狀態,分別是:

  • _readableState.flow = null ,暫時沒有消費者過來
  • _readableState.flow = false ,主動觸發了 .pause()
  • _readableState.flow = true ,流動模式

當我們監聽了 onreadable 事件后,會進入這種模式,比如:

const myReadable = new MyReadable(dataSource);
myReadable.setEncoding('utf8');
myReadable.on('readable', () => {});

監聽 readable 的回調函數第一個參數不會傳遞內容,需要我們通過 myReadable.read() 主動讀取,為啥呢,可以看看下面這張圖:

資源池會不斷地往緩存池輸送數據,直到 highWaterMark 閾值,消費者監聽了 readable 事件并不會消費數據,需要主動調用 .read([size]) 函數才會從緩存池取出,并且可以帶上 size 參數,用多少就取多少:

const myReadable = new MyReadable(dataSource);
myReadable.setEncoding('utf8');
myReadable.on('readable', () => {
 let chunk;
 while (null !== (chunk = myReadable.read())) {
 console.log(`Received ${chunk.length} bytes of data.`);
 }
});

這里需要注意一點,只要數據達到緩存池都會觸發一次 readable 事件,有可能出現「消費者正在消費數據的時候,又觸發了一次 readable 事件,那么下次回調中 read 到的數據可能為空」的情況。我們可以通過 _readableState.buffer 來查看緩存池到底緩存了多少資源:

let once = false;
myReadable.on('readable', (chunk) => {
 console.log(myReadable._readableState.buffer.length);
 if (once) return;
 once = true;
 console.log(myReadable.read());
});

上面的代碼我們只消費一次緩存池的數據,那么在消費后,緩存池又收到了一次資源池的 push 操作,此時還會觸發一次 readable 事件,我們可以看看這次存了多大的 buffer。

需要注意的是,buffer 大小也是有上限的,默認設置為 16kb,也就是 16384 個字節長度,它最大可設置為 8Mb,沒記錯的話,這個值好像是 Node 的 new space memory 的大小。

上面介紹了 Readable Stream 大概的機制,還有很多細節部分沒有提到,比如 Flowing Mode 在不同 Node 版本中的 Stream 實現不太一樣,實際上,它有三個版本,上面提到的是第 2 和 第 3 個版本的實現;再比如 Mixins Mode 模式,一般我們只推薦(允許)使用 ondata 和 onreadable 的一種來處理 Readable Stream,但是如果要求在 Non-Flowing Mode 的情況下使用 ondata 如何實現呢?那么就可以考慮 Mixins Mode 了。

Writable Stream

原理與 Readable Stream 是比較相似的,數據流過來的時候,會直接寫入到資源池,當寫入速度比較緩慢或者寫入暫停時,數據流會進入隊列池緩存起來,如下圖所示:

當生產者寫入速度過快,把隊列池裝滿了之后,就會出現「背壓」,這個時候是需要告訴生產者暫停生產的,當隊列釋放之后,Writable Stream 會給生產者發送一個 drain 消息,讓它恢復生產。下面是一個寫入一百萬條數據的 Demo:

function writeOneMillionTimes(writer, data, encoding, callback){
 let i = 10000;
 write();
 function write(){
 let ok = true;
 while(i-- > 0 && ok) {
 // 寫入結束時回調
 ok = writer.write(data, encoding, i === 0 ? callback : null);
 }
 if (i > 0) {
 // 這里提前停下了,'drain' 事件觸發后才可以繼續寫入
 console.log('drain', i);
 writer.once('drain', write);
 }
 }
}

我們構造一個 Writable Stream,在寫入到資源池的時候,我們稍作處理,讓它效率低一點:

const Writable = require('stream').Writable;
const writer = new Writable({
 write(chunk, encoding, callback) {
 // 比 process.nextTick() 稍慢
 setTimeout(() => {
 callback && callback();
 });
 }
});

writeOneMillionTimes(writer, 'simple', 'utf8', () => {
 console.log('end');
});

最后執行的結果是:

drain 7268
drain 4536
drain 1804
end

說明程序遇到了三次「背壓」,如果我們沒有在上面綁定 writer.once('drain') ,那么最后的結果就是 Stream 將第一次獲取的數據消耗完變結束了程序。

pipe

了解了 Readable 和 Writable,pipe 這個常用的函數應該就很好理解了,

readable.pipe(writable);

這句代碼的語意性很強,readable 通過 pipe(管道)傳輸給 writable,pipe 的實現大致如下(偽代碼):

Readable.prototype.pipe = function(writable, options){
 this.on('data', (chunk) => {
 let ok = writable.write(chunk);
true// 背壓,暫停
 !ok && this.pause();
 });
 writable.on('drain', () => {
 // 恢復
 this.resume();
 });
 // 告訴 writable 有流要導入
 writable.emit('pipe', this);
 // 支持鏈式調用
 return writable;
};

上面做了五件事情:

  • emit(pipe) ,通知寫入
  • .write() ,新數據過來,寫入
  • .pause() ,消費者消費速度慢,暫停寫入
  • .resume() ,消費者完成消費,繼續寫入
  • return writable ,支持鏈式調用

當然,上面只是最簡單的邏輯,還有很多異常和臨界判斷沒有加入,具體可以去看看 Node 的代碼( /lib/_stream_readable.js )。

Duplex Stream

Duplex,雙工的意思,它的輸入和輸出可以沒有任何關系,

Duplex Stream 實現特別簡單,不到一百行代碼,它繼承了 Readable Stream,并擁有 Writable Stream 的方法( 源碼地址 ):

const util = require('util');
const Readable = require('_stream_readable');
const Writable = require('_stream_writable');

util.inherits(Duplex, Readable);

var keys = Object.keys(Writable.prototype);
for (var v = 0; v < keys.length; v++) {
 var method = keys[v];
 if (!Duplex.prototype[method])
 Duplex.prototype[method] = Writable.prototype[method];
}

我們可以通過 options 參數來配置它為只可讀、只可寫或者半工模式,一個簡單的 Demo:

var Duplex = require('stream').Duplex

const duplex = Duplex();

// readable
let i = 2;
duplex._read = function (){
 this.push(i-- ? 'read ' + i : null);
};
duplex.on('data', data => console.log(data.toString()));

// writable
duplex._write = function (chunk, encoding, callback){
 console.log(chunk.toString());
 callback();
};
duplex.write('write');

輸出的結果為:

write
read 1
read 0

可以看出,兩個管道是相互之間不干擾的。

Transform Stream

Transform Stream 集成了 Duplex Stream,它同樣具備 Readable 和 Writable 的能力,只不過它的輸入和輸出是存在相互關聯的,中間做了一次轉換處理。常見的處理有 Gzip 壓縮、解壓等。

Transform 的處理就是通過 _transform 函數將 Duplex 的 Readable 連接到 Writable,由于 Readable 的生產效率與 Writable 的消費效率是一樣的,所以這里 Transform 內部不存在「背壓」問題,背壓問題的源頭是外部的生產者和消費者速度差造成的。

關于 Transfrom Stream,我寫了一個簡單的 Demo:

const Transform = require('stream').Transform;
const MAP = {
 'Barret': '靖',
 'Lee': '李'
};

class Translate extends Transform{
 constructor(dataSource, options) {
 super(options);
 }
 _transform(buf, enc, next) {
 const key = buf.toString();
 const data = MAP[key];
 this.push(data);
 next();
 }
}

var transform = new Translate();
transform.on('data', data => console.log(data.toString()));
transform.write('Lee');
transform.write('Barret');
transform.end();

小結

本文主要參考和查閱 Node 官網的文檔和源碼,細節問題都是從源碼中找到的答案,如有理解不準確之處,還請斧正。關于 Stream,這篇文章只是講述了基礎的原理,還有很多細節之處沒有講到,要真正理解它,還是需要多讀讀文檔,寫寫代碼。

了解了這些 Stream 的內部機制,對我們后續深入理解上層代碼有很大的促進作用,特別希望初學 Node 的同學花點時間進來看看。

 

來自:http://www.barretlee.com/blog/2017/06/06/dive-to-nodejs-at-stream-module/

 

 本文由用戶 wyp 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!