一個基于redis和disque實現的輕量級異步任務執行器

jopen 9年前發布 | 27K 次閱讀 Redis

簡介

horae 是一個基于 redisdisque 實現的 輕量級高性能 的異步任務執行器,它的核心是 disque 提供的任務隊列,而隊列有 先進先出 的時序關系,顧得名: horae

一個基于redis和disque實現的輕量級異步任務執行器

horae : 時序女神,希臘神話中司掌季節時間和人間秩序的三女神,又譯“荷萊”。

horae的關注點不是隊列服務的實現本身(已經有不少隊列服務的實現了),而是希望借助于 redisdisque 提供的純內存的高性能的隊列機制,實現一個異步任務執行器。它可以自由配置任務來自哪種隊列服務,它不關注任務執行的最終狀態(它寫向哪里)或與哪個系統交互,它給你提供一個執行器以及簡單地編寫任務執行邏輯的方式。

取決于需求,這個執行器在要求不高的時候,只需要一個單節點的redis服務器,即可運轉。

如果你愿意犧牲一點性能,來換取更高的隊列可靠性保障(這種情況我強烈推薦你使用AMQP協議以及它的開源隊列實現: RabbitMQ )。如果你想這樣,那么這個執行器也是可用的,只是你需要自己去實現跟RabbitMQ交互的細節。你可以用它連接各種其他隊列來消費消息并執行任務,它具有充分的擴展性與自由度。但我仍然推薦你使用 disque

適用場景

搶購/秒殺

搶購業務是典型的短時高并發場景,傳統行業里的類似于 學生選課 也可以歸結這類場景。

社交關系處理

純內存計算/計數器的場景,比如把社交系統里的好友、關系搬到內存中處理。

耗時的web請求

常見的耗時web請求,比如 生成PDF網頁抓取數據備份郵件/短信發送 等。

分布式系統前端緩沖隊列

將它置于應用服務器之后,核心服務之前,作為請求的緩沖隊列使用。

概括起來就是 服務器峰值扛壓異步處理純內存計算 ,當然你把它用成普通隊列也是可以的。

高性能

目前支持 disqueredis 這兩種隊列服務(主推 disqueredis 的隊列暫時以 list 數據接口的 lpush & brpop 實現,但它不是高可靠的,并且沒有ack機制)。這兩種純內存的隊列首先保證了消費任務的性能。具體任務的執行性能,取決于使用場景,這里分析兩種場景:

純內存&單線程&無鎖

如果任務處理器消費的消息是完全存儲于內存中的,那么需要盡量將同構的各任務訪問的數據進行隔離(隔離的手段是對key劃分命名空間),如果實在沒辦法隔離,可以使用單隊列單線程無鎖的處理方式。

通用&多線程&多隊列

如果是通用的應用場景,比如訪問數據庫,因為數據庫有成熟的數據一致性保證。所以,你可以將任務劃分到多個不同的隊列,并利用多個線程來并發執行以加快任務的處理效率。

當然最推薦的使用方式是:用 redis 作為配置、協調、管控中心,用 disque 做隊列服務,任務需要訪問的數據盡可能存儲于 redis 中。

高可用

一主多從

執行器在運行時實行的是:Master單節點運行,多個Slave做Standby的機制來保證服務的可用性。事實上,從Master下線到其中 一個Slave成功競選為Master需要數個心跳周期的時間。因為執行器作為隊列的消費者跟隊列是完全解耦的,所以短暫的暫停消費對整個系統的可用性不 會產生太大影響。

心跳機制

Master跟Slave之間通過 redis-Pubsub 來維持心跳。目前的設計是Master單向 publish 心跳,Slave subscribe Master的心跳。這么設計的原因是簡單,并且考慮到每個Slave都是無狀態的執行器,并不會涉及到狀態的維護與同步問題,所以Master不需要關心Slave的存活。

競爭Master

一旦Master下線(比如因為故障宕機),需要快速得從多個Slave中選舉出一個新的Master,選舉的算法非常多,并且非常復雜。

通常選舉Master的方式會由一個獨立的承擔 Manager 角色的節點來完成,如果不存在這樣一個節點那么通常會基于分布式選舉算法來實現( Zookeeper 比較擅長這個)。這里簡單得采用類似于競爭分布式鎖的實現方式來搶占Master。

如何判斷Master是否下線?這是一個非常關鍵的問題,因為如果產生誤判,將會給整體系統服務造成一段空檔期,這是一個不小的時間開銷。采用的判斷方式是 雙重檢測

  • Slave訂閱Master的heartbeat channel,判斷心跳是否超時
  • Slave去Master的數據結構中去獲取Master自己刷新的心跳時間戳,并跟當前時間對比,判斷是否超時

具體的實現方式:每個服務都會有一個heartbeat線程,Master的heartbeat線程做兩件事情:

  • refresh自己的心跳時間戳
  • publish自己的心跳到 heartbeat channel

Slave的heartbeat線程做上面的 雙重檢測 ,Slave會等待幾個心跳周期,如果在這段時間內,兩種檢測都認為Master失去心跳,則判斷Master下線。

Master下線后,就涉及到多個Slave競爭Master的問題,這里我們在競爭鎖的時候沒有采用阻塞等待的方式,而是采用了一種危險性相對小的方式: tryLock :

private boolean tryLockMaster() {
    long currentTime = RedisConfigUtil.redisTime(configContext);
    String val = String.valueOf(currentTime + Constants.DEFAULT_MASTER_LOCK_TIMEOUT + 1);
    Jedis jedis = null;
    try {
        jedis = RedisConfigUtil.getJedis(configContext).get();
        boolean locked = jedis.setnx(Constants.KEY_LOCK_FOR_MASTER, val) == 1;
        if (locked) {
            jedis.expire(Constants.KEY_LOCK_FOR_MASTER,
                         Constants.DEFAULT_MASTER_LOCK_TIMEOUT);

            return true;
        }
    } finally {
        if (jedis != null) RedisConfigUtil.returnResource(jedis);
    }

    return false;
}


只有判斷Master下線之后,才會調用 tryLockMaster ,它僅僅是嘗試獲得鎖,如果獲取成功,將給鎖設置一個很短的過期時間,這里跟跟心跳過期時間相同。如果獲取失敗將繼續檢測心跳。獲取鎖的Slave會立即變為Master并迅速刷新自己的心跳,這樣,其他Slave檢測Master下線就會失敗,將不會再去調用 tryLockMaster 。避免了通常情況下,一直阻塞、競爭鎖這一條路。

擴展性

擴展功能

得益于Redis的 PubSub ,我們可以實現很多類似于 指令下發->執行 的feature,比如實時獲取任務的執行進度、讓各服務器匯報自己的狀態等。因為時間關系,目前這塊只是留了一個擴展口:

  • 上行頻道:執行器有一個 upstream channel,用于上傳各節點的本機信息。
  • 下行頻道:系統有一個 downstream channel,用于被動接受來自上游的信息/指令。

這里上下游的語義是:所有服務節點均為下游, redis 配置中心應該算是中心節點,在上游你可以定制一個管控臺,用于管理 redis 配置中心并向下游的服務節點下發指令。

擴展隊列服務

如果你想擴展它,希望它支持另一種隊列服務(為了方便表述,這里假設你想支持RabbitMQ)。那么你需要做以下幾步:

  • 在package: com.github.horae.exchanger 包下新建類: RabbitConnManager 用于管理client 到 RabbitMQ的連接
  • 同樣在package: com.github.horae.exchanger 包下新建類: RabbitExchanger 用于實現消息的出隊與入隊邏輯,該類需實現 TaskExchanger
  • TaskExchangerManagercreateTaskExchanger 方法內加入新的分支判斷。
  • partition.properties 下可以配置新的partition,在matrix中指定RabbitMQ

需要 注意 的是: TaskExchangerdequeue 接口方法,默認的行為是 block 形式的。如果你擴展的隊列不支持block形式的消費,那可能需要你自己實現,實現的方式可以借助于 java.util.concurrent.BlockingQueue

多種可靠性級別

隊列的可靠性牽扯到整個分布式系統的可靠性,這是一個無法回避的問題。如果你說用 redis 實現的隊列,是否能做到既保持 高性能 又能兼具 高可靠 ,答案是 不能 。或者說它不是一個專業的隊列服務(不然redis的作者也沒有必要再另起 disque 項目了)。如果從可靠性的角度而言,我給幾個主流的隊列服務器(或者可以提供隊列服務)的排名是: RabbitMQ > Kafka > Disque > Redis 。雖然這個執行器內置支持了 disqueredis 作為隊列的實現,但它跟你選擇的隊列服務沒有非常緊的耦合關系,你可以選擇其他隊列服務,通常你只需要實現這么幾個功能 入隊消息出隊消息ack消息管理連接

分區

對我而言 分區 的概念來自于Kafka,但這里的分區跟Kafka性質不太一樣。首先我們來看為什么有這樣的需求?

作為一個無狀態的服務,它可以長時間運行(某種程度上,這有點像Storm)而不必下線。為了充分榨取CPU的價值。我們可能希望在一次服務的生命周期內 讓它運行多個異構服務(所謂異構任務,就是不同性質的任務)。因此我們有必要將多個異構任務區分開來,而這個手段就是 分區 。說它不同于kafka的原因是:它更多是一種邏輯上的劃分,而不是kafka物理上按分區存儲消息。我們來看一個分區隔離了哪些東西:

partition.root=p0,p1
p0.matrix=redis
p0.host=127.0.0.1
p0.port=6379
p0.class=com.github.horae.task.RedisTask
p1.matrix=disque
p1.host=127.0.0.1
p1.port=7711
p1.class=com.github.horae.task.DisqueTask


  • matrix : 哪種隊列實現服務,目前支持 disque / redis
  • host : 隊列服務器的host
  • port : 隊列服務器的port
  • class : 處理隊列任務的實現類的完全限定名

從上面的隔離方式來看,這里的分區也能做到對任務隊列的物理隔離。上面配置了兩個分區,兩個分區分別對應了兩種隊列服務。分區跟隊列服務的對應關系沒有限制,甚至多個分區對應一個隊列服務器也可行,因為還有一個分區到隊列名稱的映射關系:

如下圖:

一個基于redis和disque實現的輕量級異步任務執行器

綜述:分區隔離了異構任務的隊列,而隊列存儲于何種隊列服務、存儲于何處、以及任務的處理邏輯完全取決于配置。

上面的解析明確了分區跟任務處理類的對應關系。為了便于管理,一個分區也有其獨立的線程池來將異構任務的線程隔離開來。

編寫任務處理器

在你編寫一個任務處理器之前,你應該意識到你編寫的任務處理器充當的是 隊列的消費者 。接下來你需要了解的是,你編寫的任務處理器將在一個線程池中運行,而線程池的管理,需要你關心,但你需要知道: 一個任務隊列將會對應一個線程 。你需要知道的就是這么多,下面來編寫一個任務處理器:

  • 首先你需要創建一個新的maven工程
  • 在horae發布包的庫目錄下( ./horae/libs )找到以 horae 開頭的jar文件,加入到你的maven依賴中,只是一個本地依賴:
    <dependency>
        <groupId>com.github.horae</groupId>
        <artifactId>horae</artifactId>
        <version>0.1.0</version>
        <scope>system</scope>
        <systemPath>/usr/local/horae/libs/horae-0.1.0.jar</systemPath>
    </dependency>

  • 你需要新建一個類,繼承 TaskTemplate ,并實現 run 方法,下面是一個模板:
    public void run() {
        try {
            signal.await();
    
            //implement task process business logic
        } catch (InterruptedException e) {
    
        }
    }

  • 編寫構造方法:
    public RedisTestTask(CountDownLatch signal, String queueName, Map<String, Object> configContext,
                         Partition partition, TaskExchanger taskExchanger) {
        super(signal, queueName, configContext, partition, taskExchanger);
    }

在run方法的第一句,你需要調用一個 CountDownLatch 實例的 await 方法來將其阻塞住。解釋一下,為什么需要這么做?

其實,每個服務在啟動的時候,都會立即讀取redis內配置的隊列,并初始化線程池,進入執行就緒狀態。這一步,所有的服務,無論是Master,Slave都是一樣的。但區別就區別在這句:

signal.await(); 

當啟動的是master節點,那么該signal會立即釋放信號(通過 signal.countDown() ),所有任務處理器都立即開始執行。

而啟動的是slave節點,則將會一直在上面這句代碼這里阻塞,直到master下線,而該節點競爭到master之后,會立即釋放解除阻塞信號,后續代碼會立即執行。

因此這么做可以使得在master下線之后,所有Slave都以最快的速度進入任務執行狀態,雖然對一些Slave節點而言,這有些浪費系統資源。

  • 編譯工程并打包jar,注意不用包含上面的maven依賴,它已經存在于 horae 可執行文件類庫中。

  • 將生成的jar放置于 ./horae/libs/ 下,它將會被自動添加到 classpath

  • 編輯配置文件 ./horae/conf/partition.properties ,新建/修改一個分區的 p{x}.class ,值為你剛剛編寫的任務實現類的 完全限定名

安裝部署

以下安裝步驟在Mac OS X系統驗證通過(Linux系類似,但存在一些不同)。Mac用戶需要預裝 Homebrew

  • 安裝jsvc
brew install jsvc 
  • 安裝redis
brew install redis 
  • 安裝disque

因為disque目前還沒有一個穩定的版本,所以暫時被homebrew暫存在 head-only 倉庫中,安裝命令略有不同:

brew install --HEAD homebrew/head-only/disque 
  • horae 源碼編譯、打包
mvn assembly:assembly 
  • 拷貝打包文件到目標文件夾,并解壓縮
cd ${project_baseDir}/target/horae.zip /usr/local unzip /usr/local/horae.zip 
  • 配置可執行文件,主要是命令與路徑
sudo vi /usr/local/horae/bin/horae.sh 
  • 配置conf下的配置文件
sudo vi /usr/local/horae/conf/${service/redisConf/partition}.properties 
  • 執行命令
sudo sh /usr/local/horae/bin/horae.sh ${start/stop/restart} 

注意事項

  • conf下的service.properties中的配置項 master 在所有節點中只能有一個被設置為true。如果它下線,將不能以master的身份再次啟動。
  • 因為jsvc需要寫進程號(pid),所以盡量以系統管理員身份執行,將horae.sh里的 user 配置為 root ,并以 sudo 執行

關于disque

目前disque仍處于alpha版本,命令也還在調整中。雖然已被支持,但無論是disque的server以及其java client: jedisque 都存在bug,因此暫時 不推薦 使用,請至少等到發布stable版本再使用。

自實現的 jedisque 連接池。目前jedisque的客戶端還沒有提供連接池機制,它跟redis的主流java client: jedis 出自同一個開發者手筆。考慮到 jedis 內部使用的是 apache commons-pool 實現連接池機制,在實現 jedisque 的時候也使用的是同樣的方案,等 jedisque 官方提供連接池之后,會采用官方連接池。

disque 的開發過程中,對命令和命令參數可能會進行調整, horae 也會對此進行跟進。雖然, disque 的stable版本還未發布,但redis作者的水準和口碑有目共睹,所以你有理由相信它能給你帶來驚喜。

本項目的開源地址: https://github.com/yanghua/horae

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!