含 Apache Spark 的 Lambda 架構
目標
市場上很多玩家已經建造了MapReduce工作流用來日常處理兆兆字節的歷史數據。但是誰愿意等待24小時來拿到更新后的分析報告?這篇文章會向你介紹 Lambda Architecture ,它被設計出來既可以利用批量處理方法,也可以使用流式處理方法。這樣我們就可以利用Apache Spark(核心, SQL, 流),Apache Parquet,推ter Stream等工具處理實時流式數據,實現對歷史數據的快速訪問。代碼簡潔干凈,而且附上直接明了的實例!
Apache Hadoop: 簡要歷史
Apache Hadoop的豐富歷史開始于大約2002年。Hadoop是Doug Cutting創立的, 他也是Apache Lucene這一被廣泛使用的文本檢索庫的創造者. Hadoop的起源與Apache Nutch有關, Apache Nutch是一個開源的web搜索引擎 , 本身也是Lucene項目的一部分. Apache Nutch在大約10年前成為一個獨立的項目 .
事實上,許多用戶實現了成功的基于HadoopM/R的通道,一直運行到現在.現實生活中我至少能舉出好幾個例子:
-
Oozie協調下的工作流 每日運行和處理多達8TB數據并生成分析報告
-
bash管理的工作流每日運行和處理多達8TB數據并生成分析報告
現在是2016年了!
商業現實已經改變,所以做出長遠的決定變得更有價值。除此以外,技術本身也在演化進步。Kafka, Storm, Trident, Samza, Spark, Flink, Parquet, Avro, Cloud providers等時髦的技術被工程師們和在商業上廣泛使用.
因此, 現代基于Hadoop的 M/R通道 (以及Kafka,現代的二進制形式如Avro和數據倉庫等。在本例中Amazon Redshift用作ad-hoc查詢) 可能看起來像這樣:
以上M/R通道看起來很不錯,但是它仍然是傳統上具有許多缺點的批處理。由于在新數據不斷進入系統時,批處理過程通常需要花費很多時間來完成,它們主要是提供給終端用戶的乏味的數據 罷了。
Lambda 架構
Nathan Marz 為通用,可擴展和容錯性強的數據處理架構想出了一個術語 Lambda架構 。這個數據架構結合了批處理和流處理方法的優點來處理大批量數據。
我強烈推薦閱讀 Nathan Marz 的書 ,這本書從源碼角度對Lambda架構進行了完美的詮釋。
層結構
從頂層來看,這是層的結構:
所有進入系統的數據被分配到了批處理層和高速層來處理。批處理層管理著主數據集(一個不可修改,只能新增的原始數據)和預計算批處理視圖。服務層索引批處理視圖,因此可以對它們進行低延時的臨時查詢。高速層只處理近期的數據。任何輸入的查詢結果都合并了批處理視圖和實時視圖的查詢結果。
焦點
許多工程師認為 Lambda架構 就包含這些層和定義數據流程,但是 Nathan Marz 在他的書中 把焦點放在了其他重要的地方,如:
-
分布式思想
-
避免增量架構
-
關注數據的不可變性
-
創建再計算算法
數據的相關性
正如前面所提到的,任何輸入的查詢結果都會從批處理視圖和實時視圖的查詢結果返回,因此這些視圖需要被合并。在這里,需要注意的一點是,一個實時視圖是上一個實時視圖和新的數據增量的函數,因此一個增量算法可以在這里使用。批處理視圖是所有數據的視圖,因此再計算算法可以在這里使用。
均衡取舍
我們生活中的一切問題都存在權衡,Lambda架構( Lambda Architecture )不例外。 通常,我們需要解決幾個主要的權衡:
-
完全重新計算vs.部分重新計算
-
某些情況下,可以考慮使用Bloom過濾器來避免完全重新計算
-
-
重算算法 vs. 增量算法
-
使用增量算法是個很大的誘惑,但參考指南,我們必須使用重算算法,即使它更難得到相同的結果
</ul> </li>
-
加法算法 vs. 近似算法
-
Lambda Architecture 能與加法算法很好地協同工作。 因此,在另一種情況下,我們需要考慮使用近似算法,例如,使用HyperLogLog處理 count-distinct 的問題等。
</ul> </li>
</ul>
-
批量層(Batch Layer):寫一次,批量讀取多次
-
服務層(Serving layer):隨機讀取,不支持隨機寫入,批量計算和批量寫入
-
速度層(Speed layer):隨機讀取,隨機寫入;增量計算
實現
有許多實現Lambda架構的方法,因為對于每個層的底層解決方案是非常獨立的。每個層需要底層實現的特定功能,有助于做出更好的選擇并避免過度決策:
例如,其中一個實現方案的構成(使用Kafka,Apache Hadoop,Voldemort,推ter Storm,Cassandra)可能如下圖所示:
Apache Spark
Apache Spark 可以被認為是用于 Lambda 架構各層的集成解決方案。其中,Spark Core 包含了高層次的API和優化的支持通用圖運算引擎,Spark SQL 用于SQL和結構化數據處理、Spark Streaming 可以解決高拓展、高吞吐、容錯的實時流處理。在批處理中使用Spark可能小題大做,而且不是所有方案和數據集都適用。但除此之外,Spark算是對 Lambda Architecture 的合理的實現。
示例應用
下面通過一些路徑創建一個示例應用,以展示 Lambda Architecture ,其主要目的是 提供 #morningatlohika tweets(一個由我 在 Lviv, Ukraine發起的 本地技術演講 ,)這個hash標簽的統計:包括之前到今天 這一刻 的所有時間。
批處理視圖(Batch View)
簡單地說,假定我們的主數據集包含自開始時間以來的所有更新。 此外,我們已經實現了一個批處理,可用于創建我們的業務目標所需的批處理視圖,因此我們有一個預計算的批處理視圖,其中包含所有與 #morningatlohika 相關的標簽 統計信息:
apache – 6 architecture – 12 aws – 3 java – 4 jeeconf – 7 lambda – 6 morningatlohika – 15 simpleworkflow – 14 spark – 5
編號很容易記住,因為,為方便查看,我使用對應標簽的英文單詞的字母數目作為編號。
實時視圖
假設應用程序啟動后,同時有人發如下tweet:
“Cool blog post by @tmatyashovsky about #lambda #architecture using #apache #spark at #morningatlohika”
此時,正確假設應用程序啟動后,同時有人發如下tweet:的實時視圖應該包含如下的hash標簽和統計數據(本例中都是1,因為每個hash標簽只用了一次):
apache – 1 architecture – 1 lambda – 1 morningatlohika – 1 spark – 1
查詢
當終端用戶查詢出現是,為了給全部hash標簽返回實時統計結果,我們只需要合并批處理視圖和實時視圖。所以,輸出如下所示編碼(hash標簽的正確統計數據都加了1):
apache – 7 architecture – 13 aws – 3 java – 4 jeeconf – 7 lambda – 7 morningatlohika – 16 simpleworkflow – 14 spark – 6
場景
示例中的場景可以簡化為如下步驟:
-
用Apache Spark創建批處理視圖(.parquet)
-
在Spark中緩存批處理視圖
-
將流處理應用連接到推ter
-
實時監視包含#morningatlohika 的tweets
-
構造增量實時視圖
-
查詢,即,即時合并批處理視圖和實時視圖
技術細節
此源代碼是基于Apache Spark 1.6.x(注:再引入結構流之前)。 Spark Streaming架構是純微型批處理架構:
所以當我處理一個流媒體應用程序時,我使用DStream來連接使用推terUtils的推ter:
JavaDStream<Status> 推terStatuses = 推terUtils.createStream(javaStreamingContext, create推terAuthorization(), new String[]{推terFilterText});
在每個微批次中(使用可配置的批處理間隔),我正在對新tweets中的hashtags統計信息進行計算,并使用updateStateByKey()狀態轉換函數來更新實時視圖的狀態。簡單地說,就是使用臨時表將實時視圖存儲在存儲器中。
查詢服務反映了批處理的合并過程和通過代碼表示的DataFrame實時視圖:
DataFrame realTimeView = streamingService.getRealTimeView(); DataFrame batchView = servingService.getBatchView(); DataFrame mergedView = realTimeView.unionAll(batchView) .groupBy(realTimeView.col(HASH_TAG.getValue())) .sum(COUNT.getValue()) .orderBy(HASH_TAG.getValue());
List<Row> merged = mergedView.collectAsList();
return merged.stream() .map(row -> new HashTagCount(row.getString(0), row.getLong(1))) .collect(Collectors.toList());</code></pre>
成果
在簡化的方案下,文章開頭提到的基于Hadoop 的M/R 管道可以通過Apache Spark進行如下優化:
本章結語
正如上文提到的 Lambda架構 有優點和缺點,所以結果就是有支持者和反對者。一些人會說批處理視圖和實時視圖有很多重復的邏輯,因為最終他們需要從查詢的角度創建出可以合并的視圖。因此,他們創建了 Kappa 架構——一個 Lambda 架構的簡化方案。 Kappa架構的系統去掉了批處理系統,取而代之的是數據從流處理系統中快速通過:
即便在此場景中,Spark也能發揮作用,比如,參與流處理系統:
來自:https://www.oschina.net/translate/lambda-architecture-with-apache-spark
-
-