Node.js 中使用 Redis 來實現定時任務

pdce 9年前發布 | 32K 次閱讀 Node.js Node.js 開發

本文所說的定時任務或者說計劃任務并不是很多人想象中的那樣,比如說每天凌晨三點自動運行起來跑一個腳本。這種都已經爛大街了,隨便一個 Crontab 就能搞定了。

這里所說的定時任務可以說是計時器任務,比如說用戶觸發了某個動作,那么從這個點開始過二十四小時我們要對這個動作做點什么。那么如果有 1000 個用戶觸發了這個動作,就會有 1000 個定時任務。于是這就不是 Cron 范疇里面的內容了。

舉個最簡單的例子,一個用戶推薦了另一個用戶,我們定一個二十四小時之后的任務,看看被推薦的用戶有沒有來注冊,如果沒注冊就給他搞一條短信過去。Σ>―(〃°ω°〃)?→

最初的設想

一開始我是想把這個計時器做在內存里面直接調用的。

考慮到 Node.js 的定時并不是那么準確(無論是setTimeout還是setInterval),所以本來打算自己維護這個定時器隊列。

又考慮到 Node.js 原生對象比較耗內存。之前我用JSON對象存了一本字典,約十二萬多的詞條,原文件大概也就五六兆,用 Node.js 的原生對象一存居然有五六百兆的內存占用——所以打算這個定時器隊列用 C++ 來寫 addon。

考慮到任何時候插入的任務都有可能在已有的任務之前或者之后,所以本來想用 C++ 來寫一個 小根堆 。每次用戶來一個任務的時候就將這個任務插入到堆中。

如果按照上述方法的話,再加上對時間要求掐得也不是那么緊,于是就是一個不斷的process.nextTick()的過程。

在process.nextTick()當中執行這么一個函數:

  1. 從小根堆中不斷獲取堆頂的任務并處理,一直處理到堆頂任務的執行時間大于當前時間為止。
  2. 繼續process.nextTick()來讓下一個 tick 執行步驟 1 中的流程。

所以最后就是一邊往小根堆插入任務,另一邊通過不斷process.nextTick()消費任務的這么一個過程。

最后,為了考慮到程序重啟的時候內存數據會丟失,還應該做一個持久化的事情——在每次插入任務的時候順便往持久化中間件中插一條副本,比如 MySQL、MongoDB、Redis、Riak 等等任何三方依賴。消費任務的時候順便把中間件中的這條任務數據給刪除。

也就是說中間件中永遠存的就是當前尚未完成的任務。每當程序重啟的時候都先從中間件中把所有任務讀取進來重建一下堆,然后就能繼續工作了。

如果當時我沒有發現 Redis 的這個妙用的話,上述的流程將會是我實現我們定時任務的流程了。

Redis 妙用

在 Redis 的 2.8.0 版本之后,其推出了一個新的特性——鍵空間消息( Redis Keyspace Notifications ),它配合 2.0.0 版本之后的SUBSCRIBE就能完成這個定時任務的操作了, 不過定時的單位是秒

Publish / Subscribe

Redis 在 2.0.0 之后推出了 Pub / Sub 的指令,大致就是說一邊給 Redis 的特定頻道發送消息,另一邊從 Redis 的特定頻道取值——形成了一個簡易的消息隊列

比如我們可以往foo頻道推一個消息bar,那么就可以直接:

PUBLISH foo bar

另一邊我們在客戶端訂閱foo頻道就能接受到這個消息了。

舉個例子,如果在 Node.js 里面使用 ioredis 這個包那么看起來就會像這樣:

javascript
var Redis = require("ioredis");
var sub = new Redis(/** 連接信息 */);
sub.once("connect", function() {
  // 假設我們需要選擇 redis 的 db,因為實際上我們不會去污染默認的 db 0
  sub.select(DB_NUMBER, function(err) {
    if(err) process.exit(4);
    sub.subscribe("foo", function() {
      //... 訂閱頻道成功
    });
  });
});
// 監聽從 `foo` 來的消息
sub.on("message", function(channel, msg) {
  console.log(channel, msg);
});

Redis Keyspace Notifications

在 Redis 里面有一些事件,比如鍵到期、鍵被刪除等。然后我們可以通過配置一些東西來讓 Redis 一旦觸發這些事件的時候就往特定的 Channel 推一條消息。

