Storm筆記

jopen 9年前發布 | 38K 次閱讀 Storm 分布式/云計算/大數據

用了一段時間Storm后的筆記。發現可以記的東西不多,證明Storm挺簡單的,你只要遵循一些簡單的接口與原則,就能寫出大規模實時消息處理的程序。

為什么用Storm

沒接觸前把Storm想象得很強大,接觸后覺得它就那樣可有可無,再后來又覺得沒有了全部自己做也麻煩。

1. 集群管理:支持應用的部署,工作節點的管理(任務分配、HA、Scalable等),Metrics的收集。

2. 數據流的傳輸與路由:支持多種數據在各處理節點間自由流動,基于Netty的高效傳輸機制,支持輪詢,多播,按屬性分組的路由。

3. 數據高可靠性的保證:還支持實現數據流動了多個節點后,在某個節點的失敗,可以引發數據從源頭開始重傳的高級功能

按Storm的官方說法,你也可以自己搭建許多消息隊列和worker組成的網絡來實現實時處理,但是:

乏味:你大部份開發時間花費在配置消息發送到哪里,部署worker,還有部署中間隊列。你所關心的實時處理邏輯對應到你的代碼的只占了很少的比例 。

脆弱:你要自己負責保持每個worker和隊列正常工作。

伸縮時痛苦:當單個worker或隊列的消息吞吐量太高時,你需要分區,即數據如何分散。你需要重新配置其它worker,讓它們發送消息到新位置。這導致刪除或添加部件都可能失敗。

缺點

核心代碼是用Clojure寫成,翻看代碼非常不便。其實,它現在很多新的外部模塊都用Java來寫了,另外阿里同學翻寫了一個JStorm

其他流處理方案

Spark-Streaming: 總是有人問為什么不用Spark Stream,首先它是Micro-Batch風格的準實時方案,間隔一般設到500ms。另外,它的消息流拓撲好像沒Storm那樣可以隨便亂入,有時候必須弄個DB來做中間傳輸。

Samza, Linkedin的產品,在Linkedin里與Apache Kafka搭配。不過它的使用者沒有Apache Kafka多。待研究。

Pulsar:來自eBay的開源實時分析平臺,看文章很強,待研究。

自定義Spout

Storm對可靠消息傳輸的支持程度,很大程度上依賴于Spout的實現。

并不默認就是支持高可靠性的,collector emit的時候要傳輸msgId,要自己處理ack(msgId)和fail(msgId)函數。而很多spout其實沒有這樣做,只有Kafka Spout做的比較正規。

默認的,如果三十秒,消息流經的所有下游節點沒有都ack完畢,或者有一個節點報fail,則觸發fail(msgId)函數。

因為ack/fail的參數只有msgId,這就要Spout想在ack/fail時對消息源如Kafka/JMS進行ack/fail,或fail時想重發消息,如果需要完整的消息體而不只是msgId才能完成時,要自己把msgId對應的消息存起來(會撐爆內存么)。

另外,因為每個Spout 是單線程的,會循環的調用nextTuple()的同時,調用ack()或fail()處理結果。所以nextTuple()如果沒消息時不要長期阻塞,但也不要完全不阻塞,參考storm-starter里的spout,等個50ms好了。在JStorm里,就改為了兩條分開的線程。

另外,spout有時是每次被調用nextTuple()時主動去pull消息的,有時是被動接收消息后存放在 LinkedBlockingQueue里,netxtTuple()時從Queue里取消息的。如果消息源沒有ack機制,Spout突然crash的話,存在queue里的消息也會丟失。

Spout還有個Max Pending的配置,如果有太多消息沒有ack,它就不會再調nextTuple()。但如果上游消息源是主動Push的,消息還是會源源不斷的來,累積在queue里。

RichBolt vs BasicBolt

直接用BasicBolt,會在execute()后自動ack/fail Tuple,而RichBolt則需要自行調用ack/fail。

那什么時候使用RichBolt? Bolt不是在每次execute()時立刻產生新消息,需要異步的發送新消息(比如聚合一段時間的數據再發送)時,又或者想異步的ack/fail原消息時就需要。

BasicBolt的prepare()里并沒有collector參數,只在每次execute()時傳入collector。而RichBolt剛好相反,你可以在初始化時就把collector保存起來,用它在任意時候發送消息。

另外,如果用RichBolt的collector,還要考慮在發送消息時是否帶上傳入的Tuple,如果不帶,則下游的處理節點出錯也不會回溯到Spout重發。用BasicBolt則已默認帶上。

異常處理

