Spark學習-RDD編程基礎

風云決 7年前發布 | 13K 次閱讀 Spark 分布式/云計算/大數據

1. RDD基礎概念

Spark上開發的應用程序都是由一個driver programe構成,這個所謂的驅動程序在Spark集群通過跑main函數來執行各種并行操作。集群上的所有節點進行并行計算需要共同訪問一個分區元素的集合,這就是 RDD(RDD resilient distributed dataset)彈性分布式數據集 。RDD可以存儲在內存或磁盤中,具有一定的容錯性,可以在節點宕機重啟后恢復。 在Spark 中, 對數據的所有操作不外乎創建RDD、轉化已有RDD 以及調RDD 操作進行求值。而在這一切背后,Spark 會自動將RDD 中的數據分發到集群上,并將操作并行化執行。

2. 創建RDD

創建RDD有兩種方式:一種是通過并行化驅動程序中的已有集合創建,另外一種方法是讀取外部數據集。

2.1 并行化

一種非常簡單的創建RDD的方式,將程序中的一個集合傳給 SparkContext 的 parallelize() 方法。 python中的操作(pyspark打開shell):

>>> data = [1, 2, 3, 4, 5]
>>> distData = sc.parallelize(data)

對RDD進行測試操作

對集合中的所有元素進行相加,返回結果為15

>>> distData.reduce(lambda a, b: a + b) 15</code></pre>

scala中的操作(spark-shell打開shell):

scala> val data = Array(1,2,3,4,5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data) distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> distData.reduce((a,b)=>a+b) res0: Int = 15</code></pre>

2.2 讀取外部數據集

Spark可以從任何Hadoop支持的存儲上創建RDD,比如本地的文件系統,HDFS,Cassandra等。Spark可以支持文本文件,SequenceFiles等。

這種方法更為常用。

python:

# 從protocols文件創建RDD
distFile = sc.textFile("/etc/protocols")

scala:

// 從protocols文件創建RDD
val distFile = sc.textFile("/etc/protocols")

RDD操作

RDD支持兩種操作:

  1. 轉換(transformations):將已存在的數據集轉換成新的數據集,例如map。 轉換是惰性的,不會立刻計算結果,僅僅記錄轉換操作應用的目標數據集,當動作需要一個結果時才計算。
  2. 動作(actions) :數據集計算后返回一個值給驅動程序,例如reduce

轉換操作是懶惰的,舉個例子:

>>> lines = sc.textFile("README.md")
>>> pythonLines = lines.filter(lambda line: "Python" in line)
>>>> pythonLines.first()

如果Spark 在我們運行lines = sc.textFile(…) 時就把文件中所有的行都讀取并存儲起來,就會消耗很多存儲空間,而我們馬上就要篩選掉其中的很多數據。相反, 一旦Spark 了解了完整的轉化操作鏈之后,它就可以只計算求結果時真正需要的數據。 事實上,在行動操作first() 中,Spark 只需要掃描文件直到找到第一個匹配的行為止,而不需要讀取整個文件。

下面引用《Spark快速大數據分析》的一段話:

我們不應該把RDD 看作存 放著特定數據的數據集,而最好把每個RDD 當作我們通過轉化操作構建出來的、記錄如 何計算數據的指令列表。把數據讀取到RDD 的操作也同樣是惰性的。因此,當我們調用 sc.textFile() 時,數據并沒有讀取進來,而是在必要時才會讀取。和轉化操作一樣的是, 讀取數據的操作也有可能會多次執行。

簡單的例子理解轉換和行動操作:

python:

# 從protocols文件創建RDD
distFile = sc.textFile("/etc/protocols")

Map操作,每行的長度,轉換操作

lineLengths = distFile.map(lambda s: len(s))

Reduce操作,獲得所有行長度的和,即文件總長度,這里才會執行map運算

totalLength = lineLengths.reduce(lambda a, b: a + b)

可以將轉換后的RDD保存到集群內存中

lineLengths.persist()</code></pre>

scala:

// 從protocols文件創建RDD
val distFile = sc.textFile("/etc/protocols")

// Map操作,每行的長度 val lineLengths = distFile.map(s => s.length)

// Reduce操作,獲得所有行長度的和,即文件總長度,這里才會執行map運算 val totalLength = lineLengths.reduce((a, b) => a + b)

// 可以將轉換后的RDD保存到集群內存中 lineLengths.persist()</code></pre>

常見的轉換和行動操作

如下圖所示:

4. 向Spark傳遞函數

4.1 python

  1. 匿名函數:lambda的大量使用,讓簡單的函數寫成表達式的樣子。
  2. 模塊里的頂層函數
  3. Spark中已調用函數中定義的本地函數
# 定義需要傳遞給Spark的函數
def myFunc(s):
    words = s.split(" ")
    return len(words)

將詞數統計函數傳遞給Spark

sc.textFile("/etc/protocols").map(myFunc).reduce(lambda a,b:a+b)</code></pre>

 

上述中定義的函數為計算一行字符串中單詞的個數, sc.textFile("/etc/protocols").map(myFunc).reduce(lambda a,b:a+b)

表明先讀取內容轉出RDD,然后轉換map()是對每一行都進行統計操作,然后進行行動操作,用lambda匿名函數計算所有行的單詞和。

4.2 scala

  1. 匿名函數:常用于短小的代碼片段。
  2. 全局單例對象中的靜態函數:例如,你可以定義一個object MyFunctions,然后將MyFunctions.func1傳入,就像下面這樣:
//創建一個單例對象MyFunctions
object MyFunctions {
  def func1(s: String): Int = {s.split(" ").length}
}
val lineLengths = sc.textFile("/etc/protocols").map(MyFunctions.func1)
val count = lineLengths.reduce((a,b)=>a+b)

與python幾乎一樣,注意其匿名函數的寫法。

參考資料:

1. 《spark快速大數據分析》

2. https://www.shiyanlou.com/courses/456/labs/1462/document

 

來自:http://blog.csdn.net/taoyanqi8932/article/details/60972046

 

 本文由用戶 風云決 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!