深入理解 Node Stream 內部機制

Kat6936 7年前發布 | 24K 次閱讀 JavaScript開發 JavaScript

相信很多人對 Node 的 Stream 已經不陌生了,不論是請求流、響應流、文件流還是 socket 流,這些流的底層都是使用 stream 模塊封裝的,甚至我們平時用的最多的 console.log 打印日志也使用了它,不信你打開 Node runtime 的源碼,看看 lib/console.js

function write(ignoreErrors, stream, string, errorhandler) {
  // ...
  stream.once('error', noop);
  stream.write(string, errorhandler);
  //...
}
Console.prototype.log = function log(...args) {
  write(this._ignoreErrors,
        this._stdout,
        `${util.format.apply(null, args)}\n`,
        this._stdoutErrorHandler);
};

Stream 模塊做了很多事情,了解了 Stream,那么 Node 中其他很多模塊理解起來就順暢多了。

本文代碼和圖片可以在這里取用: https://github.com/barretlee/dive-into-node-stream

stream 模塊

如果你了解 生產者和消費者問題 的解法,那理解 stream 就基本沒有壓力了,它不僅僅是資料的起點和落點,還包含了一系列狀態控制,可以說一個 stream 就是一個狀態管理單元。了解內部機制的最佳方式除了看 Node 官方文檔 ,還可以去看看 Node 的 源碼

  • lib/module.js
  • lib/_stream_readable.js
  • lib/_stream_writable.js
  • lib/_stream_tranform.js
  • lib/_stream_duplex.js

把 Readable 和 Writable 看明白,Tranform 和 Duplex 就不難理解了。

Readable Stream

Readable Stream 存在兩種模式,一種是叫做 Flowing Mode ,流動模式,在 Stream 上綁定 ondata 方法就會自動觸發這個模式,比如:

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});

這個模式的流程圖如下:

資源的數據流并不是直接流向消費者,而是先 push 到緩存池,緩存池有一個水位標記 highWatermark ,超過這個標記閾值,push 的時候會返回 false ,什么場景下會出現這種情況呢?

  • 消費者主動執行了 .pause()
  • 消費速度比數據 push 到緩存池的生產速度慢

有個專有名詞來形成這種情況,叫做「背壓」,Writable Stream 也存在類似的情況。

流動模式,這個名詞還是很形象的,緩存池就像一個水桶,消費者通過管口接水,同時,資源池就像一個水泵,不斷地往水桶中泵水,而 highWaterMark 是水桶的浮標,達到閾值就停止蓄水。

下面是一個簡單的 Demo:

const Readable = require('stream').Readable;
// Stream 實現
class MyReadable extends Readable {
  constructor(dataSource, options) {
    super(options);
    this.dataSource = dataSource;
  }
  // 繼承了 Readable 的類必須實現這個函數
  // 觸發系統底層對流的讀取
  _read() {
    const data = this.dataSource.makeData();
    this.push(data);
  }
}
// 模擬資源池
const dataSource = {
  data: new Array(10).fill('-'),
  // 每次讀取時 pop 一個數據
  makeData() {
    if (!dataSource.data.length) return null;
    return dataSource.data.pop();
  }
};
const myReadable = new MyReadable(dataSource);
myReadable.setEncoding('utf8');
myReadable.on('data', (chunk) => {
  console.log(chunk);
});

另外一種模式是 Non-Flowing Mode ,沒流動,也就是暫停模式,這是 Stream 的預設模式,Stream 實例的 _readableState.flow 有三個狀態,分別是:

  • _readableState.flow = null ,暫時沒有消費者過來
  • _readableState.flow = false ,主動觸發了 .pause()
  • _readableState.flow = true ,流動模式

當我們監聽了 onreadable 事件后,會進入這種模式,比如:

const myReadable = new MyReadable(dataSource);
myReadable.setEncoding('utf8');
myReadable.on('readable', () => {});

