深入淺出Spark(二) 什么是RDD
本系列講座是沁原對Sameer Farooqui的《Advanced Apache Spark》的解說。
原始視頻參考 https://www. 油Tube.com/watch? v=7ooZ4S7Ay6Y
(二)什么是RDD?
1. RDD的官方定義
RDD是Spark中的數據抽象,意思是彈性分布式數據集。在邏輯上是一個數據集,在物理上則可以分塊分布在不同的機器上并發運行。RDD的數據具有不可變性(immutable)
圖.1. 一個邏輯RDD在物理上分塊存儲在不同的服務器。
如圖所示,一個RDD數據集被分成了五塊,運行在了三個worker服務器上。第一臺上運行了兩個RDD數據塊,第二臺上運行兩個RDD數據庫塊,第三臺上運行剩下的一個數據塊。
2. RDD的生命周期
在Spark程序中,首先要讀取或創建RDD, 然后對數據進行一系列的變換操作(Transform),保存中間結果(Cache),最后對變換結果進行處理(Action)
2.1 RDD的產生可以通過對內存中的數據并行化,或直接讀取分布式數據庫(S3, HDFS, Cassandra 等等)而來。
圖.2. 通過parallelize接口,將內存數據變成RDD。(圖中sc指的是spark context實例。)
圖.3. 直接讀取文件生成RDD.
2.2. RDD支持數據變換接口,如常用的filter, map等等,在變換的過程中,RDD的數據并不立即發生實際變化(Lazily transform),而是保存了數據的依賴關系,直到要求RDD進行動作(Action)時。RDD會從全局的角度來優化Transform的運行過程。從而節省時間。
2.3 RDD的cache操作將數據的中間結果保存在內存中,方便下次使用。
2.4 RDD的Action操作將數據的運算結果進行統計和返回。常見的如count 和 collect.
圖.4. RDD操作實例
舉個例子。如圖4所示。從日志(Log)數據庫中讀取的文件生成logLinesRDD, 形成了四個物理分塊。通過filter變換提取出日志中的錯誤信息, 形成errorsRDD。 通過合并coalesce形成兩個塊。進一步過濾提取只包含錯誤1的日志errorMsg1RDD。最后進行collect 動作, 將結果合并返回到Driver。中間結果,我們使用了count動作來返回一共有多少條錯誤日志 。用saveToCassandra將錯誤日志保存到Cassandra數據庫中。圖中綠色的箭頭表示Action。紅色箭頭表示Transformation。
3. 根據數據源,RDD可以分成許多類,比如從Jdbc得來的RDD是JdbcRDD.
圖.5. RDD分類
每一類的RDD都定義如下幾個重要的的特征。
-
如何分塊。(Partition)
-
與父RDD的依賴關系(Dependency)
-
從父RDD求子RDD的函數(function)
-
希望當前RDD存儲的位置(preferred location)
-
負責存儲RDD的分塊類(Partitioner)
特征2,3是保存了數據的產生方式, 當數據丟失時可以進行數據恢復。4,5是本地化存儲策略。通過盡可能的本地存儲來提高運算速度。
例一: HadoopRDD
通過讀HDFS生成的RDD。它的分塊策略是每個HDFS塊生成一個分塊。該RDD沒有父節點。我們希望這個RDD的數據塊存在HDFS數據塊相同的位置。不用進一步分塊。
例二. FilteredRDD
FilteredRDD產生在Filter操作后。分塊與父RDD相同。與父RDD一一對應, 存儲位置與父塊相同。
例三:JoinedRDD
該RDD產生在shuffle操作之后。每個reduce操作有一個分區。依賴于被shuffle的父RDD。進一步分區是通過HashPartitioner實現的。
4. 總結
本節講解了
-
什么是RDD
-
RDD的生命周期。 創建(Create),懶變換(Lazily Transform),緩存(Cache),和動作(Action)。
- RDD的分類和特征。
本文作者Lion, 更多精彩內容,歡迎訪問官網 http:// BitTiger.io 或關注 “論碼農的自我修養” 微信公眾號:bit_tiger
來自:https://zhuanlan.zhihu.com/p/22062770