Spark初探

jopen 9年前發布 | 58K 次閱讀 Spark 分布式/云計算/大數據

Spark 基于內存計算,提高了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,允許用戶將Spark 部署在大量廉價硬件之上,形成集群。

認識Spark

Apache Spark is an open source cluster computing system that aims to make data analytics fast — both fast to run and fast to write. Spark是一個開源的分布式計算系統,它的目的是使得數據分析更快——寫起來和運行起來都很快。

Spark 是基于內存計算的大數據并行計算框架。Spark 基于內存計算,提高了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,允許用戶將Spark 部署在大量廉價硬件之上,形成集群。

Spark發展史

2009,Spark誕生于加州大學伯克利分校AMPLab

2013年6月,Spark成為Apache孵化項目

2014年2月,Spark取代MapReduce成為Apache頂級項目

Spark生態系統

Spark擁有一套生態系統,叫做伯克利數據分析棧(BDAS

Spark初探

為什么比MapReduce

Spark官網上如是說:Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk. 那Spark究竟是為什么比MapReduce快呢?

  1. MapReduce通常會將中間結果放到HDFS上,Spark 是基于內存計算的大數據并行計算框架,中間結果在內存中,對于迭代運算效率比較高。

  2. MapReduce消耗了大量時間去排序,而有些場景不需要去排序,Spark可以避免不要的排序帶來的開銷。

  3. Spark能夠將要執行的操作做成一張有向無環圖(DAG),然后進行優化。

其他優勢

Spark采用事件驅動的類庫AKKA啟動任務,通過線程池來避免啟動任務的開銷。

Spark更加通用,除了具有mapreduce算子之外,還有filterjoin80多種算子。

支持的API

Scala(很好),Python(不錯),Java

運行模式

  • Local(只用于測試)

  • Standalone:獨立模式

  • Spark on yarn:最有前景的模式

  • Spark on Mesos:官方推薦

  • Amazon EC2

Spark runtime

Spark初探

Spark運行時:用戶的Driver程序啟動多個Worker,Worker從文件系統中加載數據,生成新的RDD,并按照不同的分區Cache到內存中。

彈性分布式數據集RDD

RDD英文全稱Resilient Distributed Dataset,即彈性分布式數據集。RDD是只讀的、分區記錄的集合。Spark中的一切都是基于RDD的,我們通過以下幾個方面來了解它:

  1. 創建

    1)從集合轉換而來;

    2)基于文件系統(本地文件、HDFSHBase等)的輸入創建;

    3)從父RDD轉換而來。

  2. 計算類型

    1)Transformation(轉換):延遲執行,也就是通過轉換生成一個新的RDD時候并不會立即執行,只有等到Action(行動)時,才觸發操作。常用操作有mapfilter等。

    2)Action(行動):提交Spark作業,啟動計算操作,并產生最終結果(向用戶程序返回或者寫入文件系統)。

  3. 容錯

    LineageRDD含有如何從父RDD衍生出本RDD的相關信息,出錯時RDD可以通過Lineage恢復。

  4. 內部屬性

    1)分區列表

    2)計算每個分片的函數

    3)對父RDD的一組依賴

    4)對Key-Value數據類型RDD的分區器,用戶可以指定分區策略和分區數

    5)每個數據分區的地址列表(如HDFS上的數據塊的地址)

Spark Shell

Spark自帶的交互式Shell程序,方便用戶進行交互式編程。進入方式:

./bin/spark-shell

當打開spark shell的時候SparkContext已經被初始化了,對象名為sc,直接使用即可。跟Scala解釋器很像,在上面可以進行交互式操作。

接下來的內容可能需要你了解Scala語言,可以參照Scala極速入門

WordCount開胃菜

接下來我們來個實實在在的例子,作為介紹算子之前的開胃小菜。用過MapReduce的同學肯定寫過Java實現的WordCount程序,如果需要排序的話還要再鏈接一個排序任務,邏輯不復雜、代碼卻也不少。我們在Spark中用Scala語言可以怎么實現呢?我們直接在Spark-Shell中操作。

scala> sc.textFile("hdfs://.../wordcount.data").
     | flatMap(_ split " ").map((_, 1)).reduceByKey(_ + _).
     | map(x=>(x._2, x._1)).sortByKey(false).map(x=>(x._2,x._1)).
     | saveAsTextFile("hdfs://.../result")

在Scala解釋器中,如果輸入的不是一個完整的可執行的語句,然后直接敲了回車,會出現開始的|,表示可以繼續錄入,直到輸入一個完整的語句。也就是說我們剛剛用一行代碼,搞定了WordCount + 排序功能。我們在后文中在對代碼做一個具體的解釋。對比一下吭哧吭哧寫的《MapReduce統計詞語出現次數》(雖然功能有一點點差異),我只能有這樣的感慨:函數式編程,爽;Spark,帥!

常用算子

Spark初探

1. Spark輸入:

從集合中輸入:

val rdd1 = sc.parallelize(List("Java", "Scala", "Spark"))

從文件系統中輸入:

val rdd2 = sc.textFile("hdfs://.../wordcount.data")

2. cache

cache 將RDD 元素從磁盤緩存到內存,相當于persist(MEMORY_ONLY) 函數的功能。RDD再次使用的話,就直接從內存中讀取數據。

3. map(func)

Return a new distributed dataset formed by passing each element of the source through a function func.

將原來RDD中的每個數據項通過函數func映射為新的數據項。

val rdd2 = sc.parallelize(List(1,2,3,4))
val rdd3 = rdd2.map(_ + 2)

rdd3從rdd2轉換而來(rdd2中的每個數據項加2),rdd3中的數據項變為[3,4,5,6]。當然因為是transformations類型的算子,并不會立即執行。

4. filter(func)

Return a new dataset formed by selecting those elements of the source on which funcreturns true.

對原RDD中的數據進行過濾,每個元素經過func函數處理,返回true的保留。

val rdd4 = rdd3.filter(_ > 4)

rdd3中所有大于4的數據項組成了rdd4。

5. flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so funcshould return a Seq rather than a single item).

與map相似,只不過每個數據項通過函數func映射成0個或者多個數據項(即func要返回一個集合),并將新生成的RDD中的元素合并到一個集合中。

6. sample(withReplacementfractionseed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

對已有的RDD進行采樣,獲取子集。并且可以指定是否有放回采樣、采樣百分比、隨機種子。函數參數如下:

withReplacement = true,有放回抽樣;withReplacement =false,無放回抽樣。

fraction 采樣隨機比。

seed 采樣種子,也就是一定包含在采樣生成的rdd中。

7. groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance. 
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

將含有相同key的數據合并到一個組,[numTasks]指定了分區的個數。

val rdd4 = sc.parallelize(List(("aa", 1), ("aa", 2), ("bb", 3), ("bb", 4)))
val rdd5 = rdd4.groupByKey

rdd5結果為Array[(String, Iterable[Int])] = Array((aa,CompactBuffer(1, 2)), (bb,CompactBuffer(3, 4)))

8. reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

將相同的key依據函數func合并。

9. union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.

將兩個RDD合并,要求兩個RDD中的數據項類型一致。

10. join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin,rightOuterJoin, and fullOuterJoin.

val rddtest1 = sc.parallelize(List(("James", 1), ("Wade", 2), ("Paul", 3)))
val rddtest2 = sc.parallelize(List(("James", 4), ("Wade", 5)))
val rddtest12 = rddtest1 join rddtest2

rddtest12結果為:Array[(String, (Int, Int))] = Array((James,(1,4)), (Wade,(2,5)))

11. cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable<V>, Iterable<W>) tuples. This operation is also called groupWith.

對在兩個RDD 中的Key-Value 類型的元素,每個RDD 相同Key 的元素分別聚合為一個集合,并且返回兩個RDD 中對應Key 的元素集合的迭代器。

使用上例中rddtest1 cogroup rddtest2,結果是:

Array((Paul,(CompactBuffer(3),CompactBuffer())), (James,(CompactBuffer(1),CompactBuffer(4))), (Wade,(CompactBuffer(2),CompactBuffer(5))))

12. sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

按照key來排序,默認從小到大。如果加上參數false,則從大到小排序。

13. count()

Return the number of elements in the dataset.

返回數據項的個數。

14. collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

將分布式的RDD 返回為一個單機的足夠小的scala Array 數組。

15. countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

用于key-value類型的RDD,返回每個key對應的個數。

16. lookup(key: K)

用于key-value類型的RDD,返回key對應的所有value值。

17. reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

迭代遍歷每一個元素,并執行函數func。

val reduceRdd = sc.parallelize(List(1,2,3,4,5))
reduceRdd.reduce(_ + _)

計算結果為所有元素之和15。

18. saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

將文件輸出到文件系統。

注:上述算子中黑體的為Action類型的算子。

看完對算子的介紹,我們再看一下完整的程序。

if (args.length != 3) {
  println("Usage : java -jar code.jar dependency_jars file_location save_location")
  System.exit(0)
}

val jars = ListBuffer[String]()
args(0).split(',').map(jars += _)

val conf = new SparkConf()
conf.setMaster("spark://host:port")
    .setSparkHome("your-spark-home")
    .setAppName("WordCount")
    .setJars(jars)
    .set("spark.executor.memory","25g")
val sc = new SparkContext(conf)
sc.textFile(args(1))           // 從文件系統中讀取文件
  .flatMap(_ split " ")        // 將每一行數據,以空格為分隔符,拆分單詞
  .map((_, 1))                 // 每個詞語計數為1
  .reduceByKey(_ + _)          // 統計每個詞語的個數
  .map(x=>(x._2, x._1))        // key-value互換
  .sortByKey(false)            // 按照key來排序(從大到小)
  .map(x=>(x._2,x._1))         // key-value互換
  .saveAsTextFile(args(2))     // 將結果輸出到文件系統中

總結

本文簡單介紹了Spark中的一些概念,并結合了一些簡單的例子。內容寫的比較淺顯,隨著筆者的深入研究,也會在后續博客中對各部分內容做更深入的闡述。希望本篇能對看到的人有所幫助。

友情提示:

如果你沒有Scala基礎,可以參照Scala極速入門

與MapReduce程序對比,可以參照MapReduce統計詞語出現次數


 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!