Spring hadoop之發散之Storm云計算學習摘錄總結
1.Storm概念:
是一個分布式的、容錯的實時計算系統,它被托管在GitHub上,遵循 Eclipse Public License 1.0。Storm是由BackType開發的實時處理系統,BackType現在已在推ter麾下。GitHub上的最新版本是Storm 0.5.2,基本是用Clojure寫的。
Storm為分布式實時計算提供了一組通用原語,可被用于“流處理”之中,實時處理消息并更新數據庫。這是管理隊列及工作者集群的另一種方式。 Storm也可被用于“連續計算”(continuous computation),對數據流做連續查詢,在計算時就將結果以流的形式輸出給用戶。它還可被用于“分布式RPC”,以并行的方式運行昂貴的運算。
2.底層實現原理
https://github.com/mvogiatzis/first-stories-推ter/wiki/Algorithm-logic
3.概念介紹:
- Topologies 用于封裝一個實時計算應用程序的邏輯,類似于Hadoop的MapReduce Job
- Stream 消息流,是一個沒有邊界的tuple序列,這些tuples會被以一種分布式的方式并行地創建和處理
- Spouts 消息源,是消息生產者,他會從一個外部源讀取數據并向topology里面面發出消息:tuple
- Bolts 消息處理者,所有的消息處理邏輯被封裝在bolts里面,處理輸入的數據流并產生輸出的新數據流,可執行過濾,聚合,查詢數據庫等操作
- Task 每一個Spout和Bolt會被當作很多task在整個集群里面執行,每一個task對應到一個線程.
- Stream groupings 消息分發策略,定義一個Topology的其中一步是定義每個tuple接受什么樣的流作為輸入,stream grouping就是用來定義一個stream應該如果分配給Bolts們.
2. Fields Grouping:按字段分組,比如按userid來分組,具有同樣userid的tuple會被分到相同的Bolts,而不同的userid則會被分配到不同的Bolts.
3. All Grouping:廣播發送, 對于每一個tuple,所有的Bolts都會收到.
4. Global Grouping: 全局分組,這個tuple被分配到storm中的一個bolt的其中一個task.再具體一點就是分配給id值最低的那個task.
5. Non Grouping: 不分組,意思是說stream不關心到底誰會收到它的tuple.目前他和Shuffle grouping是一樣的效果,有點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程去執行.
6. Direct Grouping: 直接分組,這是一種比較特別的分組方法,用這種分組意味著消息的發送者舉鼎由消息接收者的哪個task處理這個消息.只有被聲明為Direct Stream的消息流可以聲明這種分組方法.而且這種消息tuple必須使用emitDirect方法來發射.消息處理者可以通過TopologyContext來或者處理它的消息的taskid (OutputCollector.emit方法也會返回taskid)
詳細介紹:
3.3 Storm組件 詳細介紹
Storm集群主要由一個主節點和一群工作節點(worker node)組成,通過 Zookeeper進行協調。
3.3.1 主節點:
主節點通常運行一個后臺程序 —— Nimbus,用于響應分布在集群中的節點,分配任務和監測故障。這個很類似于Hadoop中的Job Tracker。
3.3.2工作節點:
工作節點同樣會運行一個后臺程序 —— Supervisor,用于收聽工作指派并基于要求運行工作進程。每個工作節點都是topology中一個子集的實現。而Nimbus和Supervisor之間的協調則通過Zookeeper系統或者集群。
3.3.3 Zookeeper
Zookeeper是完成Supervisor和Nimbus之間協調的服務。而應用程序實現實時的邏輯則被封裝進Storm中的“topology”。topology則是一組由Spouts(數據源)和Bolts(數據操作)通過Stream Groupings進行連接的圖。下面對出現的術語進行更深刻的解析。
3.3.4 Spout:
簡而言之,Spout從來源處讀取數據并放入topology。Spout分成可靠和不可靠兩種;當Storm接收失敗時,可靠的Spout會對tuple(元組,數據項組成的列表)進行重發;而不可靠的Spout不會考慮接收成功與否只發射一次。而Spout中最主要的方法就是nextTuple(),該方法會發射一個新的tuple到topology,如果沒有新tuple發射則會簡單的返回。
3.3.5 Bolt:
Topology中所有的處理都由Bolt完成。Bolt可以完成任何事,比如:連接的過濾、聚合、訪問文件/數據庫、等等。Bolt從Spout中接收數據并進行處理,如果遇到復雜流的處理也可能將tuple發送給另一個Bolt進行處理。而Bolt中最重要的方法是execute(),以新的tuple作為參數接收。不管是Spout還是Bolt,如果將tuple發射成多個流,這些流都可以通過declareStream()來聲明。
3.3.6Stream Groupings:
Stream Grouping定義了一個流在Bolt任務間該如何被切分。這里有Storm提供的6個Stream Grouping類型:
1. 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每個任務獲得相等數量的tuple。
2. 字段分組(Fields grouping):根據指定字段分割數據流,并分組。例如,根據“user-id”字段,相同“user-id”的元組總是分發到同一個任務,不同“user-id”的元組可能分發到不同的任務。
3. 全部分組(All grouping):tuple被復制到bolt的所有任務。這種類型需要謹慎使用。
4. 全局分組(Global grouping):全部流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。
5. 無分組(None grouping):你不需要關心流是如何分組。目前,無分組等效于隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執行(如果可能)。
6. 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪個元組處理者任務接收。
當然還可以實現CustomStreamGroupimg接口來定制自己需要的分組。
1 TopologyBuilder builder = new TopologyBuilder(); 02 03 builder.setSpout("spout", new RandomSentenceSpout(), 5); 04 05 builder.setBolt("map", new SplitSentence(), 4) 06 .shuffleGrouping("spout"); 07 08 builder.setBolt("reduce", new WordCount(), 8) 09 .fieldsGrouping("map", new Fields("word")); 10 11 Config conf = new Config(); 12 conf.setDebug(true); 13 14 LocalCluster cluster = new LocalCluster(); 15 cluster.submitTopology("word-count", conf, builder.createTopology()); 16 17 Thread.sleep(10000); 18 19 cluster.shutdown();
如果建立自己的Topology(非Transactional的),用戶通常需要利用如下接口和對象:
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void activate();
void deactivate();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
public interface IBolt extends Serializable {
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
void execute(Tuple input);
void cleanup();
}
IRichBolt和IRichSpout與IBolt和ISpout的不同在于多了兩個接口:
declareOutputFields(OutputFieldsDeclarer declarer):聲明輸出字段
getComponentConfiguration() :該接口是在0.7.0引入的,用于支持組件級的配置,即允許用戶針對單個Spout或Bolt進行參數配置。
實現了這兩個接口后,通過調用TopologyBuilder建立起Topology。TopologyBuilder實際上是封裝了StormTopology的thrift接口,也就是說Topology實際上是通過thrift定義的一個struct,TopologyBuilder將這個對象建立起來,然后nimbus實際上會運行一個thrift服務器,用于接收用戶提交的結構。由于是采用thrift實現,所以用戶可以用其他語言建立Topology,這樣就提供了比較方便的多語言操作支持。
對于用戶來說,通常需要做的就是提供自己的ISpout和IBlot實現,然后利用TopologyBuilder建立起自己需要的拓撲結構。
Storm框架會拿到用戶提供這個拓撲結構及Spout和Blot對象,驅動整個處理過程。簡單介紹下ISpout的那些接口的調用時機,在創建Spout對象時,會調用open函數。對象銷毀時調用close(),但是框架并不保證close函數一定會被調用,因為進程可能是通過kill -9被殺死的。activate和deactivate是在spout被activate或deactivate時被調用,這兩個動作是由用戶從外部觸發的,Strom的命令行提供兩個命令activate和deactivate,允許用戶activate和deactivate一個Topology,當用戶執行deactivate時,對應Topology的spout會被deactivate,產生影響就是spout的nextTuple此后將不會被調用,直到用戶再調用activate。Spout的核心功能是通過nextTuple實現的,用戶通過該函數完成Tuple的發射。該函數會被框架周期性的調用。會有類似如下的一個循環:
While(true)
{
if(…)
spout.activate();
if(…)
sput.deactivate();
if(…)
spout.nextTupe();
}
首先這三個函數都是在一個線程中,因此不需要同步。其次,nextTuple()不能阻塞,如果沒有Tuple可以發射需要立即返回,用戶不能提供一個阻塞式的實現,否則可能阻塞整個后臺循環。另外,后臺可能會調節nextTuple()的調用頻率,比如系統有一個配置參數可以控制當前被pending的Tuple最大數目,如果達到這個限制,可能就會做一些流控。
ack和fail則是兩個回調函數。Spout在發射出一個tuple后,該tuple會通過acking機制被acker追蹤,除了顯式的fail和ack外,每個tuple有一個超時時間,如果超過這個時間還未確定該tuple的狀態,那么acker會通知spout,這個tuple處理失敗了,然后框架得到這個消息后,就會調用spout的fail函數,如果acker發現這個tuple處理成功了,也會通知spout,然后會調用spout的ack函數。所以通常來說用戶在發射tuple時,要確保數據不丟失,都會將已經發射的tuple緩存起來,然后在ack函數中刪除對應tuple,在fail函數中重發對應的tuple。
另外需要注意的一點是,Spout使用的collector是SpoutOutputCollector,Bolt使用的collector是OutputCollector。這兩個雖然提供的功能類似,都是負責發送tuple的,但是由于一個是面向Spout,一個是面向Bolt的,它們的接口也略有不同。具體如下:
public interface ISpoutOutputCollector {
List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
void reportError(Throwable error);
}
Spout通過調用ISpoutOutputCollector的emit函數進行tuple的發射,當然實際上emit函數并未完成實際的發送,它主要是根據用戶提供的streamId,計算出該tuple需要發送到的目標taskID。emitDirect函數,更裸一些,直接指定目標taskID。它們都只是將<tasked,tuple>組成的序列對放到一個隊列中,然后會有另一個線程負責將tuple從隊列中取出并發送到目標task。
public interface IOutputCollector extends IErrorReporter {
List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
void ack(Tuple input);
void fail(Tuple input);
}
IOutputCollector是會被Bolt調用的,與ISpoutOutputCollector功能類似。但是區別也很明顯,首先我們可以看到它的emit系列函數,多了一個參數Collection<Tuple> anchors,增加這樣一個anchors原因在于,對于spout來說,它產生的tuple就是root tuple,但是對于bolt來說,它是通過一個或多個輸入tuple,進而產生輸出tuple的,這樣tuple之間是有一個父子關系的,anchors就是用于指定當前要emit的這個tuple的所有父親,正是通過它,才建立起tuple樹,如果用戶給了一個空的anchors,那么這個要emit的tuple將不會被加入tuple樹,也就不會被追蹤,即使后面它丟失了,也不會被spout感知。
除了anchors參數外,IOutputCollector還多了ack和fail兩個接口。這兩個接口,與Spout的ack和fail完全不同,對于Spout來說ack和fail是提供給Spout在tuple發送成功或失敗時進行處理的一個機會。而IOutputCollector的ack和fail則是向acker匯報當前tuple的處理狀態的,是需要Bolt在處理完tuple后主動調用的。
5. 生命周期過程
1.在提交了一個topology之后(在nimbus所在的機器), 創建spout/bolt實例(spout/bolt在storm中統稱為component)并進行序列化.
component的初始化操作應該在prepare/open方法中進行, 而不是在實例化component的時候進行.
http://www.blogjava.net/killme2008/archive/2011/11/17/364112.html
http://xumingming.sinaapp.com/127/推ter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/
7.一些常用的操作 有相應的方法支持 無需自己實現 each boardCast 聚合 過濾 等等 BaseFunction
http://blog.csdn.net/derekjiang/article/details/9126185
8.一些參考資源 :
https://github.com/nathanmarz/storm
https://github.com/tdunning/storm-counts
https://github.com/nathanmarz/storm/wiki/Trident-tutorial
https://github.com/nathanmarz/storm-starter
https://github.com/mvogiatzis/first-stories-推ter/wiki/Algorithm-logic
理解了上面
Spont 的基本操作的執行順序 作用 繼承相應的ISpont 或其實現
理解了 上面
Sbold 的基本操作執行順序 作用 繼承相應的ISbold 或其實現
理解了上面分組 拓撲的概念
特別是那個
OutputCollector emit values 與 declareOutputFields declare Fields 關系后 ,一切都明確了
9 此文章是摘錄網上各類文章總結
參考的主要是官方的文檔 網上也有好多中文漢化版本
https://github.com/nathanmarz/storm/wiki/Concepts
https://github.com/nathanmarz/storm/wiki/Trident-tutorial
https://github.com/nathanmarz/storm/wiki/Tutorial
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing
https://github.com/mbonaci/mbo-storm/wiki/Storm-setup-in-Eclipse-with-Maven,-Git-and-GitHub
https://github.com/nathanmarz/storm/wiki/Distributed-RPC
https://github.com/nathanmarz/storm/wiki/Serialization
https://github.com/nathanmarz/storm/wiki/Lifecycle-of-a-topology
https://github.com/nathanmarz/storm/wiki/Spout-implementations
10 。可用的maven源 pom配置
<dependency> <groupId>storm</groupId> <artifactId>storm</artifactId> <version>0.8.2</version> <!-- keep storm out of the jar-with-dependencies --> <scope>provided</scope> </dependency> <dependency> <groupId>org.clojure</groupId> <artifactId>clojure</artifactId> <version>1.5.1</version> </dependency> <dependency> <groupId>org.clojure</groupId> <artifactId>clojure-contrib</artifactId> <version>1.2.0</version> </dependency>個人收集的全部工廠 加上oschina的工廠 不在擔心找不到jar問題
<repositories> <repository> <id>github-releases</id> <url>http://oss.sonatype.org/content/repositories/github-releases/</url> </repository> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> <repository> <id>推ter4j</id> <url>http://推ter4j.org/maven2</url> </repository> <repository> <id>springsource-repo</id> <name>SpringSource Repository</name> <url>http://repo.springsource.org/release</url> </repository> <repository> <releases> <enabled>true</enabled> </releases> <id>bonecp-repo</id> <name>BoneCP Repository</name> <url>http://jolbox.com/bonecp/downloads/maven</url> </repository> <repository> <id>apache</id> <name>apache Repository</name> <url>http://repo1.maven.org/maven2/</url> </repository> <repository> <id>maven</id> <name>opensource-snapshot</name> <url>https://repository.apache.org/content/groups/public/</url> </repository> <repository> <id>spring-releases</id> <name>Spring Maven RELEASE Repository</name> <url>http://maven.springframework.org</url> </repository> <repository> <id>maven2</id> <name>opensource-releases</name> <url>https://repository.apache.org/content/repositories/releases</url> </repository> <repository> <id>jboos</id> <name>opensource-releases</name> <url>http://repository.jboss.com/maven2</url> </repository> <repository> <id>nexus</id> <name>opensource-releases</name> <url>http://repository.sonatype.org/content/groups/public</url> </repository> <repository> <id>oss</id> <name>oss-snapshots</name> <url>https://oss.sonatype.org/content/repositories/snapshots/</url> </repository> <repository> <id>ibiblio</id> <name>opensource-releases</name> <url>http://mirrors.ibiblio.org/pub/mirrors/maven2/org/acegisecurity</url> </repository> <repository> <id>msource</id> <url>http://source.mysema.com/maven2/releases/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> <repository> <id>java.net-Public</id> <name>Maven Java Net Snapshots and Releases</name> <url>https://maven.java.net/content/groups/public/</url> </repository> <repository> <id>java.net.m2</id> <name>java.net m2 repo</name> <url>http://download.java.net/maven/2</url> </repository> <repository> <id>alibaba-opensource</id> <name>alibaba-opensource</name> <url>http://code.alibabatech.com/mvn/releases/</url> </repository> <repository> <id>alibaba-opensource-snapshot</id> <name>alibaba-opensource-snapshot</name> <url>http://code.alibabatech.com/mvn/snapshots/</url> </repository> <repository> <id>JBoss Repository</id> <url>https://repository.jboss.org/nexus/content/repositories/releases</url> <name>JBoss Repository</name> </repository> </repositories>
來自:http://my.oschina.net/yilian/blog/175451