Async 模塊實現入門淺析

summeryct 7年前發布 | 27K 次閱讀 JavaScript開發 JavaScript

在早期的異步開發中, Async 模塊是比較有名的異步解決方案。本文會帶大家簡單看一下 async 模塊的幾個方法實現思路,具體分別是:

  • async.waterfall
  • async.each
  • async.eachLimit
  • async.whilst

PS:本文有相應視頻—— 優酷地址 (聲音據說有點小)。

 

waterfall

我們先來看下一個 async.waterfall 的簡單使用場景登錄:

async.waterfall([
  function (next) {
    user.get(name, next);
  },
  function (user, next) {
    if (!user) {
      return next(new Error('user not found'));
    }
    if (passwd != user.passwd) {
      return next(new Error('wrong password'));
    }
    sign.up(name, next);
  },
  function (reward, next) {
    resource.add(name, reward, next);
  },
], function (err, ...res) {
  if (err) {
    console.error(err.stack);
  }
  console.log(res);
});

async 的思路是將原本容易出現 callback hell 的嵌套,通過數組并列的方式抹平,并且節省每次判斷 error 的代碼,按照 error first 的約定在內部每次都幫助用戶檢查異步是否出錯。了解了這種想法之后我們可以寫個很簡單的 waterfall 出來。

// 確認整體結構
exports.waterfall = function (task = [], callback = noop) { // 默認值
  // 拿到 callback 數組
  if (!Array.isArray(task)) {
    return callback(new Error('task should be an array!'));
  }

  // TODO
};

function noop() {}

拿到了 callback 數組之后,我們需要想辦法,讓這個數組串聯的執行起來,即從數組的第一個 callback 開始,一個執行完就自動調用下一個 callback:

exports.waterfall = function (task = [], callback = noop) {
  if (!Array.isArray(task)) {
    return callback(new Error('task should be an array!'));
  }

  (function next() {
    // 取出數組中的第一個 callback 執行
    let fn = task.shift();
    fn.apply(null, [next]); // ①用戶自行調用這個 next
  })();
};

關于 ① 處流程自行走下去結合這里看看:

