Spark學習-RDD編程基礎
1. RDD基礎概念
Spark上開發的應用程序都是由一個driver programe構成,這個所謂的驅動程序在Spark集群通過跑main函數來執行各種并行操作。集群上的所有節點進行并行計算需要共同訪問一個分區元素的集合,這就是 RDD(RDD resilient distributed dataset)彈性分布式數據集 。RDD可以存儲在內存或磁盤中,具有一定的容錯性,可以在節點宕機重啟后恢復。 在Spark 中, 對數據的所有操作不外乎創建RDD、轉化已有RDD 以及調RDD 操作進行求值。而在這一切背后,Spark 會自動將RDD 中的數據分發到集群上,并將操作并行化執行。
2. 創建RDD
創建RDD有兩種方式:一種是通過并行化驅動程序中的已有集合創建,另外一種方法是讀取外部數據集。
2.1 并行化
一種非常簡單的創建RDD的方式,將程序中的一個集合傳給 SparkContext 的 parallelize() 方法。 python中的操作(pyspark打開shell):
>>> data = [1, 2, 3, 4, 5]
>>> distData = sc.parallelize(data)
對RDD進行測試操作
對集合中的所有元素進行相加,返回結果為15
>>> distData.reduce(lambda a, b: a + b)
15</code></pre>
scala中的操作(spark-shell打開shell):
scala> val data = Array(1,2,3,4,5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23
scala> distData.reduce((a,b)=>a+b)
res0: Int = 15</code></pre>
2.2 讀取外部數據集
Spark可以從任何Hadoop支持的存儲上創建RDD,比如本地的文件系統,HDFS,Cassandra等。Spark可以支持文本文件,SequenceFiles等。
這種方法更為常用。
python:
# 從protocols文件創建RDD
distFile = sc.textFile("/etc/protocols")
scala:
// 從protocols文件創建RDD
val distFile = sc.textFile("/etc/protocols")
RDD操作
RDD支持兩種操作:
- 轉換(transformations):將已存在的數據集轉換成新的數據集,例如map。 轉換是惰性的,不會立刻計算結果,僅僅記錄轉換操作應用的目標數據集,當動作需要一個結果時才計算。
- 動作(actions) :數據集計算后返回一個值給驅動程序,例如reduce
轉換操作是懶惰的,舉個例子:
>>> lines = sc.textFile("README.md")
>>> pythonLines = lines.filter(lambda line: "Python" in line)
>>>> pythonLines.first()
如果Spark 在我們運行lines = sc.textFile(…) 時就把文件中所有的行都讀取并存儲起來,就會消耗很多存儲空間,而我們馬上就要篩選掉其中的很多數據。相反, 一旦Spark 了解了完整的轉化操作鏈之后,它就可以只計算求結果時真正需要的數據。 事實上,在行動操作first() 中,Spark 只需要掃描文件直到找到第一個匹配的行為止,而不需要讀取整個文件。
下面引用《Spark快速大數據分析》的一段話:
我們不應該把RDD 看作存 放著特定數據的數據集,而最好把每個RDD 當作我們通過轉化操作構建出來的、記錄如 何計算數據的指令列表。把數據讀取到RDD 的操作也同樣是惰性的。因此,當我們調用 sc.textFile() 時,數據并沒有讀取進來,而是在必要時才會讀取。和轉化操作一樣的是, 讀取數據的操作也有可能會多次執行。
簡單的例子理解轉換和行動操作:
python:
# 從protocols文件創建RDD
distFile = sc.textFile("/etc/protocols")
Map操作,每行的長度,轉換操作
lineLengths = distFile.map(lambda s: len(s))
Reduce操作,獲得所有行長度的和,即文件總長度,這里才會執行map運算
totalLength = lineLengths.reduce(lambda a, b: a + b)
可以將轉換后的RDD保存到集群內存中
lineLengths.persist()</code></pre>
scala:
// 從protocols文件創建RDD
val distFile = sc.textFile("/etc/protocols")
// Map操作,每行的長度
val lineLengths = distFile.map(s => s.length)
// Reduce操作,獲得所有行長度的和,即文件總長度,這里才會執行map運算
val totalLength = lineLengths.reduce((a, b) => a + b)
// 可以將轉換后的RDD保存到集群內存中
lineLengths.persist()</code></pre>
常見的轉換和行動操作
如下圖所示:

4. 向Spark傳遞函數
4.1 python
- 匿名函數:lambda的大量使用,讓簡單的函數寫成表達式的樣子。
- 模塊里的頂層函數
- Spark中已調用函數中定義的本地函數
# 定義需要傳遞給Spark的函數
def myFunc(s):
words = s.split(" ")
return len(words)
將詞數統計函數傳遞給Spark
sc.textFile("/etc/protocols").map(myFunc).reduce(lambda a,b:a+b)</code></pre>
上述中定義的函數為計算一行字符串中單詞的個數, sc.textFile("/etc/protocols").map(myFunc).reduce(lambda a,b:a+b)
表明先讀取內容轉出RDD,然后轉換map()是對每一行都進行統計操作,然后進行行動操作,用lambda匿名函數計算所有行的單詞和。
4.2 scala
- 匿名函數:常用于短小的代碼片段。
- 全局單例對象中的靜態函數:例如,你可以定義一個object MyFunctions,然后將MyFunctions.func1傳入,就像下面這樣:
//創建一個單例對象MyFunctions
object MyFunctions {
def func1(s: String): Int = {s.split(" ").length}
}
val lineLengths = sc.textFile("/etc/protocols").map(MyFunctions.func1)
val count = lineLengths.reduce((a,b)=>a+b)
與python幾乎一樣,注意其匿名函數的寫法。
參考資料:
1. 《spark快速大數據分析》
2. https://www.shiyanlou.com/courses/456/labs/1462/document
來自:http://blog.csdn.net/taoyanqi8932/article/details/60972046