nodejs stream 手冊完整中文版本

jopen 9年前發布 | 70K 次閱讀 Node.js 開發 NodeJS

stream-handbook

stream-handbook的完整中文版本

nodejs stream 手冊

寫在前面的話: 如果你正在學習Nodejs,那么流一定是一個你需要掌握的概念。如果你想成為一個Node高手,那么流一定是武功秘籍中不可缺少的一個部分。關于流這個主題,由Node高手substack帶來的stream-handbook絕對是經典入門讀物之一,其在Github上的star數量以上超過了4500個,足以見其權威程度。本文下面的內容將參考自substak的這篇文章。本文也放在Github上,如果你本文覺得對你有幫助,鼓勵大家去github上幫我點個贊。https://github.com/jabez128/stream-handbook

引子

在編寫代碼時,我們應該有一些方法將程序像連接水管一樣連接起來 -- 當我們需要獲取一些數據時,可以去通過"擰"其他的部分來達到目的。這也應該是IO應有的方式。 -- Doug McIlroy. October 11, 1964

從unix開始,stream便開始進入了人們的視野,在過去的幾十年的時間里,它被證明是一種可以可依賴的編程方式,它可以將一個大型的系統拆成一些很小的部分,并且讓這些部分之間完美的進行合作。在unix中,我們可以使用|符號來實現流。在node中,node的內建stream模塊已經被多個核心模塊使用,同時也可以被用戶自定義的模塊使用。和unix類似,node中的流模塊的基本操作符叫做.pipe(),同時你也可以使用一個后壓機制來應對那些對數據消耗較慢的對象。

在node中,流可以幫助我們將事情的重點分為幾份,因為使用流可以幫助我們將實現接口的部分分割成一些連續的接口,這些接口都是可重用的。接著,你可以將一個流的輸出口接到另一個流的輸入口,然后使用使用一些庫來對流實現高級別的控制。

對于小型程序設計(small-program design)以及unix哲學來說,流都是一個重要的主城部分,但是除此之外還有一些重要的事情值得我們思考。永遠要記得:十鳥在森林不如一鳥在手里。

為什么應該使用流

在node中,I/O都是異步的,所以在和硬盤以及網絡的交互過程中會涉及到傳遞回調函數的過程。你之前可能會寫出這樣的代碼:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);

上面的這段代碼并沒有什么問題,但是在每次請求時,我們都會把整個data.txt文件讀入到內存中,然后再把結果返回給客戶端。想想看,如果data.txt文件非常大,在響應大量用戶的并發請求時,程序可能會消耗大量的內存,這樣很可能會造成用戶連接緩慢的問題。

其次,上面的代碼可能會造成很不好的用戶體驗,因為用戶在接收到任何的內容之前首先需要等待程序將文件內容完全讀入到內存中。

所幸的是,(req,res)參數都是流對象,這意味著我們可以使用一種更好的方法來實現上面的需求:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

在這里,.pipe()方法會自動幫助我們監聽data和end事件。上面的這段代碼不僅簡潔,而且data.txt文件中每一小段數據都將源源不斷的發送到客戶端。

除此之外,使用.pipe()方法還有別的好處,比如說它可以自動控制后端壓力,以便在客戶端連接緩慢的時候node可以將盡可能少的緩存放到內存中。

想要將數據進行壓縮?我們可以使用相應的流模塊完成這項工作!

var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);

通過上面的代碼,我們成功的將發送到瀏覽器端的數據進行了gzip壓縮。我們只是使用了一個oppressor模塊來處理這件事情。

一旦你學會使用流api,你可以將這些流模塊像搭樂高積木或者像連接水管一樣拼湊起來,從此以后你可能再也不會去使用那些沒有流API的模塊獲取和推送數據了。

流模塊基礎

在node中,一共有五種類型的流:readable,writable,transform,duplex以及"classic"

pipe

無論哪一種流,都會使用.pipe()方法來實現輸出和輸出。

.pipe()函數很簡單,它僅僅是接受一個源頭src并將數據輸出到一個可寫的流dst中:

src.pipe(dst)

.pipe(dst)將會返回dst因此你可以鏈式調用多個流:

a.pipe(b).pipe(c).pipe(d)

上面的代碼也可以等價為:

a.pipe(b);
b.pipe(c);
c.pipe(d);

這和你在unix中編寫流代碼很類似:

a | b | c | d

只不過此時你是在node中編寫而不是在shell中!

readable流