監聽 readable 的回調函數第一個參數不會傳遞內容,需要我們通過 myReadable.read() 主動讀取,為啥呢,可以看看下面這張圖:

資源池會不斷地往緩存池輸送數據,直到 highWaterMark 閾值,消費者監聽了 readable 事件并不會消費數據,需要主動調用 .read([size]) 函數才會從緩存池取出,并且可以帶上 size 參數,用多少就取多少:

const myReadable = new MyReadable(dataSource);
myReadable.setEncoding('utf8');
myReadable.on('readable', () => {
  let chunk;
  while (null !== (chunk = myReadable.read())) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
});

這里需要注意一點,只要數據達到緩存池都會觸發一次 readable 事件,有可能出現「消費者正在消費數據的時候,又觸發了一次 readable 事件,那么下次回調中 read 到的數據可能為空」的情況。我們可以通過 _readableState.buffer 來查看緩存池到底緩存了多少資源:

let once = false;
myReadable.on('readable', (chunk) => {
  console.log(myReadable._readableState.buffer.length);
  if (once) return;
  once = true;
  console.log(myReadable.read());
});

上面的代碼我們只消費一次緩存池的數據,那么在消費后,緩存池又收到了一次資源池的 push 操作,此時還會觸發一次 readable 事件,我們可以看看這次存了多大的 buffer。

需要注意的是,buffer 大小也是有上限的,默認設置為 16kb,也就是 16384 個字節長度,它最大可設置為 8Mb,沒記錯的話,這個值好像是 Node 的 new space memory 的大小。

上面介紹了 Readable Stream 大概的機制,還有很多細節部分沒有提到,比如 Flowing Mode 在不同 Node 版本中的 Stream 實現不太一樣,實際上,它有三個版本,上面提到的是第 2 和 第 3 個版本的實現;再比如 Mixins Mode 模式,一般我們只推薦(允許)使用 ondata 和 onreadable 的一種來處理 Readable Stream,但是如果要求在 Non-Flowing Mode 的情況下使用 ondata 如何實現呢?那么就可以考慮 Mixins Mode 了。

Writable Stream

原理與 Readable Stream 是比較相似的,數據流過來的時候,會直接寫入到資源池,當寫入速度比較緩慢或者寫入暫停時,數據流會進入隊列池緩存起來,如下圖所示:

當生產者寫入速度過快,把隊列池裝滿了之后,就會出現「背壓」,這個時候是需要告訴生產者暫停生產的,當隊列釋放之后,Writable Stream 會給生產者發送一個 drain 消息,讓它恢復生產。下面是一個寫入一百萬條數據的 Demo:

function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 10000;
  write();
  function write() {
    let ok = true;
    while(i-- > 0 && ok) {
      // 寫入結束時回調
      ok = writer.write(data, encoding, i === 0 ? callback : null);
    }
    if (i > 0) {
      // 這里提前停下了,'drain' 事件觸發后才可以繼續寫入  
      console.log('drain', i);
      writer.once('drain', write);
    }
  }
}

我們構造一個 Writable Stream,在寫入到資源池的時候,我們稍作處理,讓它效率低一點:

const Writable = require('stream').Writable;
const writer = new Writable({
  write(chunk, encoding, callback) {
    // 比 process.nextTick() 稍慢
    setTimeout(() => {
      callback && callback();
    });
  }
});
writeOneMillionTimes(writer, 'simple', 'utf8', () => {
  console.log('end');
});

最后執行的結果是:

drain 7268
drain 4536
drain 1804
end

說明程序遇到了三次「背壓」,如果我們沒有在上面綁定 writer.once('drain') ,那么最后的結果就是 Stream 將第一次獲取的數據消耗完變結束了程序。

pipe

了解了 Readable 和 Writable,pipe 這個常用的函數應該就很好理解了,

readable.pipe(writable);

這句代碼的語意性很強,readable 通過 pipe(管道)傳輸給 writable,pipe 的實現大致如下(偽代碼):

