spark 編程珠璣-RDD 篇

jopen 8年前發布 | 9K 次閱讀 分布式/云計算/大數據

 

RDD

有關RDD的基礎概念請閱讀spark官方文檔,或網上搜索其他內容。本文完全是實戰經驗的總結。

惰性求值

RDD的轉換操作都是惰性求值的。
惰性求值意味著我們對RDD調用轉化操做(例如map操作)并不會立即執行,相反spark會在內部記錄下所要求執行的操作的相關信息。
把數據讀取到RDD的操作同樣也是惰性的,因此我們調用sc.textFile()時數據沒有立即讀取進來,而是有必要時才會讀取。和轉化操作一樣讀取數據操作也有可能被多次執行。這在寫代碼時要特別注意。

關于惰性求值,對新手來說可能有與直覺相違背之處。有接觸過函數式語言類如haskell的應該不會陌生。
在最初接觸spark時,我們也會有這樣的疑問。
也參與過這樣的討論:

 val sc = new SparkContext("local[2]", "test")
 val f:Int ? Int = (x:Int) ? x + 1
 val g:Int ? Int = (x:Int) ? x + 1
 val rdd = sc.parallelize(Seq(1,2,3,4),1)
 //1
 val res1 = rdd.map(x ? g(f(x))).collect
 //2
 val res2 = rdd.map(g).map(f).collect

第1和第2兩種操作均能得到我們想要的結果,但那種操作更好呢?
直觀上我們會覺得第1種操作更好,因為第一種操作可以僅僅需要一次迭代就能得到我們想要的結果。第二種操作需要兩次迭代操作才能完成。
是我們想象的這樣嗎?讓我們對函數f和g的調用加上打印。按照上面的假設。1和2的輸出分別是這樣的:

1:  f   g   f   g   f   g   f   g       
2:  g   g   g   g   f   f   f   f

代碼:

val sc = new SparkContext("local[2]", "test")
val f:Int ? Int = (x:Int) ? {
    print("f\t")
    x + 1
    }
val g:Int ? Int = (x:Int) ? {
  print("g\t")
  x + 1
}
val rdd = sc.parallelize(Seq(1,2,3,4), 1
//1
val res1 = rdd.map(x ? g(f(x))).collect()
//2
val res2 = rdd.map(f).map(g).collect()

將上面的代碼copy試著運行一下吧,我們在控制臺得到的結果是這樣的。

f   g   f   g   f   g   f   g
f   g   f   g   f   g   f   g

是不是大大出乎我們的意料?這說明什么?說明spark是懶性求值的! 我們在調用map(f)時并不會真正去計算, map(f)只是告訴spark數據是怎么計算出來的。map(f).map(g)其實就是在告訴spark數據先通過f在通過g計算出來的。然后在collect()時,spark在一次迭代中先后對數據調用f、g。

繼續回到我們最初的問題,既然兩種調用方式,在性能上毫無差異,那種調用方式更好呢?我們更推薦第二種調用方式,除了api更加清晰之外。在調用鏈很長的情況下,我們可以利用spark的檢查點機制,在中間添加檢查點,這樣數據恢復的代價更小。而第一種方式調用鏈一旦出錯,數據只能從頭計算。

那么spark到底施加了何種魔法,如此神奇?讓我們來撥開spark的層層面紗。最好的方式當然是看源碼了。以map為例:

RDD的map方法

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

和MapPartitionsRDD的compute方法

override def compute(split: Partition, context:
                TaskContext): Iterator[U] =
                f(context, split.index,
                firstParent[T].iterator(split, 
                context))

關鍵是這個 iter.map(cleanF)),我們調用一個map方法其實是在iter對象上調用一個map方法。iter對象是scala.collection.Iterator的一個實例。
在看一下Iterator的map方法

def map[B](f: A => B): Iterator[B]=  
    new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())
}

聯想到我們剛才說的我們在RDD上調用一個map方法只是告訴spark數據是怎么計算出來的,并不會真正計算。是不是恍然大悟了。

向spark傳遞函數

我們可以把定義好的內聯函數、方法的引用或靜態方法傳遞給spark。就像scala的其它函數式API一樣。我們還要考慮一些細節,比如傳遞的函數及其引用的變量是可序列話的(實現了java的Serializable接口)。除此之外傳遞一個對象的方法或字段時,會包含對整個對象的引用。我們可以把該字段放到一個局部變量中,來避免傳遞包含該字段的整個對象。

scala中的函數傳遞

class SearchFunctions(val query:String){
    def isMatch(s:String) = s.contains(query)

    def getMatchFuncRef(rdd:RDD[String])
        :RDD[String]= {
        //isMatch 代表this.isMatch因此我們要傳遞整個this
        rdd.map(isMatch)
    }

    def getMatchFieldRef(rdd:RDD[String])={
    //query表示this.query因此我們要傳遞整個this
    rdd.map(x=>x.split(query))
    }

    def getMatchsNoRef(rdd:RDD[String])={
    //安全只要把我們需要的字段放到局部變量中
    val q = this.query
    rdd.map(x=>x.split(query))
    }
}

如果在scala中出現了NotSerializableException,通常問題就在我們傳遞了一個不可序列化類中的函數或字段。傳遞局部可序列變量或頂級對象中的函數始終是安全的。

持久化

如前所述,spark的RDD是惰性求值的,有時我們希望能過多次使用同一個RDD。如果只是簡單的對RDD調用行動操作,spark每次都會重算RDD和它的依賴。這在迭代算法中消耗巨大。 可以使用RDD.persist()讓spark把RDD緩存下來。

避免GroupByKey

讓我們來看看兩種workCount的方式,一種使用reduceByKey,另一種使用groupByKey。

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()

val wordCountsWithGroup = wordPairsRDD
  .groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()

雖然兩種方式都能產生正確的結果,但reduceByKey在大數據集時工作的更好。這時因為spark會在shuffling數據之前,為每一個分區添加一個combine操作。這將大大減少shuffling前的數據。

而groupBykey會shuff所有的數據,這大大加重了網絡傳輸的數據量。另外如果一個key對應很多value,這樣也可能引起out of memory。

來自: http://my.oschina.net/u/2533099/blog/603996

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!