Readable流可以產出數據,你可以將這些數據傳送到一個writable,transform或者duplex流中,只需要調用pipe()方法:

readableStream.pipe(dst)

創建一個readable流

現在我們就來創建一個readable流!

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);

下面運行代碼:

$ node read0.js
beep boop

在上面的代碼中rs.push(null)的作用是告訴rs輸出數據應該結束了。

需要注意的一點是我們在將數據輸出到process.stdout之前已經將內容推送進readable流rs中,但是所有的數據依然是可寫的。

這是因為在你使用.push()將數據推進一個readable流中時,一直要到另一個東西來消耗數據之前,數據都會存在一個緩存中。

然而,在更多的情況下,我們想要的是當需要數據時數據才會產生,以此來避免大量的緩存數據。

我們可以通過定義一個._read函數來實現按需推送數據:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > 'z'.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);

代碼的運行結果如下所示:

$ node read1.js
abcdefghijklmnopqrstuvwxyz

在這里我們將字母a到z推進了rs中,但是只有當數據消耗者出現時,數據才會真正實現推送。

_read函數也可以獲取一個size參數來指明消耗者想要讀取多少比特的數據,但是這個參數是可選的。

需要注意到的是你可以使用util.inherit()來繼承一個Readable流。

為了說明只有在數據消耗者出現時,_read函數才會被調用,我們可以將上面的代碼簡單的修改一下:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null);

    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on('exit', function () {
    console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);

運行上面的代碼我們可以發現如果我們只請求5比特的數據,那么_read只會運行5次:

$ node read2.js | head -c5
abcde
_read() called 5 times

在上面的代碼中,setTimeout很重要,因為操作系統需要花費一些時間來發送程序結束信號。

另外,process.stdout.on('error',fn)處理器也很重要,因為當head不再關心我們的程序輸出時,操作系統將會向我們的進程發送一個SIGPIPE信號,此時process.stdout將會捕獲到一個EPIPE錯誤。

上面這些復雜的部分在和操作系統相關的交互中是必要的,但是如果你直接和node中的流交互的話,則可有可無。

如果你創建了一個readable流,并且想要將任何的值推送到其中的話,確保你在創建流的時候指定了objectMode參數,Readable({ objectMode: true })。

消耗一個readable流

大部分時候,將一個readable流直接pipe到另一種類型的流或者使用through或者concat-stream創建的流中,是一件很容易的事情。但是有時我們也會需要直接來消耗一個readable流。

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    console.dir(buf);
});

代碼運行結果如下所示:

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null

當數據可用時,readable時間將會被觸發,此時你可以調用.read()方法來從緩存中獲取這些數據。

當流結束時,.read()將返回null,因為此時已經沒有更多的字節可以供我們獲取了。

你也可以告訴.read()方法來返回n個比特的數據。雖然所有核心對象中的流都支持這種方式,但是對于對象流來說這種方法并不可用。

下面是一個例子,在這里我們制定每次讀取3個比特的數據:

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
});

運行上面的例子,我們將獲取到不完整的數據:

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>

這是因為多余的數據都留在了內部的緩存中,因此這個時候我們需要告訴node我們還對剩下的數據感興趣,我們可以使用.read(0)來完成這件事:

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
    process.stdin.read(0);
});

到現在為止我們的代碼和我們所期望的一樣了!

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>

我們也可以使用.unshift()方法來放置多余的數據。

使用unshift()方法能夠放置我們進行不必要的緩存拷貝。在下面的代碼中我們將創建一個分割新行的可讀解析器:

var offset = 0;

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    if (!buf) return;
    for (; offset < buf.length; offset++) {
        if (buf[offset] === 0x0a) {
            console.dir(buf.slice(0, offset).toString());
            buf = buf.slice(offset + 1);
            offset = 0;
            process.stdin.unshift(buf);
            return;
        }
    }
    process.stdin.unshift(buf);
});

代碼的運行結果如下所示:

$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'

當然,已經有很多這樣的模塊比如split來幫助你完成這件事情,你完全不需要自己寫一個。

writable流

一個writable流指的是只能流進不能流出的流:

src.pipe(writableStream)

創建一個writable流

只需要定義一個._write(chunk,enc,next)函數,你就可以將一個readable流的數據釋放到其中:

var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);

代碼運行結果如下所示:

$ (echo beep; sleep 1; echo boop) | node write0.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>

第一個參數,chunk代表寫進來的數據。

