Spring hadoop之發散之Storm云計算學習摘錄總結

jopen 11年前發布 | 36K 次閱讀 Storm 分布式/云計算/大數據

1.Storm概念:

  

是一個分布式的、容錯的實時計算系統,它被托管在GitHub上,遵循 Eclipse Public License 1.0。Storm是由BackType開發的實時處理系統,BackType現在已在推ter麾下。GitHub上的最新版本是Storm 0.5.2,基本是用Clojure寫的。

Spring hadoop之發散之Storm云計算學習摘錄總結

Storm為分布式實時計算提供了一組通用原語,可被用于“流處理”之中,實時處理消息并更新數據庫。這是管理隊列及工作者集群的另一種方式。 Storm也可被用于“連續計算”(continuous computation),對數據流做連續查詢,在計算時就將結果以流的形式輸出給用戶。它還可被用于“分布式RPC”,以并行的方式運行昂貴的運算。


2.底層實現原理 

https://github.com/mvogiatzis/first-stories-推ter/wiki/Algorithm-logic


2.1 Storm架構

Storm集群由一個主節點和多個工作節點組成。主節點運行了一個名為“Nimbus”的守護進程,用于分配代碼、布置任務及故障檢測。每個工作節點都運行了一個名為“Supervisor”的守護進程,用于監聽工作,開始并終止工作進程。Nimbus和Supervisor都能快速失敗,而且是無狀態的,這樣一來它們就變得十分健壯,兩者的協調工作是由Zookeeper來完成的。ZooKeeper用于管理集群中的不同組件,ZeroMQ是內部消息系統,JZMQ是ZeroMQMQ的Java Binding。有個名為storm-deploy的子項目,可以在AWS上一鍵部署Storm集群.


3.概念介紹:


3.1 Storm術語解釋

Storm的術語包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被處理的數據。Sprout是數據源。Bolt處理數據。Task是運行于Spout或Bolt中的線程。Worker是運行這些線程的進程。Stream Grouping規定了Bolt接收什么東西作為輸入數據。數據可以隨機分配(術語為Shuffle),或者根據字段值分配(術語為Fields),或者廣播(術語為All),或者總是發給一個Task(術語為Global),也可以不關心該數據(術語為None),或者由自定義邏輯來決定(術語為Direct)。Topology是由Stream Grouping連接起來的Spout和Bolt節點網絡.下面進行詳細介紹:
  • Topologies 用于封裝一個實時計算應用程序的邏輯,類似于HadoopMapReduce Job

  • Stream 消息流,是一個沒有邊界的tuple序列,這些tuples會被以一種分布式的方式并行地創建和處理
  • Spouts 消息源,是消息生產者,他會從一個外部源讀取數據并向topology里面面發出消息:tuple
  • Bolts 消息處理者,所有的消息處理邏輯被封裝在bolts里面,處理輸入的數據流并產生輸出的新數據流,可執行過濾,聚合,查詢數據庫等操作
  • Task 每一個SpoutBolt會被當作很多task在整個集群里面執行,每一個task對應到一個線程.
  • Stream groupings 消息分發策略,定義一個Topology的其中一步是定義每個tuple接受什么樣的流作為輸入,stream grouping就是用來定義一個stream應該如果分配給Bolts.
3.2 stream grouping分類
1. Shuffle Grouping: 隨機分組, 隨機派發stream里面的tuple,保證每個bolt接收到的tuple數目相同.
2. Fields Grouping:按字段分組,比如按userid來分組,具有同樣userid的tuple會被分到相同的Bolts,而不同的userid則會被分配到不同的Bolts.
3. All Grouping:廣播發送, 對于每一個tuple,所有的Bolts都會收到.
4. Global Grouping: 全局分組,這個tuple被分配到storm中的一個bolt的其中一個task.再具體一點就是分配給id值最低的那個task.
5. Non Grouping: 不分組,意思是說stream不關心到底誰會收到它的tuple.目前他和Shuffle grouping是一樣的效果,有點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程去執行.
6. Direct Grouping: 直接分組,這是一種比較特別的分組方法,用這種分組意味著消息的發送者舉鼎由消息接收者的哪個task處理這個消息.只有被聲明為Direct Stream的消息流可以聲明這種分組方法.而且這種消息tuple必須使用emitDirect方法來發射.消息處理者可以通過TopologyContext來或者處理它的消息的taskid (OutputCollector.emit方法也會返回taskid)

詳細介紹:


3.3 Storm組件 詳細介紹

Storm集群主要由一個主節點和一群工作節點(worker node)組成,通過 Zookeeper進行協調。

3.3.1 主節點:

主節點通常運行一個后臺程序 —— Nimbus,用于響應分布在集群中的節點,分配任務和監測故障。這個很類似于Hadoop中的Job Tracker。