本文所涉及到的需求的話我們所需要關心的事件是EXPIRE即過期事件。

大致的流程就是我們給 Redis 的某一個 db 設置過期事件,使其鍵一旦過期就會往特定頻道推消息,我在自己的客戶端這邊就一直消費這個頻道就好了。

以后一來一條定時任務,我們就把這個任務狀態壓縮成一個鍵,并且過期時間為距這個任務執行的時間差。那么當鍵一旦到期,就到了任務該執行的時間,Redis 自然會把過期消息推去,我們的客戶端就能接收到了。這樣一來就起到了定時任務的作用。

消息類型

當達到一定條件后,有兩種類型的這種消息會被觸發,用哪個需要自己選了。舉個例子,我們刪除了在 db 0 中一個叫foo的鍵,那么系統會往兩個頻道推消息,一個是del事件頻道推foo消息,另一個是foo頻道推del消息,它們小倆口被系統推送的指令分別等價于:

PUBLISH __keyspace@0__:foo del PUBLISH __keyevent@0__:del foo

其中往foo推送del的頻道名為__keyspace@0__:foo,即是"__keyspace@" + DB_NUMBER + "__:" + KEY_NAME;而del的頻道名為"__keyevent@" + DB_NUMBER + "__:" + EVENT_NAME。

配置

即使你的 Redis 版本達標了,但是 Redis 默認是關閉這個功能的,你需要修改配置文件來打開它,或者直接在 CLI 里面通過指令修改。這里就說說配置文件的修改吧。

如果不想看我在這里羅里吧嗦的,也可以直接去看 Redis 的 相關文檔

首先打開 Redis 的配置文件,在不同的系統和安裝方式下文件位置可能不同,比如通過brew安裝的 MacOS 下可能是在/usr/local/etc/redis.conf下面,通過apt-get安裝的 Ubuntu 下可能是在/etc/redis/redis.conf下,總之找到配置文件。 或者自己寫一個配置文件,啟動的時候指定配置文件地址就好。

然后找到一項叫notify-keyspace-events的地方,如果找不到則自行添加,其值可以是Ex、Klg等等。這些字母的具體含義如下所示:

  • K ,表示keyspace事件,有這個字母表示會往__keyspace@<db>__頻道推消息。
  • E ,表示keyevent事件,有這個字母表示會往__keyevent@<db>__頻道推消息。
  • g ,表示一些通用指令事件支持,如DEL、EXPIRE、RENAME等等。
  • $ ,表示字符串(String)相關指令的事件支持。
  • l ,表示列表(List)相關指令事件支持。
  • s ,表示集合(Set)相關指令事件支持。
  • h ,哈希(Hash)相關指令事件支持。
  • z ,有序集(Sorted Set)相關指令事件支持。
  • x ,過期事件,與 g 中的EXPIRE不同的是, g 的EXPIRE是指執行EXPIRE key ttl這條指令的時候順便觸發的事件,而這里是指那個key剛好過期的這個時間點觸發的事件。
  • e ,驅逐事件,一個key由于內存上限而被驅逐的時候會觸發的事件。
  • A ,g$lshzxe的別名。也就是說AKE的意思就代表了所有的事件。

結合上述列表我們就能拼湊出自己所需要的事件支持字符串了,在我的需求中我只需要Ex就可以滿足了,所以配置項就是這樣的:

notify-keyspace-events Ex

然后保存配置文件,啟動 Redis 就啟用了過期事件的支持了。

實踐

我們先說任務的創造者吧。由于這里 Redis 的事件只會傳鍵名,并不會傳鍵值,而過期事件觸發的時候那個鍵已經沒了,你也無法獲取鍵值,加上我的主系統和任務系統是分布式的,所以就把所有需要的信息往鍵名塞。

一個最簡單的鍵名設計就是任務類型 + ":" + JSON.stringify 化后的參數數組;更有甚者可以直接把任務類型替換成所需的函數路徑,比如需要執行這個任務的函數在task/foo/bar文件下面的baz函數,參數arguments數組為[ 1, 2 ],那么鍵名的設計可以是task/foo/bar.baz:[1,2],反正我們只需要觸發這個鍵,用不著去查詢這個鍵。等到真正過期了任務系統接收到這個鍵名的時候再一一解析,得到需要執行task/foo/bar.baz這個消息,并且網函數里面傳入[1,2]這個arguments。

