Node.js + Redis Sorted Set 任務隊列
需求:功能 A 需要調用第三方 API 獲取數據,而第三方 API 自身是異步處理方式,在調用后會返回數據與狀態 { data: "查詢結果", "status": "正在異步處理中" } ,這樣就需要間隔一段時間后再去調用第三方 API 獲取數據。為了用戶在使用功能 A 時不會因為第三方 API 正在異步處理中而必須等待,將用戶請求加入任務隊列中,返回部分數據并關閉請求。然后定時從任務隊列里中取出任務調用第三方 API,若返回狀態為”異步處理中“,將該任務再次加入任務隊列,若返回狀態為”已處理完畢“,將返回數據入庫。
根據以上問題,想到使用 Node.js + Redis sorted set 來實現任務隊列。Node.js 實現自身應用 API 用來接受用戶請求,合并數據庫已存數據與 API 返回的部分數據返回給用戶,并將任務加入到任務隊列中。利用 Node.js child process 與 cron 定時從任務隊列中取出任務執行。
在設計任務隊列的過程中需要考慮到的幾個問題
-
并行執行多個任務
-
任務唯一性
-
任務成功或失敗后的處理
針對以上問題的解決方案
-
并行執行多個任務利用 Promise.all 來實現
-
任務唯一性利用 Redis sorted set 來實現。使用時間戳作為分值可以實現將 sorted set 作為 list 來使用,在加入任務時判斷任務是否已經存在,在取出任務執行時將該任務分值設置為 0,每次取出分值大于 0 的任務來執行,可以避免重復執行任務。
-
執行任務成功后刪除任務,執行任務失敗后將任務分值更新為當前時間時間戳,這樣就可以將失敗的任務重新加入任務隊列尾部
示例代碼
// remote_api.js 模擬第三方 API
'use strict';
const app = require('express')();
app.get('/', (req, res) => {
setTimeout(() => {
let arr = [200, 300]; // 200 代表成功,300 代表失敗需要重新請求
res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });
}, 3000);
});
app.listen('9001', () => {
console.log('API 服務監聽端口:9001');
});
// producer.js 自身應用 API,用來接受用戶請求并將任務加入任務隊列
'use strict';
const app = require('express')();
const redisClient = require('redis').createClient();
const QUEUE_NAME = 'queue:example';
function addTaskToQueue(taskName, callback) {
// 先判斷任務是否已經存在,存在:跳過,不存在:加入任務隊列
redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {
if (error) {
console.log(error);
} else {
if (task) {
console.log('任務已存在,不新增相同任務');
callback(null, task);
} else {
redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
if (error) {
callback(error);
} else {
callback(null, result);
}
});
}
}
});
}
app.get('/', (req, res) => {
let taskName = req.query['task-name'];
addTaskToQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
res.status(200).send('正在查詢中......');
}
});
});
app.listen(9002, () => {
console.log('生產者服務監聽端口:9002');
});
// consumer.js 定時獲取任務并執行
'use strict';
const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule');
const QUEUE_NAME = 'queue:expmple';
const PARALLEL_TASK_NUMBER = 2; // 并行執行任務數量
function getTasksFromQueue(callback) {
// 獲取多個任務
redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {
if (error) {
callback(error);
} else {
// 將任務分值設置為 0,表示正在處理
if (tasks.length > 0) {
let tmp = [];
tasks.forEach((task) => {
tmp.push(0);
tmp.push(task);
});
redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {
if (error) {
callback(error);
} else {
callback(null, tasks)
}
});
}
}
});
}
function addFailedTaskToQueue(taskName, callback) {
redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
if (error) {
callback(error);
} else {
callback(null, result);
}
});
}
function removeSucceedTaskFromQueue(taskName, callback) {
redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {
if (error) {
callback(error);
} else {
callback(null, result);
}
})
}
function execTask(taskName) {
return new Promise((resolve, reject) => {
let requestOptions = {
'url': 'http://127.0.0.1:9001',
'method': 'GET',
'timeout': 5000
};
request(requestOptions, (error, response, body) => {
if (error) {
resolve('failed');
console.log(error);
addFailedTaskToQueue(taskName, (error) => {
if (error) {
console.log(error);
} else {
}
});
} else {
try {
body = typeof body !== 'object' ? JSON.parse(body) : body;
} catch (error) {
resolve('failed');
console.log(error);
addFailedTaskToQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
}
});
return;
}
if (body.status !== 200) {
resolve('failed');
addFailedTaskToQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
}
});
} else {
resolve('succeed');
removeSucceedTaskFromQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
}
});
}
}
});
});
}
// 定時,每隔 5 秒獲取新的任務來執行
let job = schedule.scheduleJob('*/5 * * * * *', () => {
console.log('獲取新任務');
getTasksFromQueue((error, tasks) => {
if (error) {
console.log(error);
} else {
if (tasks.length > 0) {
console.log(tasks);
Promise.all(tasks.map(execTask))
.then((results) => {
console.log(results);
})
.catch((error) => {
console.log(error);
});
}
}
});
});
來自:https://segmentfault.com/a/1190000006933269
本文由用戶 li8548668 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!