3.3.2工作節點:

工作節點同樣會運行一個后臺程序 —— Supervisor,用于收聽工作指派并基于要求運行工作進程。每個工作節點都是topology中一個子集的實現。而Nimbus和Supervisor之間的協調則通過Zookeeper系統或者集群。

3.3.3 Zookeeper

Zookeeper是完成Supervisor和Nimbus之間協調的服務。而應用程序實現實時的邏輯則被封裝進Storm中的“topology”。topology則是一組由Spouts(數據源)和Bolts(數據操作)通過Stream Groupings進行連接的圖。下面對出現的術語進行更深刻的解析。

3.3.4 Spout:

簡而言之,Spout從來源處讀取數據并放入topology。Spout分成可靠和不可靠兩種;當Storm接收失敗時,可靠的Spout會對tuple(元組,數據項組成的列表)進行重發;而不可靠的Spout不會考慮接收成功與否只發射一次。而Spout中最主要的方法就是nextTuple(),該方法會發射一個新的tuple到topology,如果沒有新tuple發射則會簡單的返回。

3.3.5 Bolt:

Topology中所有的處理都由Bolt完成。Bolt可以完成任何事,比如:連接的過濾、聚合、訪問文件/數據庫、等等。Bolt從Spout中接收數據并進行處理,如果遇到復雜流的處理也可能將tuple發送給另一個Bolt進行處理。而Bolt中最重要的方法是execute(),以新的tuple作為參數接收。不管是Spout還是Bolt,如果將tuple發射成多個流,這些流都可以通過declareStream()來聲明。

3.3.6Stream Groupings:

Stream Grouping定義了一個流在Bolt任務間該如何被切分。這里有Storm提供的6個Stream Grouping類型:

1. 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每個任務獲得相等數量的tuple。

2. 字段分組(Fields grouping):根據指定字段分割數據流,并分組。例如,根據“user-id”字段,相同“user-id”的元組總是分發到同一個任務,不同“user-id”的元組可能分發到不同的任務。

3. 全部分組(All grouping):tuple被復制到bolt的所有任務。這種類型需要謹慎使用。

4. 全局分組(Global grouping):全部流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。

5. 無分組(None grouping):你不需要關心流是如何分組。目前,無分組等效于隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執行(如果可能)。

6. 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪個元組處理者任務接收。

當然還可以實現CustomStreamGroupimg接口來定制自己需要的分組

Spring hadoop之發散之Storm云計算學習摘錄總結

main方法    
1  TopologyBuilder builder = new TopologyBuilder();
02          
03  builder.setSpout("spout", new RandomSentenceSpout(), 5);
04          
05  builder.setBolt("map", new SplitSentence(), 4)
06           .shuffleGrouping("spout");
07  
08  builder.setBolt("reduce", new WordCount(), 8)
09           .fieldsGrouping("map", new Fields("word"));
10  
11  Config conf = new Config();
12  conf.setDebug(true);
13  
14  LocalCluster cluster = new LocalCluster();
15  cluster.submitTopology("word-count", conf, builder.createTopology());
16  
17  Thread.sleep(10000);
18  
19  cluster.shutdown();

如果建立自己的Topology(Transactional),用戶通常需要利用如下接口和對象:

IRichBolt

IRichSpout

TopologyBuilder

public interface ISpout extends Serializable {

void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

void close();

void activate();

void deactivate();

void nextTuple();

void ack(Object msgId);

void fail(Object msgId);

}

public interface IBolt extends Serializable {

       void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

       void execute(Tuple input);

       void cleanup();

}

IRichBoltIRichSpoutIBoltISpout的不同在于多了兩個接口:

declareOutputFields(OutputFieldsDeclarer declarer):聲明輸出字段

getComponentConfiguration() :該接口是在0.7.0引入的,用于支持組件級的配置,即允許用戶針對單個SpoutBolt進行參數配置。

 實現了這兩個接口后,通過調用TopologyBuilder建立起TopologyTopologyBuilder實際上是封裝了StormTopologythrift接口,也就是說Topology實際上是通過thrift定義的一個structTopologyBuilder將這個對象建立起來,然后nimbus實際上會運行一個thrift服務器,用于接收用戶提交的結構。由于是采用thrift實現,所以用戶可以用其他語言建立Topology,這樣就提供了比較方便的多語言操作支持。

 對于用戶來說,通常需要做的就是提供自己的ISpoutIBlot實現,然后利用TopologyBuilder建立起自己需要的拓撲結構。

 Storm框架會拿到用戶提供這個拓撲結構及SpoutBlot對象,驅動整個處理過程。簡單介紹下ISpout的那些接口的調用時機,在創建Spout對象時,會調用open函數。對象銷毀時調用close(),但是框架并不保證close函數一定會被調用,因為進程可能是通過kill -9被殺死的。activatedeactivate是在spoutactivatedeactivate時被調用,這兩個動作是由用戶從外部觸發的,Strom的命令行提供兩個命令activatedeactivate,允許用戶activatedeactivate一個Topology,當用戶執行deactivate時,對應Topologyspout會被deactivate,產生影響就是spoutnextTuple此后將不會被調用,直到用戶再調用activateSpout的核心功能是通過nextTuple實現的,用戶通過該函數完成Tuple的發射。該函數會被框架周期性的調用。會有類似如下的一個循環:

