Node.js Stream - 實戰篇

KirstenGumm 9年前發布 | 24K 次閱讀 Node.js Node.js 開發 Browserify

背景

前面兩篇(基礎篇和進階篇)主要介紹流的基本用法和原理,本篇從應用的角度,介紹如何使用管道進行程序設計,主要內容包括:

Pipeline

所謂“管道”,指的是通過 a.pipe(b) 的形式連接起來的多個Stream對象的組合。

假如現在有兩個 Transform : bold 和 red ,分別可將文本流中某些關鍵字加粗和飄紅。

可以按下面的方式對文本同時加粗和飄紅:

// source: 輸入流
// dest: 輸出目的地
source.pipe(bold).pipe(red).pipe(dest)

bold.pipe(red) 便可以看作一個管道,輸入流先后經過 bold 和 red 的變換再輸出。

但如果這種加粗且飄紅的功能的應用場景很廣,我們期望的使用方式是:

// source: 輸入流
// dest: 輸出目的地
// pipeline: 加粗且飄紅
source.pipe(pipeline).pipe(dest)

此時, pipeline 封裝了 bold.pipe(red) ,從邏輯上來講,也稱其為管道。

其實現可簡化為:

var pipeline = new Duplex()
var streams = pipeline._streams = [bold, red]

// 底層寫邏輯:將數據寫入管道的第一個Stream,即bold pipeline._write = function (buf, enc, next) { streams[0].write(buf, enc, next) }

// 底層讀邏輯:從管道的最后一個Stream(即red)中讀取數據 pipeline._read = function () { var buf var reads = 0 var r = streams[streams.length - 1] // 將緩存讀空 while ((buf = r.read()) !== null) { pipeline.push(buf) reads++ } if (reads === 0) { // 緩存本來為空,則等待新數據的到來 r.once('readable', function () { pipeline._read() }) } }

// 將各個Stream組合起來(此處等同于bold.pipe(red)) streams.reduce(function (r, next) { r.pipe(next) return next })</code></pre>

往 pipeline 寫數據時,數據直接寫入 bold ,再流向 red ,最后從 pipeline 讀數據時再從 red 中讀出。

如果需要在中間新加一個 underline 的Stream,可以:

pipeline._streams.splice(1, 0, underline)
bold.unpipe(red)
bold.pipe(underline).pipe(red)

如果要將 red 替換成 green ,可以:

// 刪除red
pipeline._streams.pop()
bold.unpipe(red)

// 添加green pipeline._streams.push(green) bold.pipe(green)</code></pre>

可見,這種管道的各個環節是可以修改的。

stream-splicer 對上述邏輯進行了進一步封裝,提供 splice 、 push 、 pop 等方法,使得 pipeline 可以像數組那樣被修改:

var splicer = require('stream-splicer')
var pipeline = splicer([bold, red])
// 在中間添加underline
pipeline.splice(1, 0, underline)

// 刪除red pipeline.pop()

// 添加green pipeline.push(green)</code></pre>

labeled-stream-splicer 在此基礎上又添加了使用名字替代下標進行操作的功能:

var splicer = require('labeled-stream-splicer')
var pipeline = splicer([
  'bold', bold,
  'red', red,
])

// 在red前添加underline pipeline.splice('red', 0, underline)

// 刪除bold pipeline.splice('bold', 1)</code></pre>

由于 pipeline 本身與其各個環節一樣,也是一個Stream對象,因此可以嵌套:

var splicer = require('labeled-stream-splicer')
var pipeline = splicer([
  'style', [ bold, red ],
  'insert', [ comma ],
])

pipeline.get('style') // 取得管道:[bold, red] .splice(1, 0, underline) // 添加underline</code></pre>

Browserify

Browserify 的功能介紹可見 substack/browserify-handbook ,其核心邏輯的實現在于管道的設計:

var splicer = require('labeled-stream-splicer')
var pipeline = splicer.obj([
    // 記錄輸入管道的數據,重建管道時直接將記錄的數據寫入。
    // 用于像watch時需要多次打包的情況
    'record', [ this._recorder() ],
    // 依賴解析,預處理
    'deps', [ this._mdeps ],
    // 處理JSON文件
    'json', [ this._json() ],
    // 刪除文件前面的BOM
    'unbom', [ this._unbom() ],
    // 刪除文件前面的`#!`行
    'unshebang', [ this._unshebang() ],
    // 語法檢查
    'syntax', [ this._syntax() ],
    // 排序,以確保打包結果的穩定性
    'sort', [ depsSort(dopts) ],
    // 對擁有同樣內容的模塊去重
    'dedupe', [ this._dedupe() ],
    // 將id從文件路徑轉換成數字,避免暴露系統路徑信息
    'label', [ this._label(opts) ],
    // 為每個模塊觸發一次dep事件
    'emit-deps', [ this._emitDeps() ],
    'debug', [ this._debug(opts) ],
    // 將模塊打包
    'pack', [ this._bpack ],
    // 更多自定義的處理
    'wrap', [],
])

每個模塊用 row 表示,定義如下:

