storm基礎框架分析

背景

前期收到的問題:

1、在Topology中我們可以指定spout、bolt的并行度,在提交Topology時Storm如何將spout、bolt自動發布到每個服務器并且控制服務的CPU、磁盤等資源的?

2、Storm處理消息時會根據Topology生成一棵消息樹,Storm如何跟蹤每個消息、如何保證消息不丟失以及如何實現重發消息機制?

上篇:storm是如何保證at least once語義的

回答了第2個問題。

本篇來建立一個基本的背景,來大概看下構成storm流式計算能力的一些基礎框架,并部分回答第一個問題。

worker、executor、task的關系

worker是一個進程.

executor是一個線程,是運行tasks的物理容器.

task是對spout/bolt/acker等任務的邏輯抽象.

supervisor會定時從zookeeper獲取拓補信息topologies、任務分配信息assignments及各類心跳信息,以此為依據進行任務分配。

在supervisor同步時,會根據新的任務分配情況來啟動新的worker或者關閉舊的worker并進行負載均衡。

worker通過定期的更新connections信息,來獲知其應該通訊的其它worker。

worker啟動時,會根據其分配到的任務啟動一個或多個executor線程。這些線程僅會處理唯一的topology。

如果有新的tolopogy被提交到集群,nimbus會重新分配任務,這個后面會說到。

executor線程負責處理多個spouts或者多個bolts的邏輯,這些spouts或者bolts,也稱為tasks。

具體有多少個worker,多少個executor,每個executor負責多少個task,是由配置和指定的parallelism-hint共同決定的,但這個值并不一定等于實際運行中的數目。

如果計算出的總的executors超過了nimbus的限制,此topology將不會得到執行。

并行度的作用:

上述代碼會在nimbus進行任務分配時調用:

線程模型及消息系統

基本關系如下所示:

worker啟動時,除了啟動多個executor線程,還會啟動多個工作線程來負責消息傳遞。

worker會訂閱到transfer-queue來消費消息,同時也會發布消息到transfer-queue,比如需要進行遠程發布時(某個bolt在另一個進程或者節點上)。

executor會發布消息到executor-send-queue比如emit tuple,同時會從executor-receive-queue消費消息,比如執行ack或者fail。

batch-transfer-worker-handler線程訂閱到executor-send-queue消費消息,并將消息發布到transfer-queue供worker消費。

transfer-thread會訂閱到transfer-queue消費消息,并負責將消息通過socket發送到遠程節點的端口上。

worker通過receive-thread線程來收取遠程消息,并將消息以本地方式發布到消息中指定的executor對應的executor-receive-queue。executor按第3點來消費消息。

以上所有的消息隊列都是Disruptor Queue,非常高效的線程間通訊框架。

所謂本地發布,是指在worker進程內及executor線程間進行消息發布。

所謂遠程發布,是指在worker進程間、不同的機器間進行消息發布。

任務調度及負載均衡

任務調度的主要角色

nimbus將可以工作的worker稱為worker-slot.

nimbus是整個集群的控管核心,總體負責了topology的提交、運行狀態監控、負載均衡及任務重新分配,等等工作。

nimbus分配的任務包含了topology代碼所在的路徑(在nimbus本地)、tasks、executors及workers信息。

worker由node + port唯一確定。

supervisor負責實際的同步worker的操作。一個supervisor稱為一個node。所謂同步worker,是指響應nimbus的任務調度和分配,進行worker的建立、調度與銷毀。

其通過將topology的代碼從nimbus下載到本地以進行任務調度。

任務分配信息中包含task到worker的映射信息task -> node + host,所以worker節點可據此信息判斷跟哪些遠程機器通訊。

集群的狀態機

集群狀態管理

集群的狀態是通過一個storm-cluster-state的對象來描述的。

其提供了許多功能接口,比如:

zookeeper相關的基本操作,如create-node、set-data、remove-node、get-children等.

心跳接口,如supervisor-heartbeat!、worker-heatbeat!等.

心跳信息,如executors-beats等.

啟動、更新、停止storm,如update-storm!等.

如下圖所示:

任務調度的依據

zookeeper是整個集群狀態同步、協調的核心組件。

supervisor、worker、executor等組件會定期向zookeeper寫心跳信息。

當topology出現錯誤、或者有新的topology提交到集群時,topologies信息會同步到zookeeper。

nimbus會定期監視zookeeper上的任務分配信息assignments,并將重新分配的計劃同步到zookeeper。

所以,nimbus會根據心跳、topologies信息及已分配的任務信息為依據,來重新分配任務,如下圖所示:

任務調度的時機

如上文的狀態機圖所示,rebalance和do-reblalance(比如來自web調用)會觸發mk-assignments即任務(重新)分配。

同時,nimbus進程啟動后,會周期性地進行mk-assignments調用,以進行負載均衡和任務分配。

客戶端通過storm jar ... topology 方式提交topology,會通過thrift接口調用nimbus的提交功能,此時會啟動storm,并觸發mk-assignments調用。

topology提交過程

一個topology的提交過程:

非本地模式下,客戶端通過thrift調用nimbus接口,來上傳代碼到nimbus并觸發提交操作.

nimbus進行任務分配,并將信息同步到zookeeper.

supervisor定期獲取任務分配信息,如果topology代碼缺失,會從nimbus下載代碼,并根據任務分配信息,同步worker.

worker根據分配的tasks信息,啟動多個executor線程,同時實例化spout、bolt、acker等組件,此時,等待所有connections(worker和其它機器通訊的網絡連接)啟動完畢,此storm-cluster即進入工作狀態。

除非顯示調???kill topology,否則spout、bolt等組件會一直運行。

主要過程如下圖所示:

結語

以上,基本闡述了storm的基礎框架,但未涉及trident機制,也基本回答了問題1。

 

來自:http://www.uml.org.cn/bigdata/201608024.asp

 

 本文由用戶 KristalKort 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!