Readable.prototype.pipe = function(writable, options) {
  this.on('data', (chunk) => {
    let ok = writable.write(chunk);
true// 背壓,暫停
    !ok && this.pause();
  });
  writable.on('drain', () => {
    // 恢復
    this.resume();
  });
  // 告訴 writable 有流要導入
  writable.emit('pipe', this);
  // 支持鏈式調用
  return writable;
};

上面做了五件事情:

  • emit(pipe) ,通知寫入
  • .write() ,新數據過來,寫入
  • .pause() ,消費者消費速度慢,暫停寫入
  • .resume() ,消費者完成消費,繼續寫入
  • return writable ,支持鏈式調用

當然,上面只是最簡單的邏輯,還有很多異常和臨界判斷沒有加入,具體可以去看看 Node 的代碼( /lib/_stream_readable.js )。

Duplex Stream

Duplex,雙工的意思,它的輸入和輸出可以沒有任何關系,

Duplex Stream 實現特別簡單,不到一百行代碼,它繼承了 Readable Stream,并擁有 Writable Stream 的方法( 源碼地址 ):

const util = require('util');
const Readable = require('_stream_readable');
const Writable = require('_stream_writable');
util.inherits(Duplex, Readable);
var keys = Object.keys(Writable.prototype);
for (var v = 0; v < keys.length; v++) {
  var method = keys[v];
  if (!Duplex.prototype[method])
    Duplex.prototype[method] = Writable.prototype[method];
}

我們可以通過 options 參數來配置它為只可讀、只可寫或者半工模式,一個簡單的 Demo:

var Duplex = require('stream').Duplex
const duplex = Duplex();
// readable
let i = 2;
duplex._read = function () {
  this.push(i-- ? 'read ' + i : null);
};
duplex.on('data', data => console.log(data.toString()));
// writable
duplex._write = function (chunk, encoding, callback) {
  console.log(chunk.toString());
  callback();
};
duplex.write('write');

輸出的結果為:

write
read 1
read 0

可以看出,兩個管道是相互之間不干擾的。

Transform Stream

Transform Stream 集成了 Duplex Stream,它同樣具備 Readable 和 Writable 的能力,只不過它的輸入和輸出是存在相互關聯的,中間做了一次轉換處理。常見的處理有 Gzip 壓縮、解壓等。

Transform 的處理就是通過 _transform 函數將 Duplex 的 Readable 連接到 Writable,由于 Readable 的生產效率與 Writable 的消費效率是一樣的,所以這里 Transform 內部不存在「背壓」問題,背壓問題的源頭是外部的生產者和消費者速度差造成的。

關于 Transfrom Stream,我寫了一個簡單的 Demo:

const Transform = require('stream').Transform;
const MAP = {
  'Barret': '靖',
  'Lee': '李'
};
  
class Translate extends Transform {
  constructor(dataSource, options) {
    super(options);
  }
  _transform(buf, enc, next) {
    const key = buf.toString();
    const data = MAP[key];
    this.push(data);
    next();
  }
}
var transform = new Translate();
transform.on('data', data => console.log(data.toString()));
transform.write('Lee');
transform.write('Barret');
transform.end();

小結

本文主要參考和查閱 Node 官網的文檔和源碼,細節問題都是從源碼中找到的答案,如有理解不準確之處,還請斧正。關于 Stream,這篇文章只是講述了基礎的原理,還有很多細節之處沒有講到,要真正理解它,還是需要多讀讀文檔,寫寫代碼。

了解了這些 Stream 的內部機制,對我們后續深入理解上層代碼有很大的促進作用,特別希望初學 Node 的同學花點時間進來看看。

另外,本文代碼和圖片可以在這里取用: https://github.com/barretlee/dive-into-node-stream

 

來自:http://web.jobbole.com/91439/

 

 本文由用戶 Kat6936 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!