While(true)

{

    if(…)

spout.activate();

if(…)

sput.deactivate();

if(…)

    spout.nextTupe();

}

首先這三個函數都是在一個線程中,因此不需要同步。其次,nextTuple()不能阻塞,如果沒有Tuple可以發射需要立即返回,用戶不能提供一個阻塞式的實現,否則可能阻塞整個后臺循環。另外,后臺可能會調節nextTuple()的調用頻率,比如系統有一個配置參數可以控制當前被pendingTuple最大數目,如果達到這個限制,可能就會做一些流控。

 ackfail則是兩個回調函數。Spout在發射出一個tuple后,該tuple會通過acking機制被acker追蹤,除了顯式的failack外,每個tuple有一個超時時間,如果超過這個時間還未確定該tuple的狀態,那么acker會通知spout,這個tuple處理失敗了,然后框架得到這個消息后,就會調用spoutfail函數,如果acker發現這個tuple處理成功了,也會通知spout,然后會調用spoutack函數。所以通常來說用戶在發射tuple時,要確保數據不丟失,都會將已經發射的tuple緩存起來,然后在ack函數中刪除對應tuple,在fail函數中重發對應的tuple

 另外需要注意的一點是,Spout使用的collectorSpoutOutputCollectorBolt使用的collectorOutputCollector。這兩個雖然提供的功能類似,都是負責發送tuple的,但是由于一個是面向Spout,一個是面向Bolt的,它們的接口也略有不同。具體如下:

public interface ISpoutOutputCollector {

       List<Integer> emit(String streamId, List<Object> tuple, Object messageId);

       void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);

void reportError(Throwable error);

}

Spout通過調用ISpoutOutputCollectoremit函數進行tuple的發射,當然實際上emit函數并未完成實際的發送,它主要是根據用戶提供的streamId,計算出該tuple需要發送到的目標taskIDemitDirect函數,更裸一些,直接指定目標taskID。它們都只是將<tasked,tuple>組成的序列對放到一個隊列中,然后會有另一個線程負責將tuple從隊列中取出并發送到目標task

 public interface IOutputCollector extends IErrorReporter {

List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

void ack(Tuple input);

void fail(Tuple input);

}

IOutputCollector是會被Bolt調用的,與ISpoutOutputCollector功能類似。但是區別也很明顯,首先我們可以看到它的emit系列函數,多了一個參數Collection<Tuple> anchors,增加這樣一個anchors原因在于,對于spout來說,它產生的tuple就是root tuple,但是對于bolt來說,它是通過一個或多個輸入tuple,進而產生輸出tuple的,這樣tuple之間是有一個父子關系的,anchors就是用于指定當前要emit的這個tuple的所有父親,正是通過它,才建立起tuple樹,如果用戶給了一個空的anchors,那么這個要emittuple將不會被加入tuple樹,也就不會被追蹤,即使后面它丟失了,也不會被spout感知。

 除了anchors參數外,IOutputCollector還多了ackfail兩個接口。這兩個接口,與Spoutackfail完全不同,對于Spout來說ackfail是提供給Spouttuple發送成功或失敗時進行處理的一個機會。而IOutputCollectorackfail則是向acker匯報當前tuple的處理狀態的,是需要Bolt在處理完tuple后主動調用的。

5. 生命周期過程


1.在提交了一個topology之后(在nimbus所在的機器), 創建spout/bolt實例(spout/bolt在storm中統稱為component)并進行序列化.


2.將序列化的component發送給所有的任務所在的機器
3.在每一個任務上反序列化component.
4.在開始執行任務之前, 先執行component的初始化方法(bolt是prepare, spout是open).

 component的初始化操作應該在prepare/open方法中進行, 而不是在實例化component的時候進行.

http://www.blogjava.net/killme2008/archive/2011/11/17/364112.html

6.保證處理完成


http://xumingming.sinaapp.com/127/推ter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/

7.一些常用的操作 有相應的方法支持 無需自己實現 each boardCast 聚合 過濾 等等  BaseFunction

http://blog.csdn.net/derekjiang/article/details/9126185

