[Storm中文文檔]Trident教程

Trident是一個基于Storm的用于實時計算的高級抽象原語。它支持高吞吐(每秒百萬級別),有狀態的流處理,并且還能夠提供低延時的分布式查詢功能。如果你熟悉一些比較高級的批處理工具,比如Pig和Cascading,那么對于Trident你應該有一種似曾相識的感覺。Trident具有連接,聚合,分組,自定義行為和過濾的功能。除此之外,Trident能夠基于內存或者數據庫做有狀態的,增量式的計算。Trident本身能夠保證每個Tuple嚴格只被執行一次,所以使用Trident很容易構建一個靠譜的Topology。

Illustrative example

下面通過一個例子介紹Trident。這個例子需要做兩件事:

  1. 從一個能產生句子的輸入流中實時計算各個單詞的數量;
  2. 實現查詢功能:輸入一個句子,句子中每個單詞用空格分隔,查詢這個句子中所有單詞出現的數量的總和。

出于演示目的,本例將從一個能夠產生無限英文句子的輸入流中讀取數據:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"),
               new Values("four score and seven years ago"),
               new Values("how many apples can you eat"));
spout.setCycle(true);

該Spout能夠循環產生無限的英文語句,下面的代碼是計算單詞出現次數的部分代碼:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);

我們一行一行地對照上面的代碼來作說明。首先創建一個TridentTopology,它提供了用于構建Trident實時計算程序的一些接口。TridentTopology有一個函數叫做 newStream ,它通過一個指定的Spout創建一個新的數據輸入流。在本例中,輸入流僅僅是一個比較簡單的FixedBatchSpout。輸入流也可以是消息隊列,比如Kestrel和Kafka。Trident在Zookeeper保存每一個從輸入流中讀取的Tuple的處理信息,在上面的代碼中,字符串”spout1”表示這些Tuple的處理信息在Zookeeper上的存儲路徑。

Trident是將輸入數據分成許多小塊做批量處理的。例如,本例中輸入的數據流有可能被分割成如下這樣的小塊:

通常來說,每一個batch可能包含幾千到上百萬的Tuple,這完全取決于輸入的數據量。

Trident提供了一整套比較完整的API來處理這些batch中的數據。這些API與Pig和Cascading的API非常類似:能夠分組,連接,聚合,執行自定義行為,還能進行過濾等等。當然,獨立處理每一個batch并沒有多大意義,所以,Trident提供了跨batch的數據聚合與存儲功能,比如存儲在Memory,Memcache,Cassandra或者其他存儲設備。此外,Trident還能提供一流的實時查詢功能。這些狀態能夠被Trident更新(就像上面的例子),也能作為一個獨立的狀態源存在(筆者注:這句話不太理解!)。

回到上面的例子,Spout發射了一個包含sentence字段的數據流。下一行代碼定義了一個函數Split用于處理輸入流中的每一個Tuple:獲取sentence字段并將其切割成很多單詞。每一個sentence類型的Tuple將會衍生出很多word類型的Tuple:比如本例中’the cow jumped over the moon’這個sentence類型的Tuple將會衍生出6個word類型的Tuple。下面是Split函數的定義:

public class Split extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       String sentence = tuple.getString(0);
       for(String word: sentence.split(" ")) {
           collector.emit(new Values(word));                
       }
   }
}

正如我們所見到的,Split函數的定義非常簡單。僅僅是獲取sentence,然后用空格切割成很多word,然后分別發射這些word。

剩余的代碼計算word出現的次數并將結果保存起來。首先數據流按照word字段分組,然后每一個分組都被自定義聚合器Count聚合。persistentAggregate函數知道怎么存儲和更新計算結果的狀態。在本例中,單詞出現的次數被保存在內存中,但是也可以替換成其他的存儲設備,比如Memcached,Cassandra等等。比如替換存儲設備為Memcached很簡單,只需要將包含persistentAggregate的這一行代碼替換成下面的代碼即可,這兒的serverLocations表示的是Memcached集群的機器的host/ip列表:

.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        
MemcachedState.transactional()

persistentAggregate存儲的值就是所有batch聚合之后的值。

Trident一個比較酷的事情就是它是完全容錯的,保證數據嚴格只被執行一次的。這使得我們很容易構建靠譜的實時計算系統。Trident存儲每個Tuple的處理狀態,以便在有錯誤發生時,可以恢復數據并且防止一個數據被重復處理。

persistentAggregate方法會將數據流轉換成TridentState對象。在本例中TridentState對象就是所有單詞的統計數據。我們將會運用這個TridentState對象來實現一個分布式的實時查詢系統。

Topology的另一個部分就是實現一個低延時的分布式查詢系統:輸入一個句子,句子中的單詞用空格分隔,查詢系統返回這些單詞的統計數據的和。除了在后臺是并行執行的,這種查詢和普通的RPC調用沒有啥區別。下面是一個查詢的例子:

DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"