所以當接收到一個定時任務的時候,我們得到消息、函數名、過期時間參數,這個函數可以如下設計:

/** 我們假設 redis 是一個 ioredis 的對象 */
var sampleTaskMaker = function(message, func, timeout) {
  message = JSON.stringify(message);
  console.log("Received a new task:", func, message, "after " + timeout + ".");
  // 這里的 uuid 是 npm 一個包
  // 生成一個唯一 uuid 的目的是為了防止兩個任務用了相同的函數和參數,那么
  // 鍵名可能會重復并覆蓋的情況
  // uuid 的文檔為 https://www.npmjs.com/package/node-uuid
  //
  // 這里的 ?? 是一個分隔符,冒號是分割 uuid 和后面內容的,而 ?? 是分割函數名
  // 和消息的
  var key = uuid.v1().replace(/-/g, "") +
    ":??" + func + "??" + message;
  var content = "";
  redis.multi()
    .set(key, content)
    .expire(key, timeout)
    .exec(function(err) {
      if(err) {
        console.error("Failed to publish EXPIRE EVENT for " + content);
        console.error(err);
        return;
      }
    });
};

Ioredis 的穩定可以 點此 查看。

然后在任務系統里面的一開始監聽這個過期頻道:

// assign 是 sugarjs 里面的函數
// 把 db 塞到字符串里面的 {db} 里去
var subscribeKey = "__keyevent@{db}__:expired".assign({ db: 1 });
// 假設 sub 是 ioredis 的對象
sub.once("connect", function() {
  // 假設我們需要選擇 redis 的 db,因為實際上我們不會去污染默認的 db 0
  sub.select(1, function(err) {
    if(err) process.exit(4);
    sub.subscribe("foo", function() {
      //... 訂閱頻道成功
    });
  });
});
// 監聽從 `foo` 來的消息
sub.on("message", sampleOnExpired);

注意:我們這里選擇 db 1 是因為一旦開啟過期事件監聽,那么這個 db 的所有過期事件都會被發送。為了不跟正常使用的 redis 過期鍵混淆,我們為這個事情專門用一個新的 db。比如我們在自己正常使用的 db 0 里面監聽了,那么不是我們任務觸發的過期事件也會傳過來,這個時候我們解析的鍵名就不對了。

最后就是我們的sampleOnExpired函數了。

var sampleOnExpired = function(channel, key) {
  // UUID:??func??params
  var body = key.split("??");
  if(body.length < 3) return;
  // 取出 body 第一位為 func
  var func = body[1];
  // 推出前兩位,后面剩下的有可能是參數里面自帶 ?? 而被分割,所以要拼回去
  body.shift(); body.shift();
  var params = body.join("??");
  // 然后把 params 傳入 func 去執行
  // func:
  //   path1/path2.func
  func = func.split(".");
  if(func.length !== 2) {
    console.error("Bad params for task:", func.join("."), "-", params);
    return;
  }
  var path = func[0];
  func = func[1];
  var mod;
  try {
    mod = require("./tasks/" + path);
  } catch(e) {
    console.error("Failed to load module", path);
    console.error(e.stack);
    return;
  }
  process.nextTick(function() {
    try {
      mod[func].apply(null, JSON.parse(params));
    } catch(e) {
      console.error("Failed to call function", path, "-", func, "-", params);
      console.error(e.stack);
    }
  });
};

這個簡易的架子搭好后,你只需要去寫一堆任務執行函數,然后在生成任務的時候把相應參數傳給sampleTaskMaker就好了。Redis 會自動過期并且觸發事件給你的sampleOnExpired函數,然后就會去執行相應的任務處理函數了。

小結

其實這個需求在我們項目目前就是給用戶定時發提醒短信用的。如果沒有發現 Redis 的這個妙用,我還是會去用 第二節 里面的方法來寫的。其實這期間也有考慮過用 RabbitMQ,不過貌似它的定時消息需要做一些 Hack,比較麻煩,最后就放棄了。

Redis 的這個方法其實是我在谷歌搜出來的,別人在 StackOverflow 回答的答案。我參考了之后用我自己的方法實現了出來,并且把代碼的關鍵部分提取出來整理成這篇小文,還希望能給各位看官一些用吧,望打賞。

如果沒有什么用也憋噴我,畢竟我是個蒟蒻。有更好的方法希望留個言,望告知。謝謝。(′,,?ω?,,)?

 本文由用戶 pdce 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!