8.一些參考資源

https://github.com/nathanmarz/storm

https://github.com/tdunning/storm-counts

https://github.com/nathanmarz/storm/wiki/Trident-tutorial

https://github.com/nathanmarz/storm-starter

https://github.com/mvogiatzis/first-stories-推ter/wiki/Algorithm-logic

理解了上面

Spont 的基本操作的執行順序 作用  繼承相應的ISpont 或其實現

理解了 上面

Sbold 的基本操作執行順序 作用 繼承相應的ISbold 或其實現

理解了上面分組 拓撲的概念 

特別是那個

OutputCollector emit values 與 declareOutputFields declare Fields 關系后 ,一切都明確了


9 此文章是摘錄網上各類文章總結

參考的主要是官方的文檔 網上也有好多中文漢化版本

https://github.com/nathanmarz/storm/wiki/Concepts

https://github.com/nathanmarz/storm/wiki/Trident-tutorial


https://github.com/nathanmarz/storm/wiki/Tutorial

https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

https://github.com/mbonaci/mbo-storm/wiki/Storm-setup-in-Eclipse-with-Maven,-Git-and-GitHub

https://github.com/nathanmarz/storm/wiki/Distributed-RPC

https://github.com/nathanmarz/storm/wiki/Serialization

https://github.com/nathanmarz/storm/wiki/Lifecycle-of-a-topology

https://github.com/nathanmarz/storm/wiki/Spout-implementations

10 。可用的maven源 pom配置

<dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.2</version>
            <!-- keep storm out of the jar-with-dependencies -->
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.clojure</groupId>
            <artifactId>clojure</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.clojure</groupId>
            <artifactId>clojure-contrib</artifactId>
            <version>1.2.0</version>
        </dependency>
個人收集的全部工廠 加上oschina的工廠 不在擔心找不到jar問題

<repositories>
<repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
        <repository>
            <id>推ter4j</id>
            <url>http://推ter4j.org/maven2</url>
       </repository>

     <repository>
    <id>springsource-repo</id>
    <name>SpringSource Repository</name>
    <url>http://repo.springsource.org/release</url>
    </repository>
       <repository>  
        <releases>  
            <enabled>true</enabled>  
        </releases>  
        <id>bonecp-repo</id>  
        <name>BoneCP Repository</name>  
        <url>http://jolbox.com/bonecp/downloads/maven</url>  
    </repository>  
     <repository>
    <id>apache</id>
    <name>apache Repository</name>
    <url>http://repo1.maven.org/maven2/</url>
    </repository>


        <repository>
            <id>maven</id>
            <name>opensource-snapshot</name>
            <url>https://repository.apache.org/content/groups/public/</url>

        </repository>

        <repository>
    <id>spring-releases</id>
    <name>Spring Maven RELEASE Repository</name>
    <url>http://maven.springframework.org</url>
        </repository>
        <repository>
            <id>maven2</id>
            <name>opensource-releases</name>
            <url>https://repository.apache.org/content/repositories/releases</url>

        </repository>
            <repository>
            <id>jboos</id>
            <name>opensource-releases</name>
            <url>http://repository.jboss.com/maven2</url>

        </repository>
            <repository>
            <id>nexus</id>
            <name>opensource-releases</name>
            <url>http://repository.sonatype.org/content/groups/public</url>
        </repository>

            <repository>
            <id>oss</id>
            <name>oss-snapshots</name>
            <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
        </repository>

            <repository>
            <id>ibiblio</id>
            <name>opensource-releases</name>
            <url>http://mirrors.ibiblio.org/pub/mirrors/maven2/org/acegisecurity</url>
        </repository>


      <repository>
            <id>msource</id>
            <url>http://source.mysema.com/maven2/releases/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
         <repository>
            <id>java.net-Public</id>
            <name>Maven Java Net Snapshots and Releases</name>
            <url>https://maven.java.net/content/groups/public/</url>
        </repository>
    <repository>
    <id>java.net.m2</id>
    <name>java.net m2 repo</name>
    <url>http://download.java.net/maven/2</url>
  </repository>
        <repository>
            <id>alibaba-opensource</id>
            <name>alibaba-opensource</name>
            <url>http://code.alibabatech.com/mvn/releases/</url>

        </repository>
        <repository>
            <id>alibaba-opensource-snapshot</id>
            <name>alibaba-opensource-snapshot</name>
            <url>http://code.alibabatech.com/mvn/snapshots/</url>

        </repository>
        <repository>
  <id>JBoss Repository</id>
  <url>https://repository.jboss.org/nexus/content/repositories/releases</url>
  <name>JBoss Repository</name>
  </repository>
    </repositories>

來自:http://my.oschina.net/yilian/blog/175451

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!