如果希望上游的Spout重發消息,則在BasicBolt中拋出FailedException 或在RichBolt中直接fail掉Tuple。
其他情況下,execute()方法不應該拋出任何異常,或者你故意拋出異常使得Topology停轉。

狀態管理

不像Linkedin的Samza,Storm完全不管數據的持久化,Bolt如果需要歷史數據,一般會使用路由規則,比如相同用戶的數據路由到同一個Bolt,然后Bolt自己在內存里管理數據。

當然,也可以用共享的NoSQL存儲如Redis,但此時壓力就都在Redis上了。

定時任務

如下設置,所有Bolt都會在定時收到一條消息,一般用于觸發sliding windows的統計等。

conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 300);

如下函數用于判斷是否Tick消息

protected static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}

拓撲的定義

除了使用Java代碼,還可以使用Yaml來動態定義拓撲,見 https://github.com/ptgoetz/flux

并發度的定義及命令行動態擴容見官方文檔,另對于worker進程數的建議是Use one worker per topology per machine。

序列化

Tuple除了傳基本類型與數組,AraayList,HashMap外,也可以傳一下Java對象的。Storm使用Kyro作為序列化框架,據測比Hessian什么的都要快和小。但一定注冊這些Java對象的類型,否則就會使用Java默認的序列化。

參看官方文檔,有兩種方式注冊類型,一個是storm.yaml文件,一個是Config類的registerSerialization方法。如無特殊需求,直接注冊需要序列化的類就可以了,不需要自己實現一個Serializer。

Spout和Bolt的構造函數只會在submit Topology時調一次,然后序列化起來,直接發給工作節點,工作節點里實例化時不會被調用里,所以復雜的成員變量記得都定義成transient,在open(),prepare()里初始化及連接數據庫等資源。

另外,需要實現close()函數清理資源,但該函數不承諾在worker進程被殺時保證被調用。

fields grouping的算法

按名稱提取fileds的值,取hash,再按當前的可選Tasks取模。所以,動態擴展Task數量,或某Task失效被重建的話,都可能讓原來的分配完全亂掉。

與其他開源技術的集成

比如External目錄里的一堆,storm-contrib 里也有一堆,目前支持Jdbc,Redis,HBase,HDFS,Hive,甚至還有Esper,目標都是通過配置(比如SQL及Input/Output fields),而非代碼,或盡量少的代碼,實現交互。有時也可以不一定要直接用它們,當成Example Code來看就好了。

另外,與傳統的Java應用思路相比,Bolt/Spout與資源連接時,比較難實現共享連接池的概念,連接池一般都是每個Bolt/Spout實例自用的,要正確處理其連接數量。

HA的實現

如果Worker進程失效,Supervisor進程會檢查 Worker的心跳信息,重新進行創建。

如果整個機器節點失效,Nimbus會在其他節點上重新創建。

Supervisor進程和Nimbus進程,需要用Daemon程序如monit來啟動,失效時自動重新啟動。
因為它們在進程內都不保存狀態,狀態都保存在本地文件和ZooKeeper,因此進程可以隨便殺。

如果Nimbus進程所在的機器都直接倒了,需要在其他機器上重新啟動,Storm目前沒有自建支持,需要自己寫腳本實現。
即使Nimbus進程不在了,也只是不能部署新任務,有節點失效時不能重新分配而已,不影響已有的線程。
同樣,如果Supervisor進程失效,不影響已存在的Worker進程。

Zookeeper本身已經是按至少三臺部署的HA架構了。

運維管理

Storm UI也是用Clojure寫的,比較難改,好在它提供了Restful API,可以與其他系統集成,或基于它重寫一個UI。

Metrics的采樣率是1/20(topology.stats.sample.rate=0.05),即Storm隨機從20個事件里取出一個事件來進行統計,命中的話,counter 直接+20。

在舊版本的Storm使用舊版的ZooKeeper要啟動數據清理的腳本,在新版上只要修改ZooKeeper的配置文件zoo.cfg, 默認是24小時清理一次 autopurge.purgeInterval=24

日志的配置在logback/cluster.xml文件里,Storm的日志,天然的需要Logstash + ElasticSearch的集中式日志方案

storm.local.dir 要自己建,而且不支持~/ 代表用戶根目錄。

storm.yaml的默認值在 https://github.com/apache/storm/blob/master/conf/defaults.yaml

Tunning

1. 內部傳輸機制的各種配置,見文檔

2. 屏蔽ack機制,當可靠傳輸并不是最重要時。可以把Acker數量設為0,可以讓Spout不要發出msgId,或者bolt發送消息時不傳之前的Tuple。

資料

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!