第二個參數enc代表編碼的字符串,但是只有在opts.decodeString為false的時候你才可以寫一個字符串。

第三個參數,next(err)是一個回調函數,使用這個回調函數你可以告訴數據消耗者可以寫更多的數據。你可以有選擇性的川籍哪一個錯誤對象error,這時會在流實體上出發一個emit事件。

在從一個readable流向一個writable流傳數據的過程中,數據會自動被轉換為Buffer對象,除非你在創建writable流的時候制定了decodeStrings參數為false,Writable({decodeStrings: false})。

如果你需要傳遞對象,需要指定objectMode參數為true,Writable({ objectMode: true })。

向一個writable流中寫東西

如果你需要向一個writable流中寫東西,只需要調用.write(data)即可。

process.stdout.write('beep boop\n');

為了告訴一個writable流你已經寫完畢了,只需要調用.end()方法。你也可以使用.end(data)在結束前再寫一些數據。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
}, 1000);

運行結果如下所示:

$ node writing1.js 
$ cat message.txt
beep boop

如果你在創建writable流時指定了highWaterMark參數,那么當沒有更多數據寫入時,調用.write()方法將會返回false。

如果你想要等待緩存情況,可以監聽drain事件。

transform流

你可以將transform流想象成一個流的中間部分,它可以讀也可寫,但是并不保存數據,它只負責處理流經它的數據。

duplex流

Duplex流是一個可讀也可寫的流,就好像一個電話,可以接收也可以發送語音。一個rpc交換是一個duplex流的最好的例子。如果你看到過下面這樣的代碼:

a.pipe(b).pipe(a)

那么你需要處理的就是一個duplex流對象。

classic流

Classic流是一個古老的接口,最早出現在node 0.4中。雖然現在不怎么用,但是我們最好還是來了解一下它的工作原理。

無論何時,只要一個流對象注冊了一個data監聽器,它就會自動的切換到classic模式,并且根據舊API的方式運行。

classic readable流

Classic readable流只是一個事件發射器,當有數據消耗者出現時發射emit事件,當輸出數據完畢時發射end事件。

我們可以同構檢查stream.readable來檢查一個classic流對象是否可讀。

下面是一個簡單的readable流對象的例子,程序的運行結果將會輸出A到J:

var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);

運行結果如下所示:

$ node classic0.js
ABCDEFGHIJ

為了從一個classic readable流中讀取數據,你可以注冊data和end監聽器。下面是一個使用舊readable流方式從process.stdin中讀取數據的例子:

process.stdin.on('data', function (buf) {
    console.log(buf);
});
process.stdin.on('end', function () {
    console.log('__END__');
});

運行結果如下所示:

$ (echo beep; sleep 1; echo boop) | node classic1.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

需要注意的一點是當你在一個流對象上注冊了一個data監聽器,你就將這個流放在了兼容模式下,此時你不能使用兩個stream2的api。

如果你自己創建流對象,永遠不要綁定data和end監聽器。如果你需要和舊版本的流兼容,最好使用第三方庫來實現.pipe()方法。

例如,你可以使用through模塊來避免顯式的使用data和end監聽器:

var through = require('through');
process.stdin.pipe(through(write, end));

function write (buf) {
    console.log(buf);
}
function end () {
    console.log('__END__');
}

程序運行結果如下所示:

$ (echo beep; sleep 1; echo boop) | node through.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

你也可以使用concat-stream模塊來將整個流的內容緩存起來:

var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
    console.log(JSON.parse(body));
}));

程序運行結果如下所示:

$ echo '{"beep":"boop"}' | node concat.js 
{ beep: 'boop' }

Classic readable流擁有.pause()和.resume()邏輯來暫停一個流,但是這都是可選的。如果你想要使用.pause()和.resume()方法,你應該使用through模塊來幫助你處理緩存。

classic writable流

Classic writable流非常簡單。其中只定義了.write(buf),.end(buf),以及.desctory()方法。其中.end(buf)的參數buf是可選參數,但是一般來說node程序員還是喜歡使用.end(buf)這種寫法。

接下來讀什么

  • node核心stream模塊文檔
  • 你可以使用readable-stream模塊來確保你的stream2代碼兼容node 0.8及其之前的代碼。在你npm install readable-stream之后直接reauire('readable-stream')而不要require('stream')。

本文參考自stream-handbook,原文地址https://github.com/substack/stream-handbook
來自:https://github.com/jabez128/stream-handbook

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!