LTS 輕量級分布式任務調度框架(Light Task Scheduler)
框架概況:
LTS是一個輕量級分布式任務調度框架。有三種角色, JobClient, JobTracker, TaskTracker。各個節點都是無狀態的,可以部署多個,來實現負載均衡,實現更大的負載量, 并且框架具有很好的容錯能力。 采用多種注冊中心(Zookeeper,redis等)進行節點信息暴露,master選舉。(Mongo or Mysql)存儲任務隊列和任務執行日志, netty做底層通信。
- JobClient : 主要負責提交任務, 和 接收任務執行反饋結果。
- JobTracker : 負責接收并分配任務,任務調度。
- TaskTracker: 負責執行任務,執行完反饋給JobTracker。
支持任務類型:
- 實時任務
- 也支持定時任務 (如:3天之后執行)
- CronExpression (如:0 0/1 * * * ?)
感興趣,請加QQ群:109500214 一起探討、完善。并且記得star一下哈,3Q
github地址:https://github.com/qq254963746/light-task-scheduler
架構圖
節點組:
- 1. 一個節點組等同于一個集群,同一個節點組中的各個節點是對等的,外界無論連接節點組中的任務一個節點都是可以的。
- 2. 每個節點組中都有一個master節點(master宕機,會自動選舉出新的master節點),框架會提供接口API來監聽master節點的變化,用戶可以自己使用master節點做自己想做的事情。
- 3. JobClient和TaskTracker都可以存在多個節點組。譬如 JobClient 可以存在多個節點組。 譬如:JobClient 節點組為 ‘lts_WEB’ 中的一個節點提交提交一個 只有節點組為’lts_TRADE’的 TaskTracker 才能執行的任務。
- 4. (每個集群中)JobTacker只有一個節點組。
- 5. 多個JobClient節點組和多個TaskTracker節點組再加上一個JobTacker節點組, 組成一個大的集群。
工作流程:
- 1. JobClient 提交一個 任務 給 JobTracker, 這里我提供了兩種客戶端API, 一種是如果JobTracker 不存在或者提交失敗,直接返回提交失敗。另一種客戶端是重試客戶端, 如果提交失敗,先存儲到本地FailStore(可以使用NFS來達到同個節點組共享leveldb文件的目的,多線程訪問,已經做了文件鎖處理),返回 給客戶端提交成功的信息,待JobTracker可用的時候,再將任務提交。
- 2. JobTracker收到JobClient提交來的任務,將任務存入任務隊列。JobTracker等待TaskTracker的Pull請求,然后將任務Push給TaskTracker去執行。
- 3. TaskTracker收到JobTracker分發來的任務之后,然后從線程池中拿到一個線程去執行。執行完畢之后,再反饋任務執行結果給 JobTracker(成功or 失敗[失敗有失敗錯誤信息]),如果發現JobTacker不可用,那么存儲本地FailStore,等待TaskTracker可用的時候再反饋。反饋 結果的同時,詢問JobTacker有沒有新的任務要執行。
- 4. JobTacker收到TaskTracker節點的任務結果信息。根據任務信息決定要不要反饋給客戶端。不需要反饋的直接刪除,需要反饋的,直接反饋,反饋失敗進入FeedbackQueue, 等待重新反饋。
- 5. JobClient收到任務執行結果,進行自己想要的邏輯處理。
特性
-
負載均衡:
- JobClient和TaskTracker可是根據自己設置的負載均衡策略來請求JobTracker節點組中的一個節點。當連接上后將一直保持連接這個節點,保持連接通道,直到這個節點不可用,減少每次都重新連接一個節點帶來的性能開銷。
-
健壯性:
- 當節點組中的一個節點當機之后,自動轉到其他節點工作。當整個節點組當機之后,將會采用存儲文件的方式,待節點組可用的時候進行重發。
- 當執行任務的TaskTracker節點當機之后,JobTracker會將這個TaskTracker上的未完成的任務(死任務),重新分配給節點組中其他節點執行。
-
伸縮性:
- 因為各個節點都是無狀態的,可以動態增加機器部署實例, 節點關注者會自動發現。
- 擴展性:
- 采用和dubbo一樣的SPI擴展方式,可以實現任務隊列擴展,日志記錄器擴展等
日志記錄
對于任務的分發,執行,還有用戶通過 (BizLogger) 【LtsLoggerFactory.getBizLogger()】 輸入的業務日志,LTS都有記錄,用戶可以在LTS Admin 后臺界面查看某個任務的所有日志,可以實時查看這個任務的執行情況。
開發計劃:
- WEB后臺管理:性能統計分析,預警等
- 實現LTS的分布式隊列存儲
LTS Admin
調用示例
下面提供的是最簡單的配置方式。更多配置請查看 lts-example 模塊下的 API 調用方式例子.
JobTracker 端
final JobTracker jobTracker = new JobTracker(); // 節點信息配置 jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); // 1. 任務隊列用mongo jobTracker.addConfig("job.queue", "mongo"); // mongo 配置 jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017"); jobTracker.addConfig("mongo.database", "lts"); jobTracker.setOldDataHandler(new OldDataDeletePolicy()); // 啟動節點 jobTracker.start();
TaskTracker端
TaskTracker taskTracker = new TaskTracker(); taskTracker.setJobRunnerClass(TestJobRunner.class); taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); taskTracker.setNodeGroup("test_trade_TaskTracker"); taskTracker.setWorkThreads(20); taskTracker.start(); // 任務執行類 public class TestJobRunner implements JobRunner { @Override public void run(Job job) throws Throwable { System.out.println("我要執行"+ job); System.out.println(job.getParam("shopId")); // TODO 用戶自己的業務邏輯, 應該保證冪等 try { Thread.sleep(5*1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
JobClient端
JobClient jobClient = new RetryJobClient(); // final JobClient jobClient = new JobClient(); jobClient.setNodeGroup("test_jobClient"); jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181"); jobClient.start(); // 提交任務 Job job = new Job(); job.setTaskId("3213213123"); job.setParam("shopId", "11111"); job.setTaskTrackerNodeGroup("test_trade_TaskTracker"); // job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表達式 // job.setTriggerTime(new Date()); // 支持指定時間執行 Response response = jobClient.submitJob(job);
本文由用戶 pykde 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!