Spark——共享變量
來自: http://www.solinx.co/archives/570
Spark執行不少操作時都依賴于 閉包函數 的調用,此時如果閉包函數使用到了外部變量驅動程序在使用行動操作時傳遞到集群中各worker節點任務時就會進行一系列操作:
1、驅動程序使將閉包中使用變量封裝成對象,驅動程序序列化對象,傳給worker節點任務;
2、worker節點任務接收到對象,執行閉包函數;
由于使用外部變量勢必會通過網絡、序列化、反序列化,如外部變量過大或過多使用外部變量將會影響Spark程序的性能;
Spark提供了兩種類型的 共享變量(Shared Variables) :廣播變量(Broadcast Variables)、累加器(Accumulators );
廣播變量(Broadcast Variables)
Spark提供的 廣播變量 可以解決閉包函數引用外部大變量引起的性能問題;廣播變量將只讀變量緩存在每個worker節點中,Spark使用了高效廣播算法分發變量從而提高通信性能;如直接在閉包函數中使用外部 變量該變量會緩存在每個任務(jobTask)中如果多個任務同時使用了一個大變量勢必會影響到程序性能;
廣播變量:每個worker節點中緩存一個副本,通過高效廣播算法提高傳輸效率,廣播變量是只讀的;
Spark Scala Api與Java Api默認使用了Jdk自帶序列化庫,通過使用第三方或使用自定義的序列化庫還可以進一步提高廣播變量的性能;
廣播變量使用示例:
val sc = SparkContext(""); val eigenValue = sc.bradcast(loadEigenValue()) val eigen = computer.map{x => val temp = eigenValue.value ... ... }
左節點不使用廣播變量,右使用廣播變量
累加器(Accumulators)
累加器可以使得worker節點中指定的值聚合到驅動程序中,如統計Spark程序執行過程中的事件總數等;
val sc = new SparkContext(...) val file = sc.textFile("xxx.txt") val eventCount = sc.accumulator(0,"EventAccumulator") //累加器初始值為0 val formatEvent = file.flatMap(line => { if(line.contains("error")){ eventCount +=1 } }) formatEvent.saveAsTextFile("eventData.txt") println("error event count : " + eventCount);
在使用 累加器(Accumulators) 時需要注意,只有在 行動操作 中才會觸發累加器,也就是說上述代碼中由于flatMap()為 轉換操作 因為Spark惰性特征所以只用當saveAsTextFile() 執行時累加器才會被觸發;累加器只有在驅動程序中才可訪問,worker節點中的任務不可訪問累加器中的值;
Spark原生支持了數字類型的的累加器如:Int、Double、Long、Float等;此外Spark還支持自定義累加器用戶可以通過繼承AccumulableParam特征來實現自定義的累加器此外Spark還提供了accumulableCollection()累加集合用于;創建累加器時可以使用名字也可以不是用名字,當使用了名字時在Spark UI中可看到當中程序中定義的累加器, 廣播變量存儲級別為MEMORY_AND_DISK;