storm java 編程思路

ShaCGRU 9年前發布 | 9K 次閱讀 分布式/云計算/大數據

來自: http://my.oschina.net/u/2000675/blog/610587


總體思路

storm編程和hadoop的mapreduce的編程很類似,hadoop的mapreduce需要自己實現map函數,reduce函數,還有一個主類驅動;storm需要自己實現spout,bolt和一個主函數。storm編程為以下三步:

創建一個Spout讀取數據
創建bolt處理數據
創建一個主類,在主類中創建拓撲和一個集群對象,將拓撲提交到集群

Topology運行方式
Topology的運行可以分為本地模式和分布式模式,模式的設置可以在配置文件中設定,也可以在代碼中設置。本地模式其實什么都不需要安裝,有storm jar包就夠了 

(1)本地運行的提交方式:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, conf, topology);
cluster.killTopology(topologyName);
cluster.shutdown();
 
(2)分布式提交方式:
StormSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());
需要注意的是,在Storm代碼編寫完成之后,需要打包成jar包放到Nimbus中運行,打包的時候,不需要把依賴的jar都打進去,否則如果把依賴的storm.jar包打進去的話,運行時會出現重復的配置文件錯誤導致Topology無法運行。因為Topology運行之前,會加載本地的storm.yaml配置文件。
在Nimbus運行的命令如下:
storm jar StormTopology.jar maincalss args
Topology運行流程
  有幾點需要說明的地方:
(1)Storm提交后,會把代碼首先存放到Nimbus節點的inbox目錄下,之后,會把當前Storm運行的配置生成一個stormconf.ser文件放到Nimbus節點的stormdist目錄中,在此目錄中同時還有序列化之后的Topology代碼文件;
(2)在設定Topology所關聯的Spouts和Bolts時,可以同時設置當前Spout和Bolt的executor數目和task數目,默認情況下,一個Topology的task的總和是和executor的總和一致的。之后,系統根據worker的數目,盡量平均的分配這些task的執行。worker在哪個supervisor節點上運行是由storm本身決定的;
(3)任務分配好之后,Nimbes節點會將任務的信息提交到zookeeper集群,同時在zookeeper集群中會有workerbeats節點,這里存儲了當前Topology的所有worker進程的心跳信息;
(4)Supervisor節點會不斷的輪詢zookeeper集群,在zookeeper的assignments節點中保存了所有Topology的任務分配信息、代碼存儲目錄、任務之間的關聯關系等,Supervisor通過輪詢此節點的內容,來領取自己的任務,啟動worker進程運行;
(5)一個Topology運行之后,就會不斷的通過Spouts來發送Stream流,通過Bolts來不斷的處理接收到的Stream流,Stream流是無界的。
最后一步會不間斷的執行,除非手動結束Topology。
Topology方法調用流程
Topology中的Stream處理時的方法調用過程如下:

 有幾點需要說明的地方:
   (1)每個組件(Spout或者Bolt)的構造方法和declareOutputFields方法都只被調用一次。
   (2)open方法、prepare方法的調用是多次的。入口函數中設定的setSpout或者setBolt里的并行度參數指的是executor的數目,是負責運行組件中的task的線程         的數目,此數目是多少,上述的兩個方法就會被調用多少次,在每個executor運行的時候調用一次。相當于一個線程的構造方法。
   (3)nextTuple方法、execute方法是一直被運行的,nextTuple方法不斷的發射Tuple,Bolt的execute不斷的接收Tuple進行處理。只有這樣不斷地運行,才會產生無界的Tuple流,體現實時性。相當于線程的run方法。
   (4)在提交了一個topology之后,Storm就會創建spout/bolt實例并進行序列化。之后,將序列化的component發送給所有的任務所在的機器(即Supervisor節點),在每一個任務上反序列化component。
   (5)Spout和Bolt之間、Bolt和Bolt之間的通信,是通過zeroMQ的消息隊列實現的。
   (6)上圖沒有列出ack方法和fail方法,在一個Tuple被成功處理之后,需要調用ack方法來標記成功,否則調用fail方法標記失敗,重新處理這個Tuple。
Topology并行度
    在Topology的執行單元里,有幾個和并行度相關的概念。
(1)worker:每個worker都屬于一個特定的Topology,每個Supervisor節點的worker可以有多個,每個worker使用一個單獨的端口,它對Topology中的每個component運行一個或者多個executor線程來提供task的運行服務。
(2)executor:executor是產生于worker進程內部的線程,會執行同一個component的一個或者多個task。
(3)task:實際的數據處理由task完成,在Topology的生命周期中,每個組件的task數目是不會發生變化的,而executor的數目卻不一定。executor數目小于等于task的數目,默認情況下,二者是相等的。
    在運行一個Topology時,可以根據具體的情況來設置不同數量的worker、task、executor,而設置的位置也可以在多個地方。
(1)worker設置:
(1.1)可以通過設置yaml中的topology.workers屬性
(1.2)在代碼中通過Config的setNumWorkers方法設定
(2)executor設置:
    通過在Topology的入口類中setBolt、setSpout方法的最后一個參數指定,不指定的話,默認為1;
(3)task設置:
    (3.1) 默認情況下,和executor數目一致;
    (3.2)在代碼中通過TopologyBuilder的setNumTasks方法設定具體某個組件的task數目;
終止Topology
    通過在Nimbus節點利用如下命令來終止一個Topology的運行:
storm kill topologyName
    kill之后,可以通過UI界面查看topology狀態,會首先變成KILLED狀態,在清理完本地目錄和zookeeper集群中的和當前Topology相關的信息之后,此Topology就會徹底消失了。
Topology跟蹤

    Topology提交后,可以在Nimbus節點的web界面查看,默認的地址是http://NimbusIp:8080

 

 本文由用戶 ShaCGRU 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!