【原】Learning Spark (Python版) 學習筆記(一)
《Learning Spark》 這本書算是Spark入門的必讀書了,中文版是 《Spark快速大數據分析》 ,不過豆瓣書評很有意思的是,英文原版評分7.4,評論都說入門而已深入不足,中文譯版評分8.4,評論一片好評,有點意思。我倒覺得這本書可以作為官方文檔的一個補充,刷完后基本上對Spark的一些基本概念、碼簡單的程序是沒有問題的了。這本書有一個好處是它是用三門語言寫的,Python/Java/Scala,所以適用性很廣,我的觀點是,先精通一門語言,再去學其他語言。由于我工作中比較常用的是Python,所以就用把Python相關的命令總結一下。下一階段再深入學習Java和Scala。這一篇總結第一張-第三章的重點內容。
說到Spark,就不得不提到RDD,RDD,字面意思是彈性分布式數據集,其實就是分布式的元素集合。Python的基本內置的數據類型有整型、字符串、元祖、列表、字典,布爾類型等,而Spark的數據類型只有RDD這一種,在Spark里,對數據的所有操作,基本上就是圍繞RDD來的,譬如創建、轉換、求值等等。理解RDD后可以避免以后走很多彎路。關于 RDD的特點 ,可以搜到很多資料,其實我們只需要理解兩點就可以了:
1. 不可變
2. 分布式
有人會覺得很奇怪,如果RDD不可變,那么在進行數據操作的時候,怎么改變它的值,怎么進行計算呢?其實 RDD支持兩種操作 :
1. Tansformation (轉化操作):返回值還是一個RDD
2. Action (行動操作):返回值不是一個RDD
第一種Transformation是返回一個新的RDD,如map(),filter()等。這種操作是lazy(惰性)的,即從一個RDD轉換生成另一個RDD的操作不是馬上執行,只是記錄下來,只有等到有Action操作是才會真正啟動計算,將生成的新RDD寫到內存或hdfs里,不會對原有的RDD的值進行改變。而Action操作才會實際觸發Spark計算,對RDD計算出一個結果,并把結果返回到內存或hdfs中,如count(),first()等。
通俗點理解的話,就是假設你寫了一堆程序,里面對數據進行了多次轉換,這個時候實際上沒有計算,就只是放著這里。在最后出結果的時候會用到Action操作,這個時候Action會執行與之相關的轉換操作,運算速度會非常快(一是Action不一定需要調用所有的transformation操作,二是只有在最后一步才會計算相關的transformation操作)。如果Transformation沒有lazy性質的話,每轉換一次就要計算一次,最后Action操作的時候還要計算一次,會非常耗內存,也會極大降低計算速度。
還有一種情況,如果我們想多次使用同一個RDD,每次都對RDD進行Action操作的話,會極大的消耗Spark的內存,這種情況下,我們可以使用RDD.persist()把這個RDD緩存下來,在內存不足時,可以存儲到磁盤(disk)里。在Python中,儲存的對象永遠是通過Pickle庫序列化過的,所以社不設置序列化級別不會產生影響。
RDD的性質和操作方式講完了,現在來說說怎么 創建RDD ,有兩種方式
1. 讀取一個外部數據集
2. 在內存中對一個集合進行并行化 (parallelize)
第二種方式相對來說更簡單,你可以直接在shell里快速創建RDD,舉個例子:
A = [1,2,3,4,5] lines = sc.parallelize(A) #另一種方式 lines = sc.parallelize([1,2,3,4,5])
但是這種方式并不是很好,因為你需要把你的整個數據集放在內存里,如果數據量比較大,會很占內存。所以,可以在測試的時候用這種方式,簡單快速。
讀取外部數據及時需要用到SparkContext.textFile()
1 lines = sc.textFile( " README.md " )
RDD的操作命令很多,包括map(),filter()等Transformation操作以及reduce(),fold(),aggregate()等Action操作,這里限于時間問題就先不一一寫了,等有時間再補上。
最后來講講如何 向 Spark傳遞函數 :
兩種方式:
1 .簡單的函數:lambda表達式 。
適合比較短的函數,不支持多語句函數和無返回值的語句。
2 .def函數
會將整個對象傳遞過去,但是最好不要傳遞一個帶字段引用的函數。如果你傳遞的對象是某個對象的成員,或者在某個函數中引用了一個整個字段,會報錯。舉個例子:
class MyClass(object): def __init__(self): self.field = “Hello” def doStuff(self, rdd): #報錯:因為在self.field中引用了整個self return rdd.map(lambda s: self.field + x)
解決方法:直接把你需要的字段拿出來放到一個局部變量里,然后傳遞這個局部變量就可以了。
class MyClass(object): def __init__(self): self.field = “Hello” def doStuff(self, rdd): #將需要的字段提取到局部變量中即可 field = self.field return rdd.map(lambda s: field + x)
前面三章講了Spark的基本概念和RDD的特性以及一些簡單的命令,比較簡單。后面三章主要講了鍵值對操作、數據的讀取和保存以及累加器、廣播變量等,下周再更新,順便把這次沒寫完的RDD常見操作命令補完。