async.waterfall([
    function(callback) {
        callback(null, 'one', 'two'); // ①這里 callback 就調用了 next
    },
// ...

理解了這個剩下的就比較好辦了:

exports.waterfall = function (task = [], callback = noop) {
  if (!Array.isArray(task)) {
    return callback(new Error('task should be an array!'));
  }

  (function next(...args) { // args 獲取上一個 callback 傳的結果
    if (args[0]) { // error first 約定
      // 發現第一個參數存在 error 直接返回結束整個流程
      return callback(args[0]);
    }

    if (task.length) { // 判斷 callback 是不是執行完了
      let fn = task.shift();
      // ② 將 args 平攤到下一個 cb 的開頭,next 位于最后
      fn.apply(null, [...args.slice(1), next]);
    } else {
      // 如果執行完了就結束流程
      callback.apply(null, args);
    }
  })();
};

關于 ② 可以結合例子來看:

async.waterfall([
    function(callback) {
        callback(null, 'one', 'two');
    },
    function(arg1, arg2, callback) { // ②
        // arg1 now equals 'one' and arg2 now equals 'two'
        callback(null, 'three');
    },
    function(arg1, callback) {
        // arg1 now equals 'three'
        callback(null, 'done');
    }
], function (err, result) {
    // result now equals 'done'
});

那么到這里一個簡單的 waterfall 的實現思路已經完全展現出來了。最后說一下可能出現的問題,比如用戶多調了一次 cb (這種情況確實可能出現)所以我們需要做一些簡單的預防:

exports.waterfall = function (task = [], callback = noop) {
  if (!Array.isArray(task)) {
    return callback(new Error('task should be an array!'));
  }

  (function next(...args) {
    if (args[0]) {
      return callback(args[0]);
    }

    if (task.length) {
      let fn = task.shift();
      fn.apply(null, [...args.slice(1), onlyOnce(next)]); // 保證只被調用一次
    } else {
      callback.apply(null, args);
    }
  })();
};

function onlyOnce(cb) {
  let flag = false;
  return (...args) => {
    if (flag) {
      return cb(new Error('cb already called'));
    }
    cb.apply(null, args);
    flag = true;
  };
}

 

each

async.each 有點像是異步的 arr.map 操作。我們可以來看一個使用的例子:

'use strict';

const fs = require('fs');
const async = require('async');
const request = require('request');

const sites = ['www.baidu.com','github.com','www.npmjs.com', 'www.zhihu.com'];

// 下站站點圖標
function downloadFavicon(site, next) {
  let addr = `https://${site}/favicon.ico`;
  let file = `./${site}.ico`;
  request.get(addr)
    .pipe(fs.createWriteStream(file))
    .on('error', (err) => {
      console.error(`${url} Download failed: ${err.message}`);
      next();
    })
    .on('finish', next);
}

// 下載每一個站點的圖標
async.each(sites, downloadFavicon, function (err) {
  if (err) {
    console.log('err', err);
  }
  console.log('over');
});

那么按照例子,我們可以先來搭一個 async.each 的架子:

exports.each = function (items = [], iterator, callback = noop) {
  // 判斷數組類型
  if (!Array.isArray(items)) {
    return callback(new Error('items should be an array!'));
  }

  // 判斷迭代器
  if (typeof iterator != 'function') {
    return callback(new Error('iterator should be a function!'));
  }

  // TODO
};

然后我們需要做的事情很簡單,只需要將數組的每個一個元素作為參數拿來調用 iterator 函數即可:

exports.each = function (items = [], iterator, callback = noop) {
  if (!Array.isArray(items)) {
    return callback(new Error('items should be an array!'));
  }

  if (typeof iterator != 'function') {
    return callback(new Error('iterator should be a function!'));
  }

  function next(err) {
    // TODO
  }

  items.map((item) => iterator(item, next));
};

然后我們要想辦法在所有的異步操作都執行完之后調用 callback 出去

exports.each = function (items = [], iterator, callback = noop) {
  if (!Array.isArray(items)) {
    return callback(new Error('items should be an array!'));
  }

  if (typeof iterator != 'function') {
    return callback(new Error('iterator should be a function!'));
  }

  let completed = 0; // 計數

  function next(err) {
    if (err) { // error first
      return callback(err); // 結束流程
    }

    if (++completed >= items.length) { // 計數判斷
      callback(); // 流程結束
    }
  }

  items.map((item) => iterator(item, next));
};

async.each 的實現思路確實如上述例子一樣簡單,當然還可能會有一些復雜的情況需要判斷,更深入的內容各位可以移步 Async 官方的 each 實現 中查看更多。

eachLimit

使用 each 執行操作的時候,在量小的情況下是沒有問題的,但是當異步操作的量特別大的時候,就需要對其進行一定的控制。比如寫一個爬蟲去某種網站上爬圖片,那么將圖片下載到本地的過程中存在一個文件描述符的限制,即同時打開的文件(保存圖片時需要openFile)數目超過一定程度就會收到操作系統的報錯。

以 each 中出現過的例子來說 eachLimit 的功能:

const sites = [ ... ]; // 可能非常多站點

// 對 each 操作做 limit,同時最多下載 100 個站點圖標
async.eachLimit(sites, 100, downloadFavicon, function (err) {
  if (err) {
    console.log('err', err);
  }
  console.log('over');
});

了解了上述需求之后,我們來搭一個 eachLimit 的架子:

exports.eachLimit = function (items = [], limit = 1, iterator, callback = noop) {
  if (!Array.isArray(items)) {
    return callback(new Error('items should be an array!'));
  }

  if (typeof iterator != 'function') {
    return callback(new Error('iterator should be a function!'));
  }

  // 同時執行的異步操作數目 (不能超過 limit)
  let running = 0;

  // TODO
};

我首先需要一個循環來將異步操作加入到執行隊列,但是只能加到 limit 的數目為止:

let running = 0;

(function next() {
  while (running < limit) { // 一口氣加到隊列滿為止
    let item = items.shift();
    running++;

    iterator(item, (err) => {
      running--;
      next(); // 每執行完一個異步操作就觸發一下加入隊列的行為
    });
  }
})();

然后加上結束的操作:

let done = false;
let running = 0;

(function next() {
  if (done && running <= 0) {
    return callback();
  }

  while (running < limit) {
    let item = items.shift();
    running++;
    if (item === undefined) {
      done = true;
      if (running <= 0) {
        callback();
      }
      return;
    }

    iterator(item, (err) => {
      running--;
      next();
    });
  }
})();

最后補上錯誤處理的完整版:

exports.eachLimit = function (items = [], limit = 1, iterator, callback = noop) {
  if (!Array.isArray(items)) {
    return callback(new Error('items should be an array!'));
  }

  if (typeof iterator != 'function') {
    return callback(new Error('iterator should be a function!'));
  }

  let done = false;
  let running = 0;
  let errored = false;

  (function next() {
    if (done && running <= 0) {
      return callback();
    }

    while (running < limit && !errored) {
      let item = items.shift();
      running++;
      if (item === undefined) {
        done = true;
        if (running <= 0) {
          callback();
        }
        return;
      }

      iterator(item, (err) => {
        running--;
        if (err) {
          errored = true;
          return callback(err);
        }
        next();
      });
    }
  })();
};

 

whilst

最后我們來看一個循環異步 whilst 的實現,也是非常的簡單。我們先看看使用例子:

'use strict';

const async = require('async');

let count = 0;
async.whilst(
  function () { return count < 5; },
  function (callback) {
    console.log('count', count++);
    setTimeout(callback, 1000);
  },
  function (err) {
    console.log('over');
  }
);

然后因為比較簡單,直接來看代碼吧:

exports.whilst = function (test, iterator, callback = noop) {
  if (typeof test != 'function') {
    return callback(new Error('iterator should be a function!'));
  }
  if (typeof iterator != 'function') {
    return callback(new Error('iterator should be a function!'));
  }

  (function next() {
    if (test()) {
      iterator((err) => {
        if (err) {
          return callback(err);
        }
        next();
      });
    }
  })();
};

 

小結

綜上,本文為 callback 的異步流程封裝控制的思路做了一點微小的整理工作。實現上并沒有完全遵循原版,而是選擇使用 es6 的新特性勁量讓代碼看起來簡(zhuang)潔(bi),整體上是為了展現一個思路可能有不少細節沒有處理,完整的部分參見 async 官方文檔

async 的優點可以簡單的說,由于 async 基于原生的 callback 所以相比 promise/co 等方式性較好(目前最快的方式是專門優化了速度的 neo-async )。并且 async 提供了非常多、非常全面的 60+ 種異步操作方式,功能可謂十分強大。

最后簡單提一下 async 的一些缺點:

  1. 基于 error first 的 約定 。約定的意思就是不是強制的,也就存在不了解這個約定或者使用錯誤方面的問題。
  2. 流程沒有狀態。
  3. 由于功能太過強大(如 async.auto)存在可能濫用的問題。
  4. 錯誤棧曲折排查困難。

 

來自:https://zhuanlan.zhihu.com/p/27303127

 

 本文由用戶 summeryct 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!