使用 推ter Storm 處理實時的大數據
流式處理大數據簡介
IBM DW/M. Tim Jones, 獨立作家, 顧問
簡介: Storm 是一個開源的、大數據處理系統,與其他系統不同,它旨在用于分布式實時處理且與語言無關。了解 推ter Storm、它的架構,以及批處理和流式處理解決方案的發展形勢。
Hadoop(大數據分析領域無可爭辯的王者)專注于批處理。這種模型對許多情形(比如為網頁建立索引)已經足夠,但還存在其他一些使用模型,它們 需要來自高度動態的來源的實時信息。為了解決這個問題,就得借助 Nathan Marz 推出的 Storm(現在在 推ter 中稱為 BackType)。Storm 不處理靜態數據,但它處理預計會連續的流數據。考慮到 推ter 用戶每天生成 1.4 億條推文 (tweet),那么就很容易看到此技術的巨大用途。
但 Storm 不只是一個傳統的大數據分析系統:它是復雜事件處理 (CEP) 系統的一個示例。CEP 系統通常分類為計算和面向檢測,其中每個系統都可通過用戶定義的算法在 Storm 中實現。舉例而言,CEP 可用于識別事件洪流中有意義的事件,然后實時地處理這些事件。
Nathan Marz 提供了在 推ter 中使用 Storm 的大量示例。一個最有趣的示例是生成趨勢信息。推ter 從海量的推文中提取所浮現的趨勢,并在本地和國家級別維護它們。這意味著當一個案例開始浮現時,推ter 的趨勢主題算法就會實時識別該主題。這種實時算法在 Storm 中實現為 推ter 數據的一種連續分析。
什么是 “大數據”?大數據 指的是海量無法通過傳統方式管理的數據。互聯網范圍的數據正在推動能夠處理這類新數據的新架構和應用程序的創建。這些架構高度可擴展,且能夠跨無限多的服務器并行、高效地處理數據。
大數據實現
Hadoop 的核心是使用 Java? 語言編寫的,但支持使用各種語言編寫的數據分析應用程序。最新的應用程序的實現采用了更加深奧的路線,以充分利用現代語言和它們的特性。例如,位于伯克利 的加利福尼亞大學 (UC) 的 Spark 是使用 Scala 語言實現的,而 推ter Storm 是使用 Clojure(發音同 closure)語言實現的。
Clojure 是 Lisp 語言的一種現代方言。類似于 Lisp,Clojure 支持一種功能性編程風格,但 Clojure 還引入了一些特性來簡化多線程編程(一種對創建 Storm 很有用的特性)。Clojure 是一種基于虛擬機 (VM) 的語言,在 Java 虛擬機上運行。但是,盡管 Storm 是使用 Clojure 語言開發的,您仍然可以在 Storm 中使用幾乎任何語言編寫應用程序。所需的只是一個連接到 Storm 的架構的適配器。已存在針對 Scala、JRuby、Perl 和 PHP 的適配器,但是還有支持流式傳輸到 Storm 拓撲結構中的結構化查詢語言適配器。
Storm 的關鍵屬性
Storm 實現的一些特征決定了它的性能和可靠性的。Storm 使用 ZeroMQ 傳送消息,這就消除了中間的排隊過程,使得消息能夠直接在任務自身之間流動。在消息的背后,是一種用于序列化和反序列化 Storm 的原語類型的自動化且高效的機制。
Storm 的一個最有趣的地方是它注重容錯和管理。Storm 實現了有保障的消息處理,所以每個元組都會通過該拓撲結構進行全面處理;如果發現一個元組還未處理,它會自動從噴嘴處重放。Storm 還實現了任務級的故障檢測,在一個任務發生故障時,消息會自動重新分配以快速重新開始處理。Storm 包含比 Hadoop 更智能的處理管理,流程會由監管員來進行管理,以確保資源得到充分使用。
Storm 模型
Storm 實現了一種數據流模型,其中數據持續地流經一個轉換實體網絡(參見 圖 1)。一個數據流的抽象稱為一個流,這是一個無限的元組序列。元組就像一種使用一些附加的序列化代碼來表示標準數據類型(比如整數、浮點和字節數組)或用戶定義類型的結構。每個流由一個惟一 ID 定義,這個 ID 可用于構建數據源和接收器 (sink) 的拓撲結構。流起源于噴嘴,噴嘴將數據從外部來源流入 Storm 拓撲結構中。
圖 1. 一個普通的 Storm 拓撲結構的概念性架構
接收器(或提供轉換的實體)稱為螺栓。螺栓實現了一個流上的單一轉換和一個 Storm 拓撲結構中的所有處理。螺栓既可實現 MapReduce 之類的傳統功能,也可實現更復雜的操作(單步功能),比如過濾、聚合或與數據庫等外部實體通信。典型的 Storm 拓撲結構會實現多個轉換,因此需要多個具有獨立元組流的螺栓。噴嘴和螺栓都實現為 Linux? 系統中的一個或多個任務。
可使用 Storm 為詞頻輕松地實現 MapReduce 功能。如 圖 2 中所示,噴嘴生成文本數據流,螺栓實現 Map 功能(令牌化一個流的各個單詞)。來自 “map” 螺栓的流然后流入一個實現 Reduce 功能的螺栓中(以將單詞聚合到總數中)。
圖 2. MapReduce 功能的簡單 Storm 拓撲結構
請注意,螺栓可將數據傳輸到多個螺栓,也可接受來自多個來源的數據。Storm 擁有流分組 的概念,流分組實現了混排 (shuffling)(隨機但均等地將元組分發到螺栓)或字段分組(根據流的字段進行流分區)。還存在其他流分組,包括生成者使用自己的內部邏輯路由元組的能力。
但是,Storm 架構中一個最有趣的特性是有保障的消息處理。Storm 可保證一個噴嘴發射出的每個元組都會處理;如果它在超時時間內沒有處理,Storm 會從該噴嘴重放該元組。此功能需要一些聰明的技巧來在拓撲結構中跟蹤元素,也是 Storm 的重要的附加價值之一。
除了支持可靠的消息傳送外,Storm 還使用 ZeroMQ 最大化消息傳送性能(刪除中間排隊,實現消息在任務間的直接傳送)。ZeroMQ 合并了擁塞檢測并調整了它的通信,以優化可用的帶寬。
Storm 示例演示
現在讓我們通過實現一個簡單的 MapReduce 拓撲結構的代碼(參見 清單 1),看一下 Storm 示例。這個示例使用了來自 Nathan 的 Storm 入門工具包(可從 GitHub 獲取)的巧妙設計的字數示例。此示例演示了 圖 2 中所示的拓撲結構,它實現了一個包含一個螺栓的 map 轉換和包含一個螺栓的 reduce 轉換。
清單 1. 為圖 2 中的 Storm 構建一個拓撲結構TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("map", new SplitSentence(), 4).shuffleGrouping("spout"); builder.setBolt("reduce", new WordCount(), 8).fieldsGrouping("map", new Fields("word"));Config conf = new Config(); conf.setDebug(true);
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();</pre>
清單 1(添加了行號以供引用)首先使用TopologyBuilder聲明一個新拓撲結構。接下來在第 3 行,定義了一個噴嘴(名為spout),該噴嘴包含一個RandomSentenceSpout。RandomSentenceSpout類(也就是nextTuple方法)發出 5 個隨機句子的其中一個作為它的數據。setSpout方法末尾的5參數是一個并行性提示(或要為此活動創建的任務數)。
在第 5 和 6 行。我定義了第一個螺栓(或算法轉換實體),在本例中為 map(或 split)螺栓。這個螺栓使用SplitSentence令牌化輸入流并將其作為輸出的各個單詞發出。請注意,第 6 行使用了shuffleGrouping,它定義了對此螺栓(在本例中為 “spout”)的輸入訂閱,還將流分組定義為混排。這種混排分組意味著來自噴嘴的輸入將混排 或隨機分發給此螺栓中的任務(該螺栓已提示具有 4 任務并行性)。
在第 8 和 9 行,我定義了最后一個螺栓,這個螺栓實際上用于 reduce 元素,使用該元素的輸入作為 map 螺栓。WordCount方法實現了必要的字數統計行為(將相似的單詞分組到一起,以維護總數),但不是混排的,所以它的輸出是一致的。如果有多個任務在實現 reduce 行為,那么您最終會得到分段的計數,而不是總數。
第 11 和 12 行創建和定義了一個配置對象并啟用了 Debug 模式。Config類包含大量配置可能性。
第 14 和 15 行創建了本地集群(在本例中,用于定義本地模式的用途)。我定義了我的本地集群、配置對象和拓撲結構的名稱(可通過builder類的createTopology元素獲取)。
最后,在第 17 行,Storm 休眠一段時間,然后在第 19 行關閉集群。請記住,Storm 是一個持續運行的操作系統,所以任務可存在相當長時間,不斷處理它們訂閱的流上的新元組。
您可在 Storm 入門工具包中了解這個非常簡單的實現的更多信息,包括噴嘴和螺栓的細節。
使用 Storm
Nathan Marz 編寫了一組簡單易懂的文檔,詳細介紹了如何安裝 Storm 來執行集群模式和本地模式的操作。本地模式無需一個龐大的節點集群,即可使用 Storm。如果需要在一個集群中使用 Storm 但缺乏節點,也可在 Amazon Elastic Compute Cloud (EC2) 中實現一個 Storm 集群。
其他開源的大數據解決方案
自 Google 在 2004 年推出 MapReduce 范式以來,已誕生了多個使用原始 MapReduce 范式(或擁有該范式的質量)的解決方案。Google 對 MapReduce 的最初應用是建立萬維網的索引。盡管此應用程序仍然很流行,但這個簡單模型解決的問題也正在增多。
表 1 提供了一個可用開源大數據解決方案的列表,包括傳統的批處理和流式處理應用程序。在將 Storm 引入開源之前將近一年的時間里,Yahoo! 的 S4 分布式流計算平臺已向 Apache 開源。S4 于 2010 年 10 月發布,它提供了一個高性能計算 (HPC) 平臺,向應用程序開發人員隱藏了并行處理的復雜性。S4 實現了一個可擴展的、分散化的集群架構,并納入了部分容錯功能。
![]()
更多信息
盡管 Hadoop 仍然是宣傳最多的大數據分析解決方案,但仍可能存在許多其他的解決方案,每種解決方案都具有不同的特征。我在過去的文章中探討了 Spark,它納入了數據集的內存中處理功能(能夠重新構建丟失的數據)。但 Hadoop 和 Spark 都專注于大數據集的批處理。Storm 提供了一個新的大數據分析模型,而且因為它最近被開源,所以也引起廣泛的關注。
與 Hadoop 不同,Storm 是一個計算系統,它沒有包括任何存儲概念。這就使得 Storm 能夠用在各種各樣的上下文中,無論數據是從一個非傳統來源動態傳入,還是存儲在數據庫等存儲系統中(或者由一個控制器用于對其他一些設備(比如一個交易系 統)進行實時操作)都是如此。
請參見 參考資料 獲取有關 Storm 的更多信息的鏈接,了解如何讓一個集群正常運行,以及其他大數據分析解決方案(包括批處理和流式處理)。
參考資料
- 復雜事件處理 是 Storm 以及其他許多解決方案(比如 Yahoo! 的 S4)實現的模式。Storm 與 S4 之間的一個重要區別在于,Storm 在面對故障時提供了有保障的消息處理,而 S4 可能丟失消息。
- Nathan Marz(Storm 背后的重要開發人員)為他的新產品編寫了多篇有趣且實用的介紹文章。對 Storm 的最早介紹來自 2011 年 5 月的 Storm 預覽:能夠實時處理的 Hadoop - BackType Technology,隨后是 8 月推出的 A Storm is coming: more details and plans for release。
- Storm 維基 提供了有關 Storm、它的理論基礎的大量優秀文檔,以及有關獲取 Storm 和設置新項目的各種教程。您還將找到一些有關 Storm 的許多方面的實用文檔,包括 Storm 在本地模式、集群模式和在 Amazon 上的使用。
- Spark,一種快速數據分析替代方案(M. Tim Jones,developerWorks,2011 年 11 月)介紹了 UC Berkeley 的內存中彈性數據分析平臺。
- 應用程序虛擬化的過去與未來(M. Tim Jones,developerWorks,2011 年 5 月)詳細介紹了虛擬化在語言抽象方面的使用。Storm 使用基于虛擬機的語言 Clojure 來實現,還使用 Java 技術和許多其他語言來構建它的內部(螺栓)應用程序。
- GitHub 上提供了 Storm 的一個 thorough class tree exists,詳細介紹了 Storm 的類和接口。
- Hadoop 已開始解決簡單批處理以外的模型。例如,通過調度,Hadoop 可調整其處理數據的方式,以便更多地關注交互性,而不是批量數據處理。在 Hadoop 中的調度(M. Tim Jones,developerWorks,2011 年 12 月)中了解有關 Hadoop 調度的更多信息。
</ul>