Async 模塊實現入門淺析
在早期的異步開發中, Async 模塊是比較有名的異步解決方案。本文會帶大家簡單看一下 async 模塊的幾個方法實現思路,具體分別是:
- async.waterfall
- async.each
- async.eachLimit
- async.whilst
PS:本文有相應視頻—— 優酷地址 (聲音據說有點小)。
waterfall
我們先來看下一個 async.waterfall 的簡單使用場景登錄:
async.waterfall([
function (next) {
user.get(name, next);
},
function (user, next) {
if (!user) {
return next(new Error('user not found'));
}
if (passwd != user.passwd) {
return next(new Error('wrong password'));
}
sign.up(name, next);
},
function (reward, next) {
resource.add(name, reward, next);
},
], function (err, ...res) {
if (err) {
console.error(err.stack);
}
console.log(res);
});
async 的思路是將原本容易出現 callback hell 的嵌套,通過數組并列的方式抹平,并且節省每次判斷 error 的代碼,按照 error first 的約定在內部每次都幫助用戶檢查異步是否出錯。了解了這種想法之后我們可以寫個很簡單的 waterfall 出來。
// 確認整體結構
exports.waterfall = function (task = [], callback = noop) { // 默認值
// 拿到 callback 數組
if (!Array.isArray(task)) {
return callback(new Error('task should be an array!'));
}
// TODO
};
function noop() {}
拿到了 callback 數組之后,我們需要想辦法,讓這個數組串聯的執行起來,即從數組的第一個 callback 開始,一個執行完就自動調用下一個 callback:
exports.waterfall = function (task = [], callback = noop) {
if (!Array.isArray(task)) {
return callback(new Error('task should be an array!'));
}
(function next() {
// 取出數組中的第一個 callback 執行
let fn = task.shift();
fn.apply(null, [next]); // ①用戶自行調用這個 next
})();
};
關于 ① 處流程自行走下去結合這里看看:
async.waterfall([
function(callback) {
callback(null, 'one', 'two'); // ①這里 callback 就調用了 next
},
// ...
理解了這個剩下的就比較好辦了:
exports.waterfall = function (task = [], callback = noop) {
if (!Array.isArray(task)) {
return callback(new Error('task should be an array!'));
}
(function next(...args) { // args 獲取上一個 callback 傳的結果
if (args[0]) { // error first 約定
// 發現第一個參數存在 error 直接返回結束整個流程
return callback(args[0]);
}
if (task.length) { // 判斷 callback 是不是執行完了
let fn = task.shift();
// ② 將 args 平攤到下一個 cb 的開頭,next 位于最后
fn.apply(null, [...args.slice(1), next]);
} else {
// 如果執行完了就結束流程
callback.apply(null, args);
}
})();
};
關于 ② 可以結合例子來看:
async.waterfall([
function(callback) {
callback(null, 'one', 'two');
},
function(arg1, arg2, callback) { // ②
// arg1 now equals 'one' and arg2 now equals 'two'
callback(null, 'three');
},
function(arg1, callback) {
// arg1 now equals 'three'
callback(null, 'done');
}
], function (err, result) {
// result now equals 'done'
});
那么到這里一個簡單的 waterfall 的實現思路已經完全展現出來了。最后說一下可能出現的問題,比如用戶多調了一次 cb (這種情況確實可能出現)所以我們需要做一些簡單的預防:
exports.waterfall = function (task = [], callback = noop) {
if (!Array.isArray(task)) {
return callback(new Error('task should be an array!'));
}
(function next(...args) {
if (args[0]) {
return callback(args[0]);
}
if (task.length) {
let fn = task.shift();
fn.apply(null, [...args.slice(1), onlyOnce(next)]); // 保證只被調用一次
} else {
callback.apply(null, args);
}
})();
};
function onlyOnce(cb) {
let flag = false;
return (...args) => {
if (flag) {
return cb(new Error('cb already called'));
}
cb.apply(null, args);
flag = true;
};
}
each
async.each 有點像是異步的 arr.map 操作。我們可以來看一個使用的例子:
'use strict';
const fs = require('fs');
const async = require('async');
const request = require('request');
const sites = ['www.baidu.com','github.com','www.npmjs.com', 'www.zhihu.com'];
// 下站站點圖標
function downloadFavicon(site, next) {
let addr = `https://${site}/favicon.ico`;
let file = `./${site}.ico`;
request.get(addr)
.pipe(fs.createWriteStream(file))
.on('error', (err) => {
console.error(`${url} Download failed: ${err.message}`);
next();
})
.on('finish', next);
}
// 下載每一個站點的圖標
async.each(sites, downloadFavicon, function (err) {
if (err) {
console.log('err', err);
}
console.log('over');
});
那么按照例子,我們可以先來搭一個 async.each 的架子:
exports.each = function (items = [], iterator, callback = noop) {
// 判斷數組類型
if (!Array.isArray(items)) {
return callback(new Error('items should be an array!'));
}
// 判斷迭代器
if (typeof iterator != 'function') {
return callback(new Error('iterator should be a function!'));
}
// TODO
};
然后我們需要做的事情很簡單,只需要將數組的每個一個元素作為參數拿來調用 iterator 函數即可:
exports.each = function (items = [], iterator, callback = noop) {
if (!Array.isArray(items)) {
return callback(new Error('items should be an array!'));
}
if (typeof iterator != 'function') {
return callback(new Error('iterator should be a function!'));
}
function next(err) {
// TODO
}
items.map((item) => iterator(item, next));
};
然后我們要想辦法在所有的異步操作都執行完之后調用 callback 出去
exports.each = function (items = [], iterator, callback = noop) {
if (!Array.isArray(items)) {
return callback(new Error('items should be an array!'));
}
if (typeof iterator != 'function') {
return callback(new Error('iterator should be a function!'));
}
let completed = 0; // 計數
function next(err) {
if (err) { // error first
return callback(err); // 結束流程
}
if (++completed >= items.length) { // 計數判斷
callback(); // 流程結束
}
}
items.map((item) => iterator(item, next));
};
async.each 的實現思路確實如上述例子一樣簡單,當然還可能會有一些復雜的情況需要判斷,更深入的內容各位可以移步 Async 官方的 each 實現 中查看更多。
eachLimit
使用 each 執行操作的時候,在量小的情況下是沒有問題的,但是當異步操作的量特別大的時候,就需要對其進行一定的控制。比如寫一個爬蟲去某種網站上爬圖片,那么將圖片下載到本地的過程中存在一個文件描述符的限制,即同時打開的文件(保存圖片時需要openFile)數目超過一定程度就會收到操作系統的報錯。
以 each 中出現過的例子來說 eachLimit 的功能:
const sites = [ ... ]; // 可能非常多站點
// 對 each 操作做 limit,同時最多下載 100 個站點圖標
async.eachLimit(sites, 100, downloadFavicon, function (err) {
if (err) {
console.log('err', err);
}
console.log('over');
});
了解了上述需求之后,我們來搭一個 eachLimit 的架子:
exports.eachLimit = function (items = [], limit = 1, iterator, callback = noop) {
if (!Array.isArray(items)) {
return callback(new Error('items should be an array!'));
}
if (typeof iterator != 'function') {
return callback(new Error('iterator should be a function!'));
}
// 同時執行的異步操作數目 (不能超過 limit)
let running = 0;
// TODO
};
我首先需要一個循環來將異步操作加入到執行隊列,但是只能加到 limit 的數目為止:
let running = 0;
(function next() {
while (running < limit) { // 一口氣加到隊列滿為止
let item = items.shift();
running++;
iterator(item, (err) => {
running--;
next(); // 每執行完一個異步操作就觸發一下加入隊列的行為
});
}
})();
然后加上結束的操作:
let done = false;
let running = 0;
(function next() {
if (done && running <= 0) {
return callback();
}
while (running < limit) {
let item = items.shift();
running++;
if (item === undefined) {
done = true;
if (running <= 0) {
callback();
}
return;
}
iterator(item, (err) => {
running--;
next();
});
}
})();
最后補上錯誤處理的完整版:
exports.eachLimit = function (items = [], limit = 1, iterator, callback = noop) {
if (!Array.isArray(items)) {
return callback(new Error('items should be an array!'));
}
if (typeof iterator != 'function') {
return callback(new Error('iterator should be a function!'));
}
let done = false;
let running = 0;
let errored = false;
(function next() {
if (done && running <= 0) {
return callback();
}
while (running < limit && !errored) {
let item = items.shift();
running++;
if (item === undefined) {
done = true;
if (running <= 0) {
callback();
}
return;
}
iterator(item, (err) => {
running--;
if (err) {
errored = true;
return callback(err);
}
next();
});
}
})();
};
whilst
最后我們來看一個循環異步 whilst 的實現,也是非常的簡單。我們先看看使用例子:
'use strict';
const async = require('async');
let count = 0;
async.whilst(
function () { return count < 5; },
function (callback) {
console.log('count', count++);
setTimeout(callback, 1000);
},
function (err) {
console.log('over');
}
);
然后因為比較簡單,直接來看代碼吧:
exports.whilst = function (test, iterator, callback = noop) {
if (typeof test != 'function') {
return callback(new Error('iterator should be a function!'));
}
if (typeof iterator != 'function') {
return callback(new Error('iterator should be a function!'));
}
(function next() {
if (test()) {
iterator((err) => {
if (err) {
return callback(err);
}
next();
});
}
})();
};
小結
綜上,本文為 callback 的異步流程封裝控制的思路做了一點微小的整理工作。實現上并沒有完全遵循原版,而是選擇使用 es6 的新特性勁量讓代碼看起來簡(zhuang)潔(bi),整體上是為了展現一個思路可能有不少細節沒有處理,完整的部分參見 async 官方文檔
。
async 的優點可以簡單的說,由于 async 基于原生的 callback 所以相比 promise/co 等方式性較好(目前最快的方式是專門優化了速度的 neo-async )。并且 async 提供了非常多、非常全面的 60+ 種異步操作方式,功能可謂十分強大。
最后簡單提一下 async 的一些缺點:
- 基于 error first 的 約定 。約定的意思就是不是強制的,也就存在不了解這個約定或者使用錯誤方面的問題。
- 流程沒有狀態。
- 由于功能太過強大(如 async.auto)存在可能濫用的問題。
- 錯誤棧曲折排查困難。
來自:https://zhuanlan.zhihu.com/p/27303127