LTS 任務調度框架(Light Task Schedule)

jopen 10年前發布 | 167K 次閱讀 任務調度 作業調度框架

框架概況:

LTS是一個輕任務調度框架,參考hadoop的部分思想。有三種角色, JobClient, JobTracker, TaskTracker。各個節點都是無狀態的,可以部署多個,來實現負載均衡,實現更大的負載量, 并且框架具有很好的容錯能力。 采用Zookeeper暴露節點信息,master選舉。Mongo存儲任務隊列和任務執行日志, netty做底層通信。

  • JobClient : 主要負責提交任務, 和 接收任務執行反饋結果。
  • JobTracker : 負責接收并分配任務,任務調度。
  • TaskTracker: 負責執行任務,執行完反饋給JobTracker。

架構圖

架構圖.png

節點組:

  • 1. 一個節點組等同于一個集群,同一個節點組中的各個節點是對等的,外界無論連接節點組中的任務一個節點都是可以的。
  • 2. 每個節點組中都有一個master節點,采用zookeeper進行master選舉(master宕機,會自動選舉出新的master節點),框架會提供接口API來監聽master節點的變化,用戶可以自己使用master節點做自己想做的事情。
  • 3. JobClient和TaskTracker都可以存在多個節點組。譬如 JobClient 可以存在多個節點組。 譬如:JobClient 節點組為 ‘QN_WEB’ 中的一個節點提交提交一個 只有節點組為’QN_TRADE’的 TaskTracker 才能執行的任務。
  • 4. (每個集群中)JobTacker只有一個節點組。
  • 5. 多個JobClient節點組和多個TaskTracker節點組再加上一個JobTacker節點組, 組成一個大的集群。

工作流程:

  • 1. JobClient 提交一個 任務 給 JobTracker, 這里我提供了兩種客戶端API, 一種是如果JobTracker 不存在或者提交失敗,直接返回提交失敗。另一種客戶端是重試客戶端, 如果提交失敗,先存儲文件,返回給客戶端提交成功的信息,待JobTracker可用的時候,再將任務提交。
  • 2. JobTracker 收到JobClient提交來的任務,先生成一個唯一的JobID。然后將任務儲存在Mongo集群中。JobTracker 發現有(任務執行的)可用的TaskTracker節點(組) 之后,將優先級最大,最先提交的任務分發給TaskTracker。這里JobTracker會優先分配給比較空閑的TaskTracker節點,達到負載均衡。
  • 3. TaskTracker 收到JobTracker分發來的任務之后,執行。執行完畢之后,再反饋任務執行結果給JobTracker(成功or 失敗[失敗有失敗錯誤信息]),如果發現JobTacker不可用,那么存儲文件,等待TaskTracker可用的時候再反饋。反饋結果的同時,詢問 JobTacker有沒有新的任務要執行。
  • 4. JobTacker收到TaskTracker節點的任務結果信息,生成并插入(mongo)任務執行日志。根據任務信息決定要不要反饋給客戶端。不需要反饋的直接刪除, 需要反饋的(同樣JobClient不可用存儲文件,等待可用重發)。
  • 5. JobClient 收到任務執行結果,進行自己想要的邏輯處理。

特性

  • 負載均衡:

    • JobClient 和 TaskTracker會隨機連接JobTracker節點組中的一個節點,實現JobTracker負載均衡。當連接上后,將一直保持連接這個節點,保持連接通道,知道這個節點不可用,減少每次都重新連接一個節點帶來的性能開銷。
    • JobTracker 分發任務時,是優先分配給最空間的一個TaskTracker節點,實現TaskTracker節點的負載均衡。
  • 健壯性:

    • 當節點組中的一個節點當機之后,自動轉到其他節點工作。當整個節點組當機之后,將會采用存儲文件的方式,待節點組可用的時候進行重發。
    • 當執行任務的TaskTracker節點當機之后,JobTracker 會將這個TaskTracker上的未完成的任務(死任務),重新分配給節點組中其他節點執行。
  • 伸縮性:

    • 因為各個節點都是無狀態的,可以動態增加機器部署實例, 節點關注者會自動發現。

調用示例

  • 安裝 zookeeper 和 mongo , 執行 data/mongo 目錄下的 mongo.md 中的語句

見 job-example 這里給出的是java API(設置配置)方式啟動,也可以使用配置文件中。

JobTracker 端

    final JobTracker jobTracker = new JobTracker();
    // 節點信息配置
    jobTracker.setZookeeperAddress("localhost:2181");
    jobTracker.setListenPort(8089);
    jobTracker.setClusterName("QN");

    // mongo 配置 (也可以配置在 mongo.properties中)
    Config config = new Config();
    config.setAddresses(new String[]{"127.0.0.1:27017"});
    config.setUsername("lts");
    config.setPassword("lts");
    config.setDbName("job");

    jobTracker.setStoreConfig(config);

    // 啟動節點
    jobTracker.start();

JobClient端

    JobClient jobClient = new RetryJobClient();
    //JobClient jobClient = new JobClient();
    jobClient.setNodeGroup("TEST");
    jobClient.setClusterName("QN");
    jobClient.setZookeeperAddress("localhost:2181");
    jobClient.start();

    // 提交任務
    Job job = new Job();
    job.setTaskId(UUID.randomUUID().toString());
    Map<String, String> extParams = new HashMap<String, String>();
    extParams.put("key", "value");
    job.setExtParams(extParams);
    job.setTaskTrackerNodeGroup("TEST_TRADE");
    Response response = jobClient.submitJob(job);

TaskTracker端

    TaskTracker taskTracker = new TaskTracker();
    taskTracker.setJobRunnerClass(TestJobRunner.class);

    taskTracker.setZookeeperAddress("localhost:2181");
    taskTracker.setNodeGroup("TEST_TRADE");
    taskTracker.setClusterName("QN");
    taskTracker.setWorkThreads(20);
    taskTracker.start();

    // 任務執行類
    public class TestJobRunner implements JobRunner {

        @Override
        public void run(Job job) throws Throwable {


            System.out.println("我要執行"+ job);

            try {
                Thread.sleep(5*1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }

項目主頁:http://www.baiduhome.net/lib/view/home/1414241431434

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!