正如你所見到的,它和普通的RPC調用完全沒有區別,除了它是在Storm集群間并行執行的。對于一般的小型的查詢,耗時大概在10ms,當然越重的查詢花費的時間會更長,盡管延時還受到所分配的資源多少的影響。

實現分布式查詢系統的代碼如下:

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

相同的TridentTopology被用來創建一個新的DRPC流,這個函數命名為words。當執行查詢操作的時候,這個名字會作為DRPCClient的第一個參數。

每一個DRPC請求會被當做一個只有一個Tuple的batch處理。Tuple包含一個叫做args的字段,該字段表示DRPC客戶端提供的參數,在本例中,該參數就是一個sentence。

首先,Split函數用來將輸入的sentence切割成很多個word。然后數據流會按照word分組,然后stateQuery函數將會在第一部分創建的TridentState(wordCounts)上執行查詢操作。StateQuery接收一個數據源(在本例中,就是已經計算好的單詞統計數據)和一個用于查詢的函數作為輸入。在本例中,我們使用MapGet函數來獲取單詞的統計值。由于單詞是按照和構建TridentState時一樣的方法(按照word分組)分組的,所以每一個單詞的查詢請求都會被路由到管理和更新該單詞的統計數據的分區中取執行。

下一步,沒有出現過的單詞(count值為0)將會被Storm內建的FilterNull過濾器過濾,然后Sum函數將會把所有單詞的統計數據做一次求和操作,保存到sum字段中,并返回給DRPC客戶端。

Trident致力于提高Topology結構的性能。在上面的示例中,Trident自動完成了兩件事:

  1. 讀取和寫入操作會自動使用batch的方式執行,如果有20次更新需要被同步到數據庫中,Trident會自動將這些操作匯總到一起,只做一次讀寫操作,而不是20次。因此Trident可以在方便你計算的同時提高極高的性能。
  2. Trident對聚合操作做了極大的優化。Trident并不是簡單地把一個Group中所有的Tuple都發送到同一個機器上進行聚合,如果條件允許,Trident在將數據通過網路發送之前已經做了部分聚合操作,Count聚合器在每一個分區分別計算統計值,然后通過網絡發送分區的計算結果,然后將所有分區的計算結果進行匯總得到總的結果。這與MapReduce的計算模型非常相似。

再看一個Trident的例子:

Reach

這個例子是一個用于計算一個給定的URL的Reach的純粹的DRPC Topology。什么是URL的Reach?Reach是指在推ter上看到過一個URL的不同的人的數量,要計算Reach值,首先你要獲取曾經發布過該URL的所有人,然后獲取所有這些人的所有粉絲,然后將這些粉絲做唯一化處理,得到的數字就是該URL的Reach值。如果使用單機計算URL的Reach值,這將會是一個非常繁重的任務,因為這將會產生成千上萬的數據庫請求,千萬級別的Tuple數量。使用Storm和Trident,你能夠將上面說到的這些步驟在集群機器間作并行計算。

該Topoloy將會從兩個地方讀取數據。一個用于讀取曾經發布過該URL的人,另一個用于讀取這些人的粉絲。定義如下:

TridentState urlToTweeters =
       topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers =
       topology.newStaticState(getTweeterToFollowersState());

topology.newDRPCStream("reach")
       .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
       .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
       .shuffle()
       .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
       .parallelismHint(200)
       .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
       .groupBy(new Fields("follower"))
       .aggregate(new One(), new Fields("one"))
       .parallelismHint(20)
       .aggregate(new Count(), new Fields("reach"));

上面的的topology使用newStaticState方法創建一個TridentState對象,表示一種外部存儲,使用這個TridentState對象,我們就能在該topology上面進行查詢了,和其他所有狀態源一樣,查詢請求會自動轉換成batch類型的請求,以提高性能。

這個拓撲的定義非常簡單,它就是一個簡單的批處理任務。首先,urlToTweeters用于查詢曾經發布過該URL的人,返回一個列表,然后使用ExpandList函數將該列表轉換成一個個的Tuple分別發射出去。

接下來,我們獲取每一個tweeter的followers。我們使用suffle函數將需要處理的tweeter分配到集群的每一個分區,然后集群的每一個分區會分別獲取他們收到的tweeter的follower,可以為該步驟設置很大的并行度,提高查詢性能。

接下來,tweeter的所有follower需要去重,這個可以分兩個步驟完成:首先按照follower分組,然后使用One聚合器對每一個分組進行聚合,One聚合器僅僅是簡單地為每一個分組發射一個Tuple(重復的Tuple被舍棄),保存在one字段中,然后使用Count聚合器統計Tuple的數量,存入reach字段中,這個值就是URL的Reach值。One聚合器的定義如下:

public class One implements CombinerAggregator<Integer> {
   public Integer init(TridentTuple tuple) {
       return 1;
   }

   public Integer combine(Integer val1, Integer val2) {
       return 1;
   }

   public Integer zero() {
       return 1;
   }        
}

這是一個”combiner aggregator”,為了提高性能,它會在將Tuple通過網絡傳輸之前做部分聚合操作。Sum集合器也是一個”combiner aggregator”,因此在最后計算總值是非常高效的。

再看看Trident更細節的東西.

Fields and tuples

Trident的數據模型就是一個TridentTuple,它是一個命名的值列表。在topology中,TridentTuple是在一系列的計算中增量產生的。這些操作一般以一組字段作為輸入,然后產生一組輸出字段,輸入一般是輸入Tuple的一組子字段。

參照如下的例子,假設你有一個輸入流,命名為stream,它包含三個字段: x, y, z, 為了運行一個過濾器,并且這個過濾器只接受輸入流中的y字段,我們可以這樣寫:

stream.each(new Fields("y"), new MyFilter())

假設MyFilter的定義如下:

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
       return tuple.getInteger(0) < 10;
   }
}

只保留y字段的值小于10的Tuple。TridentTuple傳給MyFilter過濾器的輸入Tuple包含一個字段y。需要注意的是,選擇一個Tuple的某些字段的這個操作是非常高效的。

接下來看看”function fields”是如何工作的。假設你定義了一個函數:

public class AddAndMultiply extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       int i1 = tuple.getInteger(0);
       int i2 = tuple.getInteger(1);
       collector.emit(new Values(i1 + i2, i1 * i2));
   }
}

該函數接收兩個數字同時發射兩個數字:輸入的兩個數字的和和輸入的兩個數字的積。假設你有一個輸入流,包含x,y,z三個字段,你可以像下面這樣使用上面的函數:

stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));

函數的輸入字段是追加到輸入的字段后面的,所以該函數執行后的輸出會包含5個字段:x,y,z, added, multiplied,added字段是AddAndMultiply發射的第一個字段,multiplied是第二個字段。

但是對于聚合操作,新產生的字段將會替換輸入的字段。所以,如果你有一個輸入流,包含val1,val2兩個字段,當你做如下的操作:

stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

之后,輸出的Tuple會只包含sum一個字段,表示該batch種所有val2字段的值的總和。

對于分組之后再聚合的操作,輸出字段將會包含分組的字段和聚合操作新產生的字段。比如:

stream.groupBy(new Fields("val1"))
     .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

上面的例子的輸出結果中將會包含val1和sum兩個字段。

State

實時計算系統一個比較關鍵的問題就是如何確保在有錯誤發生或者重試時還能保證計算的正確性。出問題是無法避免的,所以當某個節點宕機或者其他錯誤出現時,我們需要重試。現在的問題是:如何保證在重試過程中,一條數據值被處理一次?

這是一個比較困難的問題,我們通過一個例子進行說明。假設你現在正在統計輸入流中處理過的Tuple的數量,并且需要將統計結果保存到數據庫中。如果你僅僅在數據庫中存儲一個統計值,現在如果你想進行一次狀態更新,那么你將無法知道當前的這個Tuple是否之前已經被處理過,或者之前嘗試處理過,數據庫也更新成功,但是之后的某些步驟失敗了,亦或者其他步驟都處理成功了,但是更新數據庫失敗了。

要解決這個問題需要做如下兩件事情:

  1. 為每一個batch分配一個唯一的transaction id(txid),當batch數據重試時,該batch會具有和之前一樣的txid;
  2. 數據的更新操作在batch之間是強有序的。也就是說,如果batch 2更新完成之前,batch 3不允許更新。

具備以上兩個條件之后,你就能實現有且僅有一次更新的目的。除了保存統計值之外,你還需要將txid也保存在數據庫中。當更新數據時,你就可以將數據庫中保存的txid和當前處理的batch的txid做比較。如果兩者相等,你應該忽略該batch,因為batch之間的處理是強有序的,你能夠跟確定當前batch在之前已經被處理過了;如果兩者不一致,則表示當前batch之前沒有處理過,你需要在數據庫中更新數據。

當然,上面說到的這些操作都不需要你自己實現,他們都已經被Trident封裝在內部的實現里,并且會自動實現。如果你不想在數據庫中花費外的空間去存儲txid,你可以不做。但是在這種情況下,Trident只能保證一個Tuple至少被處理一次,無法保證只被處理一次。可以參考 這篇文檔 解更多關于Trident State的知識。

State允許你使用任何策略來保存狀態。所以它可以將狀態保存在外部的數據庫,也可以保存在內存中并備份到HDFS中(類似于Hbase的工作模式)。State并不需要永久保存狀態,例如,你可以實現一個內存版的State,僅僅保存最近的X個小時的數據,老數據直接丟棄。可以參照 這個項目 看看Memcached integration的實現。

Execution of Trident topologies

Trident的Topology會被編譯成效率最高的Storm Topology。只有在需要對數據進行repartition的時候(如groupby或者shuffle)才會把tuple通過network發送出去,比如你有一個Trident Topology如下:

它將被編譯成如下的Storm Topology:

英文文檔原文地址: http://storm.apache.org/documentation/Trident-tutorial.html

聲明

來自: http://qifuguang.me/2015/11/27/Storm中文文檔-Trident教程/

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!