通過源碼解析Node.js中導流(pipe)的實現
來自: http://hao.jser.com/archive/9353/
在 Node.js 中,流( Stream )是其眾多原生對象的基類,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數據處理和傳遞。在它對外暴露的接口中,最為神奇的,莫過于導流( pipe )方法了。鑒于近期自己正在閱讀 Node.js 中的部分源碼,也來從源碼層面分享下導流的具體實現。
正題
以下是一個關于導流的簡單例子:
'use strict' import {createReadStream, createWriteStream} from 'fs'createReadStream('/path/to/a/big/file').pipe(createWriteStream('/path/to/the/dest'))</pre>
再結合 官方文檔 ,我們可以把 pipe 方法的主要功能分解為:
-
不斷從來源可讀流中獲得一個指定長度的數據。
-
將獲取到的數據寫入目標可寫流。
-
平衡讀取和寫入速度,防止讀取速度大大超過寫入速度時,出現大量滯留數據。
好,讓我們跟隨 Node.js 項目里 lib/_stream_readable.js 和 lib/_stream_writable.js 中的代碼,逐個解析這三個主要功能的實現。
讀取數據
剛創建出的可讀流只是一個記錄了一些初始狀態的空殼,里面沒有任何數據,并且其狀態不屬于官方文檔中的流動模式(flowing mode)和暫停模式(paused mode)中的任何一種,算是一種偽暫停模式,因為此時實例的狀態中記錄它是否為暫停模式的變量還不是標準的布爾值,而是 null ,但又可通過將暫停模式轉化為流動模式的行為(調用實例的 resume() 方法),將可讀流切換至流動模式。在外部代碼中,我們可以手動監聽可讀流的 data 事件,讓其進入流動模式:
// lib/_stream_readable.js // ...Readable.prototype.on = function(ev, fn) { var res = Stream.prototype.on.call(this, ev, fn);
if (ev === 'data' && false !== this._readableState.flowing) { this.resume(); }
// ...
return res; };</pre>
可見,可讀流類通過二次封裝父類( EventEmitter )的 on() 方法,替我們在監聽 data 事件時,將流切換至了流動模式。而開始讀取數據的動作,則存在于 resume() 方法調用的內部方法 resume_() 中,讓我們一窺究竟:
// lib/_stream_readable.js // ...function resume_(stream, state) { if (!state.reading) { debug('resume read 0'); stream.read(0); }
// ... }</pre>
通過向可讀流讀取一次空數據(大小為0),將會觸發實例層面實現的 _read() 方法,開始讀取數據,然后利用讀到的數據觸發 data 事件:
// lib/_stream_readable.js // ...Readable.prototype.read = function(n) { // ... // 此次判斷的意圖為,如果可讀流的緩沖中已滿,則只空觸發readable事件。 if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) { if (state.length === 0 && state.ended) endReadable(this); else emitReadable(this); return null; }
// 若可讀流已經被傳入了終止符(null),且緩沖中沒有遺留數據,則結束這個可讀流 if (n === 0 && state.ended) { if (state.length === 0) endReadable(this); return null; }
// 若目前緩沖中的數據大小為空,或未超過設置的警戒線,則進行一次數據讀取。 if (state.length === 0 || state.length - n < state.highWaterMark) { doRead = true; }
if (doRead) { // ... this._read(state.highWaterMark); }
// ...
if (ret !== null) this.emit('data', ret);
return ret; };</pre>
可見,在可讀流的 read() 方法內部,通過調用在實例層面實現的 _read(size) 方法,取得了一段(設置的警戒線)大小的數據,但是,你可能會疑惑,這只是讀取了一次數據啊,理想情況下,應該是循環調用 _read(size) 直至取完所有數據才對啊!?其實,這部分的邏輯存在于我們實現 _read(size) 方法時,在其內部調用的 this.push(data) 方法中,在最后其會調用私有方法 maybeReadMore_() ,再次觸發 read(0) ,接著在 read(0) 函數的代碼中再次判斷可讀流是否能夠結束,否則再進行一次 _read(size) 讀取:
// lib/_stream_readable.js // ...Readable.prototype.push = function(chunk, encoding) { var state = this._readableState; // ... return readableAddChunk(this, state, chunk, encoding, false); };
function readableAddChunk(stream, state, chunk, encoding, addToFront) { // ... if (er) { stream.emit('error', er); } else if (chunk === null) { state.reading = false; onEofChunk(stream, state); // 當傳入終止符時,將可讀流的結束標識(state.ended)設為true } // ... maybeReadMore(stream, state); } }
// ... }
function maybeReadMore(stream, state) { if (!state.readingMore) { // ... process.nextTick(maybeReadMore_, stream, state); } }
function maybeReadMore_(stream, state) { // ... stream.read(0); }
function onEofChunk(stream, state) { if (state.ended) return; // ... state.ended = true; // ... }</pre>
好的,此時從可讀流中讀取數據的整個核心流程已經實現了,讓我們歸納一下:
-
剛創建出的可讀流只是一個空殼,保存著一些初始狀態。
-
監聽它的 data 事件,將會自動調用該可讀流的 resume() 方法,使流切換至流動模式。
-
在 resume() 方法的內部函數 _resume() 中,對可讀流進行了一次 read(0) 調用。
-
read(0) 調用的內部,首先檢查流是否符合了結束條件,若符合,則 結束 之。否則調用實例實現的 _read(size) 方法讀取一段預設的警戒線(highWaterMark)大小的數據。
-
在實例實現 _read(size) 函數時內部調用的 this.push(data) 方法里,會先判斷的讀到的數據是否為結束符,若是,則將流的狀態設為結束,然后再一次對可讀流調用 read(0) 。
寫入數據
和可讀流一樣,剛創建出的可寫流也只是一個記錄了相關狀態(包括預設的寫入緩沖大小)的空殼。直接調用它的 write 方法,該方法會在其內部調用 writeOrBuffer 函數來對數據是否可以直接一次性全部寫入進行判斷:
// lib/_stream_writable.js // ...function writeOrBuffer(stream, state, chunk, encoding, cb) { // ... var ret = state.length < state.highWaterMark;
// 記錄可寫流是否需要出發drain事件 if (!ret) state.needDrain = true;
if (state.writing || state.corked) { // 若可寫流正在被寫入或被人工阻塞,則先將寫入操作排隊 // ... } else { doWrite(stream, state, false, len, chunk, encoding, cb); }
return ret; }
function doWrite(stream, state, writev, len, chunk, encoding, cb) { // ... if (writev) stream._writev(chunk, state.onwrite); else stream._write(chunk, encoding, state.onwrite); // ... }</pre>
從代碼中可知,在 writeOrBuffer 函數記錄下了數據是否可以被一次性寫入后,調用了實例層實現的 _write() 或 _writev() 方法進行了實際的寫入操作。那么,如果不能一次性寫入完畢,那么在真正寫入完畢時,又是如何進行通知的呢?嗯,答案就在設置的 state.onwrite 回調中:
// lib/_stream_writable.js // ...function onwrite(stream, er) { // ...
if (er) onwriteError(stream, state, sync, er, cb); else { // ... if (sync) { process.nextTick(afterWrite, stream, state, finished, cb); } else { afterWrite(stream, state, finished, cb); } } }
function afterWrite(stream, state, finished, cb) { if (!finished) onwriteDrain(stream, state); // ... }
function onwriteDrain(stream, state) { if (state.length === 0 && state.needDrain) { state.needDrain = false; stream.emit('drain'); } }</pre>
可見,在回調函數的執行中,會對該可寫流該次被寫入的數據是否超過了警戒線的狀態進行判斷,如果是,則觸發 drain 事件,進行通知。
我們也可以調用 end() 方法來表明要結束這個寫入流,并進行最后一次寫入, end() 方法的內部最終會調用 endWritable() 函數來講可寫流的狀態切換為已結束:
// lib/_stream_writable.js // ...function endWritable(stream, state, cb) { // ... state.ended = true; stream.writable = false; }</pre>
此時,向可寫流中寫入數據的整個核心流程已經實現了,這個流程和可寫流的循環讀取流程不同,它是直線的,歸納一下:
-
剛創建出的可寫流只是一個空殼,保存著一些初始狀態。
-
調用 write() 方法,其內部的 writeOrBuffer() 檢測該次寫入的數據是否需要被暫存在緩沖區中。
-
writeOrBuffer() 函數調用實例實現的 _write() 或 _writev() 方法,進行實際的寫入,完成后調用回調函數 state.onwrite 。
-
回調函數中檢測該次寫入是否被緩沖,若是,觸發 drain 事件。
-
重復以上過程,直至調用 end() 方法結束該可寫流。
導流
在摸清了從可讀流中讀數據,和向可寫流中寫數據實現的核心流程后, Node.js 中實現導流的核心流程其實已經呼之欲出了。首先,為了開始從源可讀流讀取數據,在 pipe() 方法的內部,它主動為源可讀流添加了 data 事件的監聽函數:
// lib/_stream_readable.js // ...Readable.prototype.pipe = function(dest, pipeOpts) { // ...
src.on('data', ondata); function ondata(chunk) { // ... src.pause(); } }
// ... return dest; };</pre>
從代碼中可見,若向目標可寫流寫入一次數據時,目標可寫流表示該次寫入它需要進行緩沖,則主動將源可讀流切換至暫停模式。那么,源可讀流通過什么手段得知可以再次讀取數據并寫入呢?嗯,通過監聽目標可寫流的 drain 事件:
// lib/_stream_readable.js // ...Readable.prototype.pipe = function(dest, pipeOpts) { // ... var ondrain = pipeOnDrain(src); dest.on('drain', ondrain);
// ... return dest; };
function pipeOnDrain(src) { return function() { var state = src._readableState;
// 目標可寫流可能會存在多次寫入需要進行緩沖的情況,需確保所有需要緩沖的寫入都 // 完成后,再次將可讀流切換至流動模式。 if (state.awaitDrain) state.awaitDrain--; if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) { state.flowing = true; flow(src); }
}; }</pre>
最后,監聽源可讀流的結束事件,對應著結束目標可寫流:
// lib/_stream_readable.js // ...Readable.prototype.pipe = function(dest, pipeOpts) { // ... var endFn = doEnd ? onend : cleanup; if (state.endEmitted) process.nextTick(endFn); else src.once('end', endFn);
function onend() { debug('onend'); dest.end(); }
// ... return dest; };</pre>
由于前面的鋪墊,實際導流操作的核心流程其實實現得非常輕松,歸納一下:
-
主動監聽源可讀流的 data 事件,在該事件的監聽函數中,向目標可寫流寫入數據。
-
若目標可寫流表示該寫入操作需要進行緩沖,則立刻將源可讀流切換至暫停模式。
-
監聽目標可寫流的 drain 事件,當目標可寫流里所有需要緩沖的寫入操作都完畢后,將流重新切換回流動模式。
-
監聽源可讀流的 end 事件,相應地結束目標可寫流。
最后
Node.js 中流的實際實現其實非常龐大,復雜,精妙。每一個流的內部,都管理著大量狀態。本文僅僅只是在龐大的流的實現中,選擇了一條主線,進行了闡述。大家如果有閑,非常推薦完整地閱讀一遍其實現。
參考: