一個基于redis和disque實現的輕量級異步任務執行器
簡介
horae 是一個基于 redis
和 disque
實現的 輕量級
、 高性能
的異步任務執行器,它的核心是 disque
提供的任務隊列,而隊列有 先進先出
的時序關系,顧得名: horae
。
horae : 時序女神,希臘神話中司掌季節時間和人間秩序的三女神,又譯“荷萊”。
horae的關注點不是隊列服務的實現本身(已經有不少隊列服務的實現了),而是希望借助于 redis
與 disque
提供的純內存的高性能的隊列機制,實現一個異步任務執行器。它可以自由配置任務來自哪種隊列服務,它不關注任務執行的最終狀態(它寫向哪里)或與哪個系統交互,它給你提供一個執行器以及簡單地編寫任務執行邏輯的方式。
取決于需求,這個執行器在要求不高的時候,只需要一個單節點的redis服務器,即可運轉。
如果你愿意犧牲一點性能,來換取更高的隊列可靠性保障(這種情況我強烈推薦你使用AMQP協議以及它的開源隊列實現: RabbitMQ
)。如果你想這樣,那么這個執行器也是可用的,只是你需要自己去實現跟RabbitMQ交互的細節。你可以用它連接各種其他隊列來消費消息并執行任務,它具有充分的擴展性與自由度。但我仍然推薦你使用 disque
。
適用場景
搶購/秒殺
搶購業務是典型的短時高并發場景,傳統行業里的類似于 學生選課
也可以歸結這類場景。
社交關系處理
純內存計算/計數器的場景,比如把社交系統里的好友、關系搬到內存中處理。
耗時的web請求
常見的耗時web請求,比如 生成PDF
、 網頁抓取
、 數據備份
、 郵件/短信發送
等。
分布式系統前端緩沖隊列
將它置于應用服務器之后,核心服務之前,作為請求的緩沖隊列使用。
概括起來就是 服務器峰值扛壓
、 異步處理
、 純內存計算
,當然你把它用成普通隊列也是可以的。
高性能
目前支持 disque
跟 redis
這兩種隊列服務(主推 disque
, redis
的隊列暫時以 list
數據接口的 lpush
& brpop
實現,但它不是高可靠的,并且沒有ack機制)。這兩種純內存的隊列首先保證了消費任務的性能。具體任務的執行性能,取決于使用場景,這里分析兩種場景:
純內存&單線程&無鎖
如果任務處理器消費的消息是完全存儲于內存中的,那么需要盡量將同構的各任務訪問的數據進行隔離(隔離的手段是對key劃分命名空間),如果實在沒辦法隔離,可以使用單隊列單線程無鎖的處理方式。
通用&多線程&多隊列
如果是通用的應用場景,比如訪問數據庫,因為數據庫有成熟的數據一致性保證。所以,你可以將任務劃分到多個不同的隊列,并利用多個線程來并發執行以加快任務的處理效率。
當然最推薦的使用方式是:用 redis
作為配置、協調、管控中心,用 disque
做隊列服務,任務需要訪問的數據盡可能存儲于 redis
中。
高可用
一主多從
執行器在運行時實行的是:Master單節點運行,多個Slave做Standby的機制來保證服務的可用性。事實上,從Master下線到其中 一個Slave成功競選為Master需要數個心跳周期的時間。因為執行器作為隊列的消費者跟隊列是完全解耦的,所以短暫的暫停消費對整個系統的可用性不 會產生太大影響。
心跳機制
Master跟Slave之間通過 redis-Pubsub
來維持心跳。目前的設計是Master單向 publish
心跳,Slave subscribe
Master的心跳。這么設計的原因是簡單,并且考慮到每個Slave都是無狀態的執行器,并不會涉及到狀態的維護與同步問題,所以Master不需要關心Slave的存活。
競爭Master
一旦Master下線(比如因為故障宕機),需要快速得從多個Slave中選舉出一個新的Master,選舉的算法非常多,并且非常復雜。
通常選舉Master的方式會由一個獨立的承擔 Manager
角色的節點來完成,如果不存在這樣一個節點那么通常會基于分布式選舉算法來實現( Zookeeper
比較擅長這個)。這里簡單得采用類似于競爭分布式鎖的實現方式來搶占Master。
如何判斷Master是否下線?這是一個非常關鍵的問題,因為如果產生誤判,將會給整體系統服務造成一段空檔期,這是一個不小的時間開銷。采用的判斷方式是 雙重檢測 :
- Slave訂閱Master的heartbeat channel,判斷心跳是否超時
- Slave去Master的數據結構中去獲取Master自己刷新的心跳時間戳,并跟當前時間對比,判斷是否超時
具體的實現方式:每個服務都會有一個heartbeat線程,Master的heartbeat線程做兩件事情:
- refresh自己的心跳時間戳
- publish自己的心跳到
heartbeat
channel
Slave的heartbeat線程做上面的 雙重檢測 ,Slave會等待幾個心跳周期,如果在這段時間內,兩種檢測都認為Master失去心跳,則判斷Master下線。
Master下線后,就涉及到多個Slave競爭Master的問題,這里我們在競爭鎖的時候沒有采用阻塞等待的方式,而是采用了一種危險性相對小的方式: tryLock
:
private boolean tryLockMaster() { long currentTime = RedisConfigUtil.redisTime(configContext); String val = String.valueOf(currentTime + Constants.DEFAULT_MASTER_LOCK_TIMEOUT + 1); Jedis jedis = null; try { jedis = RedisConfigUtil.getJedis(configContext).get(); boolean locked = jedis.setnx(Constants.KEY_LOCK_FOR_MASTER, val) == 1; if (locked) { jedis.expire(Constants.KEY_LOCK_FOR_MASTER, Constants.DEFAULT_MASTER_LOCK_TIMEOUT); return true; } } finally { if (jedis != null) RedisConfigUtil.returnResource(jedis); } return false; }
只有判斷Master下線之后,才會調用 tryLockMaster
,它僅僅是嘗試獲得鎖,如果獲取成功,將給鎖設置一個很短的過期時間,這里跟跟心跳過期時間相同。如果獲取失敗將繼續檢測心跳。獲取鎖的Slave會立即變為Master并迅速刷新自己的心跳,這樣,其他Slave檢測Master下線就會失敗,將不會再去調用 tryLockMaster
。避免了通常情況下,一直阻塞、競爭鎖這一條路。
擴展性
擴展功能
得益于Redis的 PubSub
,我們可以實現很多類似于 指令下發->執行
的feature,比如實時獲取任務的執行進度、讓各服務器匯報自己的狀態等。因為時間關系,目前這塊只是留了一個擴展口:
- 上行頻道:執行器有一個
upstream
channel,用于上傳各節點的本機信息。 - 下行頻道:系統有一個
downstream
channel,用于被動接受來自上游的信息/指令。
這里上下游的語義是:所有服務節點均為下游, redis
配置中心應該算是中心節點,在上游你可以定制一個管控臺,用于管理 redis
配置中心并向下游的服務節點下發指令。
擴展隊列服務
如果你想擴展它,希望它支持另一種隊列服務(為了方便表述,這里假設你想支持RabbitMQ)。那么你需要做以下幾步:
- 在package:
com.github.horae.exchanger
包下新建類:RabbitConnManager
用于管理client 到 RabbitMQ的連接 - 同樣在package:
com.github.horae.exchanger
包下新建類:RabbitExchanger
用于實現消息的出隊與入隊邏輯,該類需實現TaskExchanger
- 在
TaskExchangerManager
的createTaskExchanger
方法內加入新的分支判斷。 - 在
partition.properties
下可以配置新的partition,在matrix中指定RabbitMQ
需要 注意 的是: TaskExchanger
的 dequeue
接口方法,默認的行為是 block
形式的。如果你擴展的隊列不支持block形式的消費,那可能需要你自己實現,實現的方式可以借助于 java.util.concurrent.BlockingQueue
。
多種可靠性級別
隊列的可靠性牽扯到整個分布式系統的可靠性,這是一個無法回避的問題。如果你說用 redis
實現的隊列,是否能做到既保持 高性能
又能兼具 高可靠
,答案是 不能
。或者說它不是一個專業的隊列服務(不然redis的作者也沒有必要再另起 disque
項目了)。如果從可靠性的角度而言,我給幾個主流的隊列服務器(或者可以提供隊列服務)的排名是: RabbitMQ
> Kafka
> Disque
> Redis
。雖然這個執行器內置支持了 disque
和 redis
作為隊列的實現,但它跟你選擇的隊列服務沒有非常緊的耦合關系,你可以選擇其他隊列服務,通常你只需要實現這么幾個功能 入隊消息
、 出隊消息
、 ack消息
、 管理連接
。
分區
對我而言 分區
的概念來自于Kafka,但這里的分區跟Kafka性質不太一樣。首先我們來看為什么有這樣的需求?
作為一個無狀態的服務,它可以長時間運行(某種程度上,這有點像Storm)而不必下線。為了充分榨取CPU的價值。我們可能希望在一次服務的生命周期內 讓它運行多個異構服務(所謂異構任務,就是不同性質的任務)。因此我們有必要將多個異構任務區分開來,而這個手段就是 分區
。說它不同于kafka的原因是:它更多是一種邏輯上的劃分,而不是kafka物理上按分區存儲消息。我們來看一個分區隔離了哪些東西:
partition.root=p0,p1 p0.matrix=redis p0.host=127.0.0.1 p0.port=6379 p0.class=com.github.horae.task.RedisTask p1.matrix=disque p1.host=127.0.0.1 p1.port=7711 p1.class=com.github.horae.task.DisqueTask
- matrix : 哪種隊列實現服務,目前支持
disque
/redis
- host : 隊列服務器的host
- port : 隊列服務器的port
- class : 處理隊列任務的實現類的完全限定名
從上面的隔離方式來看,這里的分區也能做到對任務隊列的物理隔離。上面配置了兩個分區,兩個分區分別對應了兩種隊列服務。分區跟隊列服務的對應關系沒有限制,甚至多個分區對應一個隊列服務器也可行,因為還有一個分區到隊列名稱的映射關系:
如下圖:
綜述:分區隔離了異構任務的隊列,而隊列存儲于何種隊列服務、存儲于何處、以及任務的處理邏輯完全取決于配置。
上面的解析明確了分區跟任務處理類的對應關系。為了便于管理,一個分區也有其獨立的線程池來將異構任務的線程隔離開來。
編寫任務處理器
在你編寫一個任務處理器之前,你應該意識到你編寫的任務處理器充當的是 隊列的消費者
。接下來你需要了解的是,你編寫的任務處理器將在一個線程池中運行,而線程池的管理,需要你關心,但你需要知道: 一個任務隊列將會對應一個線程
。你需要知道的就是這么多,下面來編寫一個任務處理器:
- 首先你需要創建一個新的maven工程
- 在horae發布包的庫目錄下(
./horae/libs
)找到以horae
開頭的jar文件,加入到你的maven依賴中,只是一個本地依賴:<dependency> <groupId>com.github.horae</groupId> <artifactId>horae</artifactId> <version>0.1.0</version> <scope>system</scope> <systemPath>/usr/local/horae/libs/horae-0.1.0.jar</systemPath> </dependency>
- 你需要新建一個類,繼承
TaskTemplate
,并實現run
方法,下面是一個模板:public void run() { try { signal.await(); //implement task process business logic } catch (InterruptedException e) { } }
- 編寫構造方法:
public RedisTestTask(CountDownLatch signal, String queueName, Map<String, Object> configContext, Partition partition, TaskExchanger taskExchanger) { super(signal, queueName, configContext, partition, taskExchanger); }
在run方法的第一句,你需要調用一個 CountDownLatch
實例的 await
方法來將其阻塞住。解釋一下,為什么需要這么做?
其實,每個服務在啟動的時候,都會立即讀取redis內配置的隊列,并初始化線程池,進入執行就緒狀態。這一步,所有的服務,無論是Master,Slave都是一樣的。但區別就區別在這句:
signal.await();
當啟動的是master節點,那么該signal會立即釋放信號(通過 signal.countDown()
),所有任務處理器都立即開始執行。
而啟動的是slave節點,則將會一直在上面這句代碼這里阻塞,直到master下線,而該節點競爭到master之后,會立即釋放解除阻塞信號,后續代碼會立即執行。
因此這么做可以使得在master下線之后,所有Slave都以最快的速度進入任務執行狀態,雖然對一些Slave節點而言,這有些浪費系統資源。
-
編譯工程并打包jar,注意不用包含上面的maven依賴,它已經存在于
horae
可執行文件類庫中。 -
將生成的jar放置于
./horae/libs/
下,它將會被自動添加到classpath
中 -
編輯配置文件
./horae/conf/partition.properties
,新建/修改一個分區的p{x}.class
,值為你剛剛編寫的任務實現類的 完全限定名 。
安裝部署
以下安裝步驟在Mac OS X系統驗證通過(Linux系類似,但存在一些不同)。Mac用戶需要預裝 Homebrew
- 安裝jsvc
brew install jsvc
- 安裝redis
brew install redis
- 安裝disque
因為disque目前還沒有一個穩定的版本,所以暫時被homebrew暫存在 head-only 倉庫中,安裝命令略有不同:
brew install --HEAD homebrew/head-only/disque
horae
源碼編譯、打包
mvn assembly:assembly
- 拷貝打包文件到目標文件夾,并解壓縮
cd ${project_baseDir}/target/horae.zip /usr/local unzip /usr/local/horae.zip
- 配置可執行文件,主要是命令與路徑
sudo vi /usr/local/horae/bin/horae.sh
- 配置conf下的配置文件
sudo vi /usr/local/horae/conf/${service/redisConf/partition}.properties
- 執行命令
sudo sh /usr/local/horae/bin/horae.sh ${start/stop/restart}
注意事項
- conf下的service.properties中的配置項
master
在所有節點中只能有一個被設置為true。如果它下線,將不能以master的身份再次啟動。 - 因為jsvc需要寫進程號(pid),所以盡量以系統管理員身份執行,將horae.sh里的
user
配置為root
,并以sudo
執行
關于disque
目前disque仍處于alpha版本,命令也還在調整中。雖然已被支持,但無論是disque的server以及其java client: jedisque
都存在bug,因此暫時 不推薦 使用,請至少等到發布stable版本再使用。
自實現的 jedisque
連接池。目前jedisque的客戶端還沒有提供連接池機制,它跟redis的主流java client: jedis
出自同一個開發者手筆。考慮到 jedis
內部使用的是 apache commons-pool
實現連接池機制,在實現 jedisque
的時候也使用的是同樣的方案,等 jedisque
官方提供連接池之后,會采用官方連接池。
disque
的開發過程中,對命令和命令參數可能會進行調整, horae
也會對此進行跟進。雖然, disque
的stable版本還未發布,但redis作者的水準和口碑有目共睹,所以你有理由相信它能給你帶來驚喜。
本項目的開源地址: https://github.com/yanghua/horae