storm基礎框架分析
背景
前期收到的問題:
1、在Topology中我們可以指定spout、bolt的并行度,在提交Topology時Storm如何將spout、bolt自動發布到每個服務器并且控制服務的CPU、磁盤等資源的?
2、Storm處理消息時會根據Topology生成一棵消息樹,Storm如何跟蹤每個消息、如何保證消息不丟失以及如何實現重發消息機制?
上篇:storm是如何保證at least once語義的
回答了第2個問題。
本篇來建立一個基本的背景,來大概看下構成storm流式計算能力的一些基礎框架,并部分回答第一個問題。
worker、executor、task的關系
worker是一個進程.
executor是一個線程,是運行tasks的物理容器.
task是對spout/bolt/acker等任務的邏輯抽象.
supervisor會定時從zookeeper獲取拓補信息topologies、任務分配信息assignments及各類心跳信息,以此為依據進行任務分配。
在supervisor同步時,會根據新的任務分配情況來啟動新的worker或者關閉舊的worker并進行負載均衡。
worker通過定期的更新connections信息,來獲知其應該通訊的其它worker。
worker啟動時,會根據其分配到的任務啟動一個或多個executor線程。這些線程僅會處理唯一的topology。
如果有新的tolopogy被提交到集群,nimbus會重新分配任務,這個后面會說到。
executor線程負責處理多個spouts或者多個bolts的邏輯,這些spouts或者bolts,也稱為tasks。
具體有多少個worker,多少個executor,每個executor負責多少個task,是由配置和指定的parallelism-hint共同決定的,但這個值并不一定等于實際運行中的數目。
如果計算出的總的executors超過了nimbus的限制,此topology將不會得到執行。
并行度的作用:
上述代碼會在nimbus進行任務分配時調用:
線程模型及消息系統
基本關系如下所示:
worker啟動時,除了啟動多個executor線程,還會啟動多個工作線程來負責消息傳遞。
worker會訂閱到transfer-queue來消費消息,同時也會發布消息到transfer-queue,比如需要進行遠程發布時(某個bolt在另一個進程或者節點上)。
executor會發布消息到executor-send-queue比如emit tuple,同時會從executor-receive-queue消費消息,比如執行ack或者fail。
batch-transfer-worker-handler線程訂閱到executor-send-queue消費消息,并將消息發布到transfer-queue供worker消費。
transfer-thread會訂閱到transfer-queue消費消息,并負責將消息通過socket發送到遠程節點的端口上。
worker通過receive-thread線程來收取遠程消息,并將消息以本地方式發布到消息中指定的executor對應的executor-receive-queue。executor按第3點來消費消息。
以上所有的消息隊列都是Disruptor Queue,非常高效的線程間通訊框架。
所謂本地發布,是指在worker進程內及executor線程間進行消息發布。
所謂遠程發布,是指在worker進程間、不同的機器間進行消息發布。
任務調度及負載均衡
任務調度的主要角色
nimbus將可以工作的worker稱為worker-slot.
nimbus是整個集群的控管核心,總體負責了topology的提交、運行狀態監控、負載均衡及任務重新分配,等等工作。
nimbus分配的任務包含了topology代碼所在的路徑(在nimbus本地)、tasks、executors及workers信息。
worker由node + port唯一確定。
supervisor負責實際的同步worker的操作。一個supervisor稱為一個node。所謂同步worker,是指響應nimbus的任務調度和分配,進行worker的建立、調度與銷毀。
其通過將topology的代碼從nimbus下載到本地以進行任務調度。
任務分配信息中包含task到worker的映射信息task -> node + host,所以worker節點可據此信息判斷跟哪些遠程機器通訊。
集群的狀態機
集群狀態管理
集群的狀態是通過一個storm-cluster-state的對象來描述的。
其提供了許多功能接口,比如:
zookeeper相關的基本操作,如create-node、set-data、remove-node、get-children等.
心跳接口,如supervisor-heartbeat!、worker-heatbeat!等.
心跳信息,如executors-beats等.
啟動、更新、停止storm,如update-storm!等.
如下圖所示:
任務調度的依據
zookeeper是整個集群狀態同步、協調的核心組件。
supervisor、worker、executor等組件會定期向zookeeper寫心跳信息。
當topology出現錯誤、或者有新的topology提交到集群時,topologies信息會同步到zookeeper。
nimbus會定期監視zookeeper上的任務分配信息assignments,并將重新分配的計劃同步到zookeeper。
所以,nimbus會根據心跳、topologies信息及已分配的任務信息為依據,來重新分配任務,如下圖所示:
任務調度的時機
如上文的狀態機圖所示,rebalance和do-reblalance(比如來自web調用)會觸發mk-assignments即任務(重新)分配。
同時,nimbus進程啟動后,會周期性地進行mk-assignments調用,以進行負載均衡和任務分配。
客戶端通過storm jar ... topology 方式提交topology,會通過thrift接口調用nimbus的提交功能,此時會啟動storm,并觸發mk-assignments調用。
topology提交過程
一個topology的提交過程:
非本地模式下,客戶端通過thrift調用nimbus接口,來上傳代碼到nimbus并觸發提交操作.
nimbus進行任務分配,并將信息同步到zookeeper.
supervisor定期獲取任務分配信息,如果topology代碼缺失,會從nimbus下載代碼,并根據任務分配信息,同步worker.
worker根據分配的tasks信息,啟動多個executor線程,同時實例化spout、bolt、acker等組件,此時,等待所有connections(worker和其它機器通訊的網絡連接)啟動完畢,此storm-cluster即進入工作狀態。
除非顯示調???kill topology,否則spout、bolt等組件會一直運行。
主要過程如下圖所示:
結語
以上,基本闡述了storm的基礎框架,但未涉及trident機制,也基本回答了問題1。
來自:http://www.uml.org.cn/bigdata/201608024.asp