{
  // 模塊的唯一標識
  id: id,
  // 模塊對應的文件路徑
  file: '/path/to/file',
  // 模塊內容
  source: '',
  // 模塊的依賴
  deps: {
    // `require(expr)`
    expr: id,
  }
}

在 wrap 階段前,所有的階段都處理這樣的對象流,且除 pack 外,都輸出這樣的流。

有的補充 row 中的一些信息,有的則對這些信息做一些變換,有的只是讀取和輸出。

一般 row 中的 source 、 deps 內容都是在 deps 階段解析出來的。

下面提供一個修改 Browserify 管道的函數。

var Transform = require('stream').Transform
// 創建Transform對象
function through(write, end) {
  return Transform({
    transform: write,
    flush: end,
  })
}

// b為Browserify實例 // 該插件可打印出打包時間 function log(b) { // watch時需要重新打包,整個pipeline會被重建,所以也要重新修改 b.on('reset', reset) // 修改當前pipeline reset()

function reset () { var time = null var bytes = 0 b.pipeline.get('record').on('end', function () { // 以record階段結束為起始時刻 time = Date.now() })

// `wrap`是最后一個階段,在其后添加記錄結束時刻的Transform
b.pipeline.get('wrap').push(through(write, end))
function write (buf, enc, next) {
  // 累計大小
  bytes += buf.length
  this.push(buf)
  next()
}
function end () {
  // 打包時間
  var delta = Date.now() - time
  b.emit('time', delta)
  b.emit('bytes', bytes)
  b.emit('log', bytes + ' bytes written ('
    + (delta / 1000).toFixed(2) + ' seconds)'
  )
  this.push(null)
}

} }

var fs = require('fs') var browserify = require('browserify') var b = browserify(opts) // 應用插件 b.plugin(log) b.bundle().pipe(fs.createWriteStream('bundle.js'))</code></pre>

事實上,這里的 b.plugin(log) 就是直接執行了 log(b) 。

在插件中,可以修改 b.pipeline 中的任何一個環節。

因此, Browserify 本身只保留了必要的功能,其它都由插件去實現,如 watchifyfactor-bundle 等。

除了了上述的插件機制外, Browserify 還有一套Transform機制,即通過 b.transform(transform) 可以新增一些文件內容預處理的Transform。

預處理是發生在 deps 階段的,當模塊文件內容被讀出來時,會經過這些Transform處理,然后才做依賴解析,如 babelifyenvify

Gulp

Gulp 的核心邏輯分成兩塊:任務調度與文件處理。

任務調度是基于 orchestrator ,而文件處理則是基于 vinyl-fs

類似于 Browserify 提供的模塊定義(用 row 表示), vinyl-fs 也提供了文件定義( vinyl 對象)。

Browserify 的管道處理的是 row 流, Gulp 管道處理 vinyl 流:

gulp.task('scripts', ['clean'], function() {
  // Minify and copy all JavaScript (except vendor scripts) 
  // with sourcemaps all the way down 
  return gulp.src(paths.scripts)
    .pipe(sourcemaps.init())
    .pipe(coffee())
    .pipe(uglify())
    .pipe(concat('all.min.js'))
    .pipe(sourcemaps.write())
    .pipe(gulp.dest('build/js'));
});

任務中創建的管道起始于 gulp.src ,終止于 gulp.dest ,中間有若干其它的Transform(插件)。

如果與 Browserify 的管道對比,可以發現 Browserify 是確定了一條具有完整功能的管道,而 Gulp 本身只提供了創建 vinyl 流和將 vinyl 流寫入磁盤的工具,管道中間經歷什么全由用戶決定。

這是因為任務中做什么,是沒有任何限制的,文件處理也只是常見的情況,并非一定要用 gulp.src 與 gulp.dest 。

兩種模式比較

BrowserifyGulp 都借助管道的概念來實現插件機制。

Browserify 定義了模塊的數據結構,提供了默認的管道以處理這樣的數據流,而插件可用來修改管道結構,以定制處理行為。

Gulp 雖也定義了文件的數據結構,但只提供產生、消耗這種數據流的接口,完全由用戶通過插件去構造處理管道。

當明確具體的處理需求時,可以像 Browserify 那樣,構造一個基本的處理管道,以提供插件機制。

如果需要的是實現任意功能的管道,可以如 Gulp 那樣,只提供數據流的抽象。

實例

本節中實現一個針對 Git 倉庫自動生成changelog的工具,完整代碼見 ezchangelog

ezchangelog 的輸入為 git log 生成的文本流,輸出默認為markdown格式的文本流,但可以修改為任意的自定義格式。

輸入示意:

commit 9c5829ce45567bedccda9beb7f5de17574ea9437
Author: zoubin <zoubin04@gmail.com>
Date:   Sat Nov 7 18:42:35 2015 +0800

CHANGELOG

commit 3bf9055b732cc23a9c14f295ff91f48aed5ef31a Author: zoubin <zoubin04@gmail.com> Date: Sat Nov 7 18:41:37 2015 +0800

4.0.3

commit 87abe8e12374079f73fc85c432604642059806ae Author: zoubin <zoubin04@gmail.com> Date: Sat Nov 7 18:41:32 2015 +0800

fix readme
add more tests</code></pre> 

輸出示意:

* [[`9c5829c`](https://github.com/zoubin/ezchangelog/commit/9c5829c)] CHANGELOG

## [v4.0.3](https://github.com/zoubin/ezchangelog/commit/3bf9055) (2015-11-07)

* [[`87abe8e`](https://github.com/zoubin/ezchangelog/commit/87abe8e)] fix readme

    add more tests

其實需要的是這樣一個 pipeline :

source.pipe(pipeline).pipe(dest)

可以分為兩個階段:

  • parse:從輸入文本流中解析出commit信息
  • format: 將commit流變換為文本流

默認的情況下,要想得到示例中的markdown,需要解析出每個commit的sha1、日期、消息、是否為tag。

定義commit的格式如下:

{
  commit: {
    // commit sha1
    long: '3bf9055b732cc23a9c14f295ff91f48aed5ef31a',
    short: '3bf9055',
  },
  committer: {
    // commit date
    date: new Date('Sat Nov 7 18:41:37 2015 +0800'),
  },
  // raw message lines
  messages: ['', '    4.0.3', ''],
  // raw headers before the messages
  headers: [
    ['Author', 'zoubin <zoubin04@gmail.com>'],
    ['Date', 'Sat Nov 7 18:41:37 2015 +0800'],
  ],
  // the first non-empty message line
  subject: '4.0.3',
  // other message lines
  body: '',
  // git tag
  tag: 'v4.0.3',
  // link to the commit. opts.baseUrl should be specified.
  url: 'https://github.com/zoubin/ezchangelog/commit/3bf9055',
}

于是有:

var splicer = require('labeled-stream-splicer')
pipeline = splicer.obj([
  'parse', [
    // 按行分隔
    'split', split(),
    // 生成commit對象,解析出sha1和日期
    'commit', commit(),
    // 解析出tag
    'tag', tag(),
    // 解析出url
    'url', url({ baseUrl: opts.baseUrl }),
  ],
  'format', [
    // 將commit組合成markdown文本
    'markdownify', markdownify(),
  ],
])

至此,基本功能已經實現。

現在將其封裝并提供插件機制。

function Changelog(opts) {
  opts = opts || {}
  this._options = opts
  // 創建pipeline
  this.pipeline = splicer.obj([
    'parse', [
      'split', split(),
      'commit', commit(),
      'tag', tag(),
      'url', url({ baseUrl: opts.baseUrl }),
    ],
    'format', [
      'markdownify', markdownify(),
    ],
  ])

  // 應用插件
  ;[].concat(opts.plugin).filter(Boolean).forEach(function (p) {
    this.plugin(p)
  }, this)
}

Changelog.prototype.plugin = function (p, opts) {
  if (Array.isArray(p)) {
    opts = p[1]
    p = p[0]
  }
  // 執行插件函數,修改pipeline
  p(this, opts)
  return this
}

上面的實現提供了兩種方式來應用插件。

一種是通過配置傳入,另一種是創建實例后再調用 plugin 方法,本質一樣。

為了使用方便,還可以簡單封裝一下。

function changelog(opts) {
  return new Changelog(opts).pipeline
}

這樣,就可以如下方式使用:

source.pipe(changelog()).pipe(dest)

這個已經非常接近我們的預期了。

現在來開發一個插件,修改默認的渲染方式。

var through = require('through2')

function customFormatter(c) {
  // c是`Changelog`實例

  // 添加解析author的transform
  c.pipeline.get('parse').push(through.obj(function (ci, enc, next) {
    // parse the author name from: 'zoubin <zoubin04@gmail.com>'
    ci.committer.author = ci.headers[0][1].split(/\s+/)[0]
    next(null, ci)
  }))

  // 替換原有的渲染
  c.pipeline.get('format').splice('markdownify', 1, through.obj(function (ci, enc, next) {
    var sha1 = ci.commit.short
    sha1 = '[`' + sha1 + '`](' + c._options.baseUrl + sha1 + ')'
    var date = ci.committer.date.toISOString().slice(0, 10)
    next(null, '* ' + sha1 + ' ' + date + ' @' + ci.committer.author + '\n')
  }))
}

source
  .pipe(changelog({
    baseUrl: 'https://github.com/zoubin/ezchangelog/commit/',
    plugin: [customFormatter],
  }))
  .pipe(dest)

同樣的輸入,輸出將會是:

* [`9c5829c`](https://github.com/zoubin/ezchangelog/commit/9c5829c) 2015-11-07 @zoubin
* [`3bf9055`](https://github.com/zoubin/ezchangelog/commit/3bf9055) 2015-11-07 @zoubin
* [`87abe8e`](https://github.com/zoubin/ezchangelog/commit/87abe8e) 2015-11-07 @zoubin

可以看出,通過創建可修改的管道, ezchangelog 保持了本身邏輯的單一性,同時又提供了強大的自定義空間。

系列文章

參考文獻

 

來自:http://tech.meituan.com/stream-in-action.html

 

 本文由用戶 KirstenGumm 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!