當當 Elastic-job 開源項目的十項特性
張亮:當當網架構師、當當技術委員會成員、消息中間件組負責人。對架構設計、分布式、優雅代碼等領域興趣濃厚。目前主導當當應用框架ddframe研發,并負責推廣及撰寫技術白皮書。
一、為什么需要作業(定時任務)?
作業即定時任務。一般來說,系統可使用消息傳遞代替部分使用作業的場景。兩者確有相似之處。可互相替換的場景,如隊列表。將待處理的數據放入隊列 表,然后使用頻率極短的定時任務拉取隊列表的數據并處理。這種情況使用消息中間件的推送模式可更好的處理實時性數據。而且基于數據庫的消息存儲吞吐量遠遠 小于基于文件的順序追加消息存儲。
但在某些場景下則不能互換:
- 時間驅動 OR 事件驅動:內部系統一般可以通過事件來驅動,但涉及到外部系統,則只能使用時間驅動。如:抓取外部系統價格。每小時抓取,由于是外部系統,不能像內部系統一樣發送事件觸發事件。
- 批量處理 OR 逐條處理:批量處理堆積的數據更加高效,在不需要實時性的情況下比消息中間件更有優勢。而且有的業務邏輯只能批量處理,如:電商公司與快遞公司結算,一個月結算一次,并且根據送貨的數量有提成。比如,當月送貨超過1000則額外給快遞公司多1%優惠。
- 非實時性 OR 實時性:雖然消息中間件可以做到實時處理數據,但有的情況并不需要如的實時。如:VIP用戶降級,如果超過1年無購買行為,則自動降級。這類需求沒有強烈的時間要求,不需要按照時間精確的降級VIP用戶。
- 系統內部 OR 系統解耦。作業一般封裝在系統內部,而消息中間件可用于系統間解耦。
二、當當之前在使用什么作業系統?
當當之前使用的作業系統比較散亂,各自為戰,大致分為以下4種:
- Quartz:Java事實上的定時任務標準。但Quartz關注點在于定時任務而非數據,并無一套根據數據處理而定制化的流程。雖然Quartz可以基于數據庫實現作業的高可用,但缺少分布式并行執行作業的功能。
- TBSchedule:阿里早期開源的分布式任務調度系統。代碼略陳舊,使用timer而非線程池執行任務調度。眾所周知,timer在處理異常狀況時是有缺陷的。而且TBSchedule作業類型較為單一,只能是獲取/處理數據一種模式。還有就是文檔缺失比較嚴重。
- Crontab:Linux系統級的定時任務執行器。缺乏分布式和集中管理功能。
- Perl:遺留系統使用,目前已不符合公司的Java化戰略。
三、elastic-job的來歷
elastic-job原本是當當Java應用框架ddframe的一部分,本名dd-job。
ddframe包括編碼規范,開發框架,技術規范,監控以及分布式組件。ddframe規劃分為4個演進階段,目前處于第2階段。3、4階段涉及的技術組件不代表當當沒有使用,只是ddframe還未統一規劃。
ddframe由各種模塊組成,均已dd-開頭,如dd-container,dd-soa,dd-rdb,dd-job等。當當希望將 ddframe的各個模塊與公司環境解耦并開源以反饋社區。之前開源的Dubbo擴展版本DubboX即是dd-soa的核心模塊。而本次介紹的 elastic-job則是dd-job的開源部分,其中監控(但開源了監控方法)和ddframe核心接入等部分并未開源。
四、elastic-job包含的功能
- 分布式:最重要的功能,如果任務不能在分布式的環境下執行,那么直接使用Quartz就可以了。
- 任務分片:是elastic-job中最重要也是最難理解的概念。任務的分布式執行,需要將一個任務拆分為n個獨立的任務項,然后由分布式的服務器分別執行某一個或幾個分片項。
- 彈性擴容縮容:將任務拆分為n個任務項后,各個服務器分別執行各自分配到的任務項。一旦有新的服務器加入集群,或現有服務器下 線,elastic-job將在保留本次任務執行不變的情況下,下次任務開始前觸發任務重分片。舉例說明:有3臺服務器,分為10個片。則分片項分配如 下:{server1: [0,1,2], server2: [3,4,5], server3: [6,7,8,9]}。如果一臺服務器崩潰,則分片項分配如下:{server1: [0,1,2,3,4], server2: [5,6,7,8,9]}。如果新增一臺服務器,則分片項分配如下:{server1: [0,1], server2: [2,3] , server3: [4,5,6] , server4: [7,8,9]}。
- 穩定性:在服務器無波動的情況下,并不會重新分片;即使服務器有波動,下次分片的結果也會根據服務器IP和作業名稱哈希值算出穩定的分片順序,盡量不做大的變動。
- 高性能:elastic-job會將作業運行狀態的必要信息更新到注冊中心,但為了考慮性能問題,可以犧牲一些功能,而換取性能的提升。
- 冪等性:elastic-job可犧牲部分性能用以保證同一分片項不會同時在兩個服務器上運行。
- 失效轉移:彈性擴容縮容在下次作業運行前重分片,但本次作業執行的過程中,下線的服務器所分配的作業將不會重新被分配。失效轉移功能可以在本次作業運行中用空閑服務器抓取孤兒作業分片執行。同樣失效轉移功能也會犧牲部分性能。
- 狀態監控:監控作業的運行狀態,可以監控數據處理功能和失敗次數,作業運行時間等。是冪等性,失效轉移必須的功能。
- 多作業模式:作業可分為簡單和數據流處理兩種模式,數據流又分為高吞吐處理模式和順序性處理模式,其中高吞吐處理模式可以開啟足夠多的線程快速的處理數據,而順序性處理模式將每個分片項分配到一個獨立線程,用于保證同一分片的順序性,這點類似于kafka的分區順序性。
- 其他一些功能,如錯過任務重執行,單機并行處理,容錯處理,Spring命名空間支持,運維平臺等。
五、elastic-job的部署和使用
將使用elastic-job框架的jar/war連接同一個基于Zookeeper的注冊中心即可。
作業框架執行數據并不限于數據庫,且作業框架本身是不對數據進行關聯的。作業可以用于處理數據,文件,API等任何操作。
使用elastic-job所需要關注的僅僅是將業務處理邏輯和框架所分配的分片項匹配并處理,如:如果分片項是1,則獲取id以1結尾的數據處理。所以如果是處理數據的話,最佳實踐是將作業分片項規則和數據中間層規則對應。
通過上面的部署圖可以看出來,作業分片只是個邏輯概念,分片和實際數據其實框架是不做任何匹配關系的。而根據分片項和實際業務如何關聯,是成功使 用elastic-job的關鍵所在。為了不讓代碼寫起來很無聊,看起來像if(shardingItem == 1) {do xxx} else if (shardingItem == 2) {do xxx},elastic-job提供了自定義參數,可將分片項序號和實際業務做映射。比如設置為1=北京,2=上海。那么代碼中可以通過北京或是上海的 枚舉,從業務中的北京倉庫或上海倉庫取數據。elastic-job更多的還是關注作業調度和分布式分配,處理數據還是交由數據中間層更好些。
誠如剛才所說,最佳實踐是將作業分片項規則和數據中間層規則對應,省去作業分片時,再次適配數據中間層的分片邏輯。