spark快速大數據分析之讀書筆記
來自: http://my.oschina.net/sucre/blog/617340
RDD編程
1、Spark中的RDD就是一個不可變的分布式對象集合。每個RDD都被分為多個分區,這些分區運行在集群中的不同節點上。
2、用戶可以使用兩種方法創建RDD:讀取一個外部數據集,以及在驅動器程序中對一個集合進行并行化(比如list和set)。
創建RDD最簡單的方式就是把程序中一個已有的集合傳給SparkContext的parallelize()方法。
val lines = sc.textFile("README.md") val linesP = sc.parallelize(List("pandas","i like pandas"))
創建出來后,RDD支持兩種類型的操作:轉化操作(transformation)和行動操作(action)。轉化操作會由一個RDD生成一個新的RDD。行動操作會對RDD計算出一個結果,并把結果返回到驅動器程序中,或把結果存儲到外部存儲系統(如HDFS)中
3、轉化操作和行動操作的區別在于spark計算RDD的方式不同。如果對于一個特定的函數是屬于轉化操作還是行動操作感到困惑,可以看看它的返回值類型:轉化操作返回的是RDD,而行動操作返回的是其他的數據類型。
雖然你可以在任何時候定義新的RDD,但spark只會惰性計算這些RDD。它們只有第一次在一個行動操作中用到時,才會真正計算。默認情況下,Spark的RDD會在你每次對它們進行行動操作時重新計算。如果想在多個行動操作中重用同一個RDD,可以使用RDD.persist(),這里也可以使用RDD.cache(),讓spark把這個RDD緩存下來。
來看兩個例子:
轉化操作
val inputRDD = sc.textFile("log.txt") val errorRDD = inputRDD.filter(line=>line.contains("error"))
行動操作
errorRDD.count() for(String line:errorRDD.take(10)){ System.out.println(line) }
這里take()獲取了RDD中的少量元素。除此這外,RDD中還有一個獲取元素的方法,collect()函數,但是只有當整個數據集能在單臺機器的內存中放得下時,才能使用collect(),因此,collect()不能用在大規模數據集上。
4、之所以把RDD稱為彈性的,是因為在任何時候都能進行重算。當保存RDD數據的一臺機器失敗時,Spark還可以使用這種特性來重算出丟掉的分區。
5、總結一下,每個spark程序或shell對話都按如下方式工作:
(1)從外部數據創建出輸入RDD。
(2)使用諸如filter()這樣的轉化操作對RDD進行轉化,以定義新的RDD。
(3)告訴spark對需要被重用的中間結果RDD執行persist()操作。
(4)使用行動操作(例如count()和first()等)來觸發一次并行計算,spark會對計算進行優化后再執行。