Storm 的可靠性保證測試
Storm 是一個分布式的實時計算框架,可以很方便地對流式數據進行實時處理和分析,能運用在實時分析、在線數據挖掘、持續計算以及分布式 RPC 等場景下。Storm 的實時性可以使得數據從收集到處理展示在秒級別內完成,從而為業務方決策提供實時的數據支持。
在美團點評公司內部,實時計算主要應用場景包括實時日志解析、用戶行為分析、實時消息推送、消費趨勢展示、實時新客判斷、實時活躍用戶數統計等。這些數據提供給各事業群,并作為他們實時決策的有力依據,彌補了離線計算“T+1”的不足。
在實時計算中,用戶不僅僅關心時效性的問題,同時也關心消息處理的成功率。本文將通過實驗驗證 Storm 的消息可靠性保證機制,文章分為消息保證機制、測試目的、測試環境、測試場景以及總結等五節。
Storm 的消息保證機制
Storm 提供了三種不同層次的消息保證機制,分別是 At Most Once、At Least Once 以及 Exactly Once。消息保證機制依賴于消息是否被完全處理。
消息完全處理
每個從 Spout(Storm 中數據源節點)發出的 Tuple(Storm 中的最小消息單元)可能會生成成千上萬個新的 Tuple,形成一棵 Tuple 樹,當整棵 Tuple 樹的節點都被成功處理了,我們就說從 Spout 發出的 Tuple 被完全處理了。 我們可以通過下面的例子來更好地詮釋消息被完全處理這個概念:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KafkaSpout(spoutConfig), spoutNum);
builder.setBolt("split", new SplitSentence(), 10)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
.fieldsGrouping("split", new Fields("word"));
這個 Topology 從 Kafka(一個開源的分布式消息隊列)讀取信息發往下游,下游的 Bolt 將收到的句子分割成單獨的單詞,并進行計數。每一個從 Spout 發送出來的 Tuple 會衍生出多個新的 Tuple,從 Spout 發送出來的 Tuple 以及后續衍生出來的 Tuple 形成一棵 Tuple 樹,下圖是一棵 Tuple 樹示例:
上圖中所有的 Tuple 都被成功處理了,我們才認為 Spout 發出的 Tuple 被完全處理。如果在一個固定的時間內(這個時間可以配置,默認為 30 秒),有至少一個 Tuple 處理失敗或超時,則認為整棵 Tuple 樹處理失敗,即從 Spout 發出的 Tuple 處理失敗。
如何實現不同層次的消息保證機制
Tuple 的完全處理需要 Spout、Bolt 以及 Acker(Storm 中用來記錄某棵 Tuple 樹是否被完全處理的節點)協同完成,如上圖所示。從 Spout 發送 Tuple 到下游,并把相應信息通知給 Acker,整棵 Tuple 樹中某個 Tuple 被成功處理了都會通知 Acker,待整棵 Tuple 樹都被處理完成之后,Acker 將成功處理信息返回給 Spout;如果某個 Tuple 處理失敗,或者超時,Acker 將會給 Spout 發送一個處理失敗的消息,Spout 根據 Acker 的返回信息以及用戶對消息保證機制的選擇判斷是否需要進行消息重傳。
Storm 提供的三種不同消息保證機制中。利用 Spout、Bolt 以及 Acker 的組合我們可以實現 At Most Once 以及 At Least Once 語義,Storm 在 At Least Once 的基礎上進行了一次封裝(Trident),從而實現 Exactly Once 語義。
Storm 的消息保證機制中,如果需要實現 At Most Once 語義,只需要滿足下面任何一條即可:
-
關閉 ACK 機制,即 Acker 數目設置為 0
-
Spout 不實現可靠性傳輸
- Spout 發送消息是使用不帶 message ID 的 API
- 不實現 fail 函數
-
Bolt 不把處理成功或失敗的消息發送給 Acker
如果需要實現 At Least Once 語義,則需要同時保證如下幾條:
-
開啟 ACK 機制,即 Acker 數目大于 0
-
Spout 實現可靠性傳輸保證
- Spout 發送消息時附帶 message 的 ID
- 如果收到 Acker 的處理失敗反饋,需要進行消息重傳,即實現 fail 函數
-
Bolt 在處理成功或失敗后需要調用相應的方法通知 Acker
實現 Exactly Once 語義,則需要在 At Least Once 的基礎上進行狀態的存儲,用來防止重復發送的數據被重復處理,在 Storm 中使用 Trident API 實現。
下圖中,每種消息保證機制中左邊的字母表示上游發送的消息,右邊的字母表示下游接收到的消息。從圖中可以知道,At Most Once 中,消息可能會丟失(上游發送了兩個 A,下游只收到一個 A);At Least Once 中,消息不會丟失,可能重復(上游只發送了一個 B ,下游收到兩個 B);Exactly Once 中,消息不丟失、不重復,因此需要在 At Least Once 的基礎上保存相應的狀態,表示上游的哪些消息已經成功發送到下游,防止同一條消息發送多次給下游的情況。
測試目的
Storm 官方提供 At Most Once、At Least Once 以及 Exactly Once 三種不同層次的消息保證機制,我們希望通過相關測試,達到如下目的:
-
三種消息保證機制的表現,是否與官方的描述相符;
-
At Most Once 語義下,消息的丟失率和什么有關系、關系如何;
-
At Least Once 語義下,消息的重復率和什么有關系、關系如何。
測試環境
本文的測試環境如下: 每個 worker(worker 為一個 物理 JVM 進程,用于運行實際的 Storm 作業)分配 1 CPU 以及 1.6G 內存。Spout、Bolt、Acker 分別跑在單獨的 worker 上。并通過在程序中控制拋出異常以及人工 Kill Spout/Bolt/Acker 的方式來模擬實際情況中的異常情況。
三種消息保證機制的測試均由 Spout 從 Kafka 讀取測試數據,經由相應 Bolt 進行處理,然后發送到 Kafka,并將 Kafka 上的數據同步到 MySQL 方便最終結果的統計,如下圖所示:
測試數據為 Kafka 上順序保存的一系列純數字,數據量分別有十萬、五十萬、一百萬等,每個數字在每個測試樣例中出現且僅出現一次。
測試場景
對于三種不同的消息保證機制,我們分別設置了不同的測試場景,來進行充分的測試。其中為了保證 Spout/Bolt/Acker 發生異常的情況下不影響其他節點,在下面的測試中,所有的節點單獨運行在獨立的 Worker 上。
At Most Once
從背景中可以得知,如果希望實現 At Most Once 語義,將 Acker 的數目設置為 0 即可,本文的測試過程中通過把設置 Acker 為 0 來進行 At Most Once 的測試。
輸入數據
保存在 Kafka 上的一系列純數字,數據量從十萬到五百萬不等,每個測試樣例中,同一個數字在 Kafka 中出現且僅出現一次。
測試結果
異常次數 | 測試數據總量 | 結果集中不同 Tuple 的總量 | 丟失的 Tuple 數據量 | Tuple 的丟失百分比 | Tuple 的重復量 |
---|---|---|---|---|---|
0 | 500000 | 500000 | 0 | 0% | 0 |
0 | 1000000 | 1000000 | 0 | 0% | 0 |
0 | 2000000 | 2000000 | 0 | 0% | 0 |
0 | 3000000 | 3000000 | 0 | 0% | 0 |
異常次數 | 測試數據總量 | 結果集中不同 Tuple 的總量 | 丟失的 Tuple 數據量 | Tuple 的丟失百分比 | Tuple 的重復量 |
---|---|---|---|---|---|
1 | 3000000 | 2774940 | 225060 | 7.50% | 0 |
2 | 3000000 | 2307087 | 692913 | 23.09% | 0 |
3 | 3000000 | 2082823 | 917177 | 30.57% | 0 |
4 | 3000000 | 1420725 | 1579275 | 52.64% | 0 |
結論
不發生異常的情況下,消息能夠不丟不重;Bolt 發生異常的情況下,消息會丟失,不會重復,其中消息的 丟失數目 與 異常次數正相關 。與官方文檔描述相符,符合預期。
At Least Once
為了實現 At Least Once 語義,需要 Spout、Bolt、Acker 進行配合。我們使用 Kafka-Spout 并通過自己管理 offset 的方式來實現可靠的 Spout;Bolt 通過繼承 BaseBasicBolt,自動幫我們建立 Tuple 樹以及消息處理之后通知 Acker;將 Acker 的數目設置為 1,即打開 ACK 機制,這樣整個 Topology 即可提供 At Least Once 的語義。
測試數據
Kafka 上保存的十萬到五十萬不等的純數字,其中每個測試樣例中,每個數字在 Kafka 中出現且僅出現一次。
測試結果
Acker 發生異常的情況
異常的次數 | 測試數據總量 | 結果集中不重復的 Tuple 數 | 數據重復的次數(>1) | 出現重復的 Tuple 數 | 數據丟失數量 | 最大積壓量 |
---|---|---|---|---|---|---|
0 | 100000 | 100000 | - | - | 0 | 2000(默認值) |
0 | 200000 | 200000 | - | - | 0 | 2000 |
0 | 300000 | 300000 | - | - | 0 | 2000 |
0 | 400000 | 400000 | - | - | 0 | 2000 |
異常的次數 | 測試數據總量 | 結果集中不重復的 Tuple 數 | 數據重復的次數(>1) | 出現重復的 Tuple 數 | 數據丟失數量 | 最大積壓量 |
---|---|---|---|---|---|---|
1 | 100000 | 100000 | 2 | 2000 | 0 | 2000 |
2 | 100000 | 100000 | 2 | 4001 | 0 | 2000 |
3 | 100000 | 100000 | 2 | 6000 | 0 | 2000 |
4 | 100000 | 100000 | 2 | 8000 | 0 | 2000 |
Spout 發生異常的情況
異常的次數 | 測試數據總量 | 結果集中不重復的 Tuple 數 | 數據重復的次數(>1) | 出現重復的 Tuple 數 | 數據丟失數量 |
---|---|---|---|---|---|
0 | 100000 | 100000 | - | - | 0 |
0 | 200000 | 200000 | - | - | 0 |
0 | 300000 | 300000 | - | - | 0 |
0 | 400000 | 400000 | - | - | 0 |
異常的次數 | 測試數據總量 | 結果集中不重復的 Tuple 數 | 數據重復的次數(>1) | 出現重復的 Tuple 數 | 數據丟失數量 |
---|---|---|---|---|---|
1 | 100000 | 100000 | 2 | 2052 | 0 |
2 | 100000 | 100000 | 2 | 4414 | 0 |
4 | 100000 | 100000 | 2 | 9008 | 0 |
6 | 100000 | 100000 | 2 | 9690 | 0 |
3 | 1675 | 0 |
Bolt 發生異常的情況
調用 emit 函數之前發生異常
異常次數 | 結果集中不重復的 Tuple 數 | 數據重復的次數 (>1) | 出現重復的 Tuple 數 | 數據丟失量 |
---|---|---|---|---|
0 | 100000 | - | - | 0 |
0 | 200000 | - | - | 0 |
0 | 300000 | - | - | 0 |
0 | 400000 | - | - | 0 |
異常次數 | 結果集中不重復的 Tuple 數 | 數據重復的次數 (>1) | 出現重復的 Tuple 數 | 數據丟失量 |
---|---|---|---|---|
1 | 100000 | - | - | 0 |
2 | 100000 | - | - | 0 |
4 | 100000 | - | - | 0 |
8 | 100000 | - | - | 0 |
10 | 100000 | - | - | 0 |
調用 emit 函數之后發生異常
異常次數 | 結果集中不重復的 Tuple 數 | 數據重復的次數(>1) | 出現重復的 Tuple 數 | 數據丟失數量 |
---|---|---|---|---|
0 | 100000 | - | - | 0 |
0 | 200000 | - | - | 0 |
0 | 300000 | - | - | 0 |
0 | 400000 | - | - | 0 |
異常次數 | 結果集中不重復的 Tuple 數 | 數據重復的次數(>1) | 出現重復的 Tuple 數 | 數據丟失數量 |
---|---|---|---|---|
1 | 100000 | 2 | 2 | 0 |
2 | 100000 | 2 | 3 | 0 |
4 | 100000 | 2 | 5 | 0 |
8 | 100000 | 2 | 9 | 0 |
10 | 100000 | 2 | 11 | 0 |
結論
從上面的表格中可以得到,消息不會丟失,可能發生重復,重復的數目與異常的情況相關。
-
不發生任何異常的情況下,消息不會重復不會丟失。
-
Spout 發生異常的情況下,消息的重復數目約等于 spout.max.pending(Spout 的配置項,每次可以發送的最多消息條數) * NumberOfException(異常次數)。
-
Acker 發生異常的情況下,消息重復的數目等于 spout.max.pending * NumberOfException。
-
Bolt 發生異常的情況:
- emit 之前發生異常,消息不會重復。
- emit 之后發生異常,消息重復的次數等于異常的次數。
結論與官方文檔所述相符,每條消息至少發送一次,保證數據不會丟失,但可能重復,符合預期。
Exactly Once
對于 Exactly Once 的語義,利用 Storm 中的 Trident 來實現。
測試數據
Kafka 上保存的一萬到一百萬不等的數字,每個數字在每次測試樣例中出現且僅出現一次。
測試結果
Spout 發生異常情況
異常數 | 測試數據量 | 結果集中不重復的 Tuple 數 | 結果集中所有 Tuple 的總和 |
---|---|---|---|
1 | 10000 | 10000 | 50005000 |
2 | 10000 | 10000 | 50005000 |
3 | 10000 | 10000 | 50005000 |
Acker 發生異常的情況
異常數 | 測試數據量 | 結果集中不重復的 Tuple 數 | 結果集中所有 Tuple 的總和 |
---|---|---|---|
1 | 10000 | 10000 | 50005000 |
2 | 10000 | 10000 | 50005000 |
3 | 10000 | 10000 | 50005000 |
Bolt 發生異常的情況
異常數 | 測試數據量 | 結果集中不重復的 Tuple 數 | 結果集中所有 Tuple 的總和 |
---|---|---|---|
1 | 10000 | 10000 | 50005000 |
2 | 10000 | 10000 | 50005000 |
3 | 10000 | 10000 | 50005000 |
結論
在所有情況下,最終結果集中的消息不會丟失,不會重復,與官方文檔中的描述相符,符合預期。
總結
對 Storm 提供的三種不同消息保證機制,用戶可以根據自己的需求選擇不同的消息保證機制。
不同消息可靠性保證的使用場景
對于 Storm 提供的三種消息可靠性保證,優缺點以及使用場景如下所示:
可靠性保證層次 | 優點 | 缺點 | 使用場景 |
---|---|---|---|
At most once | 處理速度快 | 數據可能丟失 | 都處理速度要求高,且對數據丟失容忍度高的場景 |
At least once | 數據不會丟失 | 數據可能重復 | 不能容忍數據丟失,可以容忍數據重復的場景 |
Exactly once | 數據不會丟失,不會重復 | 處理速度慢 | 對數據不丟不重性質要求非常高,且處理速度要求沒那么高,比如支付金額 |
如何實現不同層次的消息可靠性保證
對于 At Least Once 的保證需要做如下幾步:
-
需要開啟 ACK 機制,即 Topology 中的 Acker 數量大于零;
-
Spout 是可靠的。即 Spout 發送消息的時候需要附帶 msgId,并且實現失敗消息重傳功能(fail 函數 ,可以參考下面的 Spout 代碼);
-
Bolt 在發送消息時,需要調用 emit(inputTuple, outputTuple)進行建立 anchor 樹(參考下面建立 anchor 樹的代碼),并且在成功處理之后調用 ack ,處理失敗時調用 fail 函數,通知 Acker。
不滿足以上三條中任意一條的都只提供 At Most Once 的消息可靠性保證,如果希望得到 Exactly Once 的消息可靠性保證,可以使用 Trident 進行實現。
不同層測的可靠性保證如何實現
如何實現可靠的 Spout
實現可靠的 Spout 需要在 nextTuple 函數中發送消息時,調用帶 msgID 的 emit 方法,然后實現失敗消息的重傳(fail 函數),參考如下示例:
/**
* 想實現可靠的 Spout,需要實現如下兩點
* 1. 在 nextTuple 函數中調用 emit 函數時需要帶一個 msgId,用來表示當前的消息(如果消息發送失敗會用 msgId 作為參數回調 fail 函數)
* 2. 自己實現 fail 函數,進行重發(注意,在 storm 中沒有 msgId 和消息的對應關系,需要自己進行維護)
*/
public void nextTuple() {
//設置 msgId 和 Value 一樣,方便 fail 之后重發
collector.emit(new Values(curNum + "", round + ""), curNum + ":" + round);
}
@Override
public void fail(Object msgId) {//消息發送失敗時的回調函數
String tmp = (String)msgId; //上面我們設置了 msgId 和消息相同,這里通過 msgId 解析出具體的消息
String[] args = tmp.split(":");
//消息進行重發
collector.emit(new Values(args[0], args[1]), msgId);
}
如何實現可靠的 Bolt
Storm 提供兩種不同類型的 Bolt,分別是 BaseRichBolt 和 BaseBasicBolt,都可以實現可靠性消息傳遞,不過 BaseRichBolt 需要自己做很多周邊的事情(建立 anchor 樹,以及手動 ACK/FAIL 通知 Acker),使用場景更廣泛,而 BaseBasicBolt 則由 Storm 幫忙實現了很多周邊的事情,實現起來方便簡單,但是使用場景單一。如何用這兩個 Bolt 實現(不)可靠的消息傳遞如下所示:
//BaseRichBolt 實現不可靠消息傳遞
public class SplitSentence extends BaseRichBolt {//不建立 anchor 樹的例子
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(new Values(word)); // 不建立 anchor 樹
}
_collector.ack(tuple); //手動 ack,如果不建立 anchor 樹,是否 ack 是沒有區別的,這句可以進行注釋
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
//BaseRichBolt 實現可靠的 Bolt
public class SplitSentence extends BaseRichBolt {//建立 anchor 樹以及手動 ack 的例子
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word)); // 建立 anchor 樹
}
_collector.ack(tuple); //手動 ack,如果想讓 Spout 重發該 Tuple,則調用 _collector.fail(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
下面的示例會可以建立 Multi-anchoring
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
//BaseBasicBolt 是吸納可靠的消息傳遞
public class SplitSentence extends BaseBasicBolt {//自動建立 anchor,自動 ack
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
Trident
在 Trident 中,Spout 和 State 分別有三種狀態,如下圖所示:
其中表格中的 Yes 表示相應的 Spout 和 State 組合可以實現 Exactly Once 語義,No 表示相應的 Spout 和 State 組合不保證 Exactly Once 語義。下面的代碼是一個 Trident 示例:
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf); //Opaque Spout
//TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf); //Transaction Spout
TridentTopology topology = new TridentTopology();
String spoutTxid = Utils.kafkaSpoutGroupIdBuilder(topologyConfig.kafkaSrcTopic, topologyConfig.topologyName);
Stream stream = topology.newStream(spoutTxid, spout)
.name("new stream")
.parallelismHint(1);
// kafka config
KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig(); //KafkaProducerConfig 僅對 kafka 相關配置進行了封裝,具體可以參考 TridentKafkaStateFactory2(Map<String, String> config)
Map<String, String> kafkaConfigs = kafkaProducerConfig.loadFromConfig(topologyConfig);
TridentToKafkaMapper tridentToKafkaMapper = new TridentToKafkaMapper(); //TridentToKafkaMapper 繼承自 TridentTupleToKafkaMapper<String, String>,實現 getMessageFromTuple 接口,該接口中返回 tridentTuple.getString(0);
String dstTopic = "test__topic_for_all";
TridentKafkaStateFactory2 stateFactory = new TridentKafkaStateFactory2(kafkaConfigs);
stateFactory.withTridentTupleToKafkaMapper(tridentToKafkaMapper);
stateFactory.withKafkaTopicSelector(new DefaultTopicSelector(dstTopic));
stream.each(new Fields("bytes"), new AddMarkFunction(), new Fields("word")) //從spout 出來數據是一個 bytes 類型的數據,第二個是參數是自己的處理函數,第三個參數是處理函數的輸出字段
.name("write2kafka")
.partitionPersist(stateFactory //將數據寫入到 Kafka 中,可以保證寫入到 Kafka 的數據是 exactly once 的
, new Fields("word")
, new TridentKafkaUpdater())
.parallelismHint(1);
來自:http://tech.meituan.com/test-of-storms-reliability.html