Spark,一種快速數據分析替代方案

fmms 13年前發布 | 77K 次閱讀 Spark 分布式/云計算/大數據

雖然 Hadoop 在分布式數據分析方面備受關注,但是仍有一些替代產品提供了優于典型 Hadoop 平臺的令人關注的優勢。Spark 是一種可擴展的數據分析平臺,它整合了內存計算的基元,因此,相對于 Hadoop 的集群存儲方法,它在性能方面更具優勢。Spark 是在 Scala 語言中實現的,并且利用了該語言,為數據處理提供了獨一無二的環境。了解 Spark 的集群計算方法以及它與 Hadoop 的不同之處。

Spark 是一種與 Hadoop 相似的開源集群計算環境,但是兩者之間還存在一些不同之處,這些有用的不同之處使 Spark 在某些工作負載方面表現得更加優越,換句話說,Spark 啟用了內存分布數據集,除了能夠提供交互式查詢外,它還可以優化迭代工作負載。

Spark 是在 Scala 語言中實現的,它將 Scala 用作其應用程序框架。與 Hadoop 不同,Spark 和 Scala 能夠緊密集成,其中的 Scala 可以像操作本地集合對象一樣輕松地操作分布式數據集。

盡管創建 Spark 是為了支持分布式數據集上的迭代作業,但是實際上它是對 Hadoop 的補充,可以在 Hadoo 文件系統中并行運行。通過名為 Mesos 的第三方集群框架可以支持此行為。Spark 由加州大學伯克利分校 AMP 實驗室 (Algorithms, Machines, and People Lab) 開發,可用來構建大型的、低延遲的數據分析應用程序。

Spark 集群計算架構

雖然 Spark 與 Hadoop 有相似之處,但它提供了具有有用差異的一個新的集群計算框架。首先,Spark 是為集群計算中的特定類型的工作負載而設計,即那些在并行操作之間重用工作數據集(比如機器學習算法)的工作負載。為了優化這些類型的工作負 載,Spark 引進了內存集群計算的概念,可在內存集群計算中將數據集緩存在內存中,以縮短訪問延遲。

Spark 還引進了名為 彈性分布式數據集 (RDD) 的抽象。RDD 是分布在一組節點中的只讀對象集合。這些集合是彈性的,如果數據集一部分丟失,則可以對它們進行重建。重建部分數據集的過程依賴于容錯機制,該機制可以維護 “血統”(即充許基于數據衍生過程重建部分數據集的信息)。RDD 被表示為一個 Scala 對象,并且可以從文件中創建它;一個并行化的切片(遍布于節點之間);另一個 RDD 的轉換形式;并且最終會徹底改變現有 RDD 的持久性,比如請求緩存在內存中。

Spark 中的應用程序稱為驅動程序,這些驅動程序可實現在單一節點上執行的操作或在一組節點上并行執行的操作。與 Hadoop 類似,Spark 支持單節點集群或多節點集群。對于多節點操作,Spark 依賴于 Mesos 集群管理器。Mesos 為分布式應用程序的資源共享和隔離提供了一個有效平臺(參見 圖 1)。該設置充許 Spark 與 Hadoop 共存于節點的一個共享池中。


圖 1. Spark 依賴于 Mesos 集群管理器實現資源共享和隔離。
Spark,一種快速數據分析替代方案

Spark 編程模式

驅動程序可以在數據集上執行兩種類型的操作:動作和轉換。動作 會在數據集上執行一個計算,并向驅動程序返回一個值;而轉換 會從現有數據集中創建一個新的數據集。動作的示例包括執行一個 Reduce 操作(使用函數)以及在數據集上進行迭代(在每個元素上運行一個函數,類似于 Map 操作)。轉換示例包括 Map 操作和 Cache 操作(它請求新的數據集存儲在內存中)。

我們隨后就會看看這兩個操作的示例,但是,讓我們先來了解一下 Scala 語言。

Scala 簡介

Scala 可能是 Internet 上不為人知的秘密之一。您可以在一些最繁忙的 Internet 網站(如 推ter、LinkedIn 和 Foursquare,Foursquare 使用了名為 Lift 的 Web 應用程序框架)的制作過程中看到 Scala 的身影。還有證據表明,許多金融機構已開始關注 Scala 的性能(比如 EDF Trading 公司將 Scala 用于衍生產品定價)。

Scala 是一種多范式語言,它以一種流暢的、讓人感到舒服的方法支持與命令式、函數式和面向對象的語言相關的語言特性。從面向對象的角度來看,Scala 中的每個值都是一個對象。同樣,從函數觀點來看,每個函數都是一個值。Scala 也是屬于靜態類型,它有一個既有表現力又很安全的類型系統。

此外,Scala 是一種虛擬機 (VM) 語言,并且可以通過 Scala 編譯器生成的字節碼,直接運行在使用 Java Runtime Environment V2 的 Java? Virtual Machine (JVM) 上。該設置充許 Scala 運行在運行 JVM 的任何地方(要求一個額外的 Scala 運行時庫)。它還充許 Scala 利用大量現存的 Java 庫以及現有的 Java 代碼。

最后,Scala 具有可擴展性。該語言(它實際上代表了可擴展語言)被定義為可直接集成到語言中的簡單擴展。

舉例說明 Scala

讓我們來看一些實際的 Scala 語言示例。Scala 提供自身的解釋器,充許您以交互方式試用該語言。Scala 的有用處理已超出本文所涉及的范圍,但是您可以在 參考資料 中找到更多相關信息的鏈接。

清單 1 通過 Scala 自身提供的解釋器開始了快速了解 Scala 語言之旅。啟用 Scala 后,系統會給出提示,通過該提示,您可以以交互方式評估表達式和程序。我們首先創建了兩個變量,一個是不可變變量(即 vals,稱作單賦值),另一個變量是可變變量 (vars)。注意,當您試圖更改 b(您的 var)時,您可以成功地執行此操作,但是,當您試圖更改 val 時,則會返回一個錯誤。


清單 1. Scala 中的簡單變量
              
$ scala Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> val a = 1 a: Int = 1

scala> var b = 2 b: Int = 2

scala> b = b + a b: Int = 3

scala> a = 2 




       




        6: error: reassignment to val
       a = 2
         ^




       

接下來,創建一個簡單的方法來計算和返回 Int 的平方值。在 Scala 中定義一個方法得先從 def 開始,后跟方法名稱和參數列表,然后,要將它設置為語句的數量(在本示例中為 1)。無需指定任何返回值,因為可以從方法本身推斷出該值。注意,這類似于為變量賦值。在一個名為 3 的對象和一個名為 res0 的結果變量(Scala 解釋器會自動為您創建該變量)上,我演示了這個過程。這些都顯示在 清單 2 中。


清單 2. Scala 中的一個簡單方法
              
scala> def square(x: Int) = x*x square: (x: Int)Int

scala> square(3) res0: Int = 9

scala> square(res0) res1: Int = 81

接下來,讓我們看一下 Scala 中的一個簡單類的構建過程(參見 清單 3)。定義一個簡單的 Dog 類來接收一個 String 參數(您的名稱構造函數)。注意,這里的類直接采用了該參數(無需在類的正文中定義類參數)。還有一個定義該參數的方法,可在調用參數時發送一個字符串。您要創建一個新的類實例,然后調用您的方法。注意,解釋器會插入一些豎線:它們不屬于代碼。


清單 3. Scala 中的一個簡單的類
              

scala> class Dog( name: String ) {      |   def bark() = println(name + " barked")      | } defined class Dog

scala> val stubby = new Dog("Stubby") stubby: Dog = Dog@1dd5a3d

scala> stubby.bark Stubby barked

scala>

完成上述操作后,只需輸入 :quit 即可退出 Scala 解釋器。

安裝 Scala 和 Spark

第一步是下載和配置 Scala。清單 4 中顯示的命令闡述了 Scala 安裝的下載和準備工作。使用 Scala v2.8,因為這是經過證實的 Spark 所需的版本。


清單 4. 安裝 Scala
              
$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.8.1.final.tgz $ sudo tar xvfz scala-2.8.1.final.tgz --directory /opt/            

要使 Scala 可視化,請將下列行添加至您的 .bashrc 中(如果您正使用 Bash 作為 shell):

export SCALA_HOME=/opt/scala-2.8.1.final
export PATH=$SCALA_HOME/bin:$PATH

接著可以對您的安裝進行測試,如 清單 5 所示。這組命令會將更改加載至 bashrc 文件中,接著快速測試 Scala 解釋器 shell。


清單 5. 配置和運行交互式 Scala
              
$ scala Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> println("Scala is installed!") Scala is installed!

scala> :quit $ 

如清單中所示,現在應該看到一個 Scala 提示。您可以通過輸入 :quit 執行退出。注意,Scala 要在 JVM 的上下文中執行操作,所以您會需要 JVM。我使用的是 Ubuntu,它在默認情況下會提供 OpenJDK。

接下來,請獲取最新的 Spark 框架副本。為此,請使用 清單 6 中的腳本。


清單 6. 下載和安裝 Spark 框架
              

wget https://github.com/mesos/spark/tarball/0.3-scala-2.8/
mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz
$ sudo tar xvfz mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz

接下來,使用下列行將 spark 配置設置在 Scala 的根目錄 ./conf/spar-env.sh 中:

export SCALA_HOME=/opt/scala-2.8.1.final

設置的最后一步是使用簡單的構建工具 (sbt) 更新您的分布。sbt 是一款針對 Scala 的構建工具,用于 Spark 分布中。您可以在 mesos-spark-c86af80 子目錄中執行更新和變異步驟,如下所示:

$ sbt/sbt update compile             

注意,在執行此步驟時,需要連接至 Internet。當完成此操作后,請執行 Spark 快速檢測,如 清單 7 所示。 在該測試中,需要運行 SparkPi 示例,它會計算 pi 的估值(通過單位平方中的任意點采樣)。所顯示的格式需要樣例程序 (spark.examples.SparkPi) 和主機參數,該參數定義了 Mesos 主機(在此例中,是您的本地主機,因為它是一個單節點集群)和要使用的線程數量。注意,在 清單 7 中,執行了兩個任務,而且這兩個任務被序列化(任務 0 開始和結束之后,任務 1 再開始)。


清單 7. 對 Spark 執行快速檢測
              
$ ./run spark.examples.SparkPi local[1] 11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registered actor on port 50501
11/08/26 19:52:33 INFO spark.MapOutputTrackerActor: Registered actor on port 50501
11/08/26 19:52:33 INFO spark.SparkContext: Starting job...
11/08/26 19:52:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache
11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
11/08/26 19:52:33 INFO spark.CacheTrackerActor: Asked for current cache locations
11/08/26 19:52:33 INFO spark.LocalScheduler: Final stage: Stage 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Parents of final stage: List()
11/08/26 19:52:33 INFO spark.LocalScheduler: Missing parents: List()
11/08/26 19:52:33 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...
11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes
11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 1
11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes
11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 1
11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
11/08/26 19:52:33 INFO spark.SparkContext: Job finished in 0.145892763 s
Pi is roughly 3.14952
$ 

通過增加線程數量,您不僅可以增加線程執行的并行化,還可以用更少的時間執行作業(如 清單 8 所示)。


清單 8. 對包含兩個線程的 Spark 執行另一個快速檢測
              
$ ./run spark.examples.SparkPi local[2] 11/08/26 20:04:30 INFO spark.MapOutputTrackerActor: Registered actor on port 50501
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registered actor on port 50501
11/08/26 20:04:30 INFO spark.SparkContext: Starting job...
11/08/26 20:04:30 INFO spark.CacheTracker: Registering RDD ID 0 with cache
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Asked for current cache locations
11/08/26 20:04:30 INFO spark.LocalScheduler: Final stage: Stage 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Parents of final stage: List()
11/08/26 20:04:30 INFO spark.LocalScheduler: Missing parents: List()
11/08/26 20:04:30 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...
11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 1
11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes
11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes
11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 1
11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
11/08/26 20:04:30 INFO spark.SparkContext: Job finished in 0.101287331 s
Pi is roughly 3.14052
$ 

使用 Scala 構建一個簡單的 Spark 應用程序

要構建 Spark 應用程序,您需要單一 Java 歸檔 (JAR) 文件形式的 Spark 及其依賴關系。使用 sbt 在 Spark 的頂級目錄中創建該 JAR 文件,如下所示:

$ sbt/sbt assembly

結果產生一個文件 ./core/target/scala_2.8.1/"Spark Core-assembly-0.3.jar"。將該文件添加至您的 CLASSPATH 中,以便可以訪問它。在本示例中,不會用到此 JAR 文件,因為您將會使用 Scala 解釋器運行它,而不是對其進行編譯。

在本示例中,使用了標準的 MapReduce 轉換(如 清單 9 所示)。該示例從執行必要的 Spark 類導入開始。接著,需要定義您的類 (SparkTest) 及其主方法,用它解析稍后使用的參數。這些參數定義了執行 Spark 的環境(在本例中,該環境是一個單節點集群)。接下來,要創建 SparkContext 對象,它會告知 Spark 如何對您的集群進行訪問。該對象需要兩個參數:Mesos 主機名稱(已傳入)以及您分配給作業的名稱 (SparkTest)。解析命令行中的切片數量,它會告知 Spark 用于作業的線程數量。要設置的最后一項是指定用于 MapReduce 操作的文本文件。

最后,您將了解 Spark 示例的實質,它是由一組轉換組成。使用您的文件時,可調用 flatMap 方法返回一個 RDD(通過指定的函數將文本行分解為標記)。然后通過 map 方法(該方法創建了鍵值對)傳遞此 RDD ,最終通過 ReduceByKey 方法合并鍵值對。合并操作是通過將鍵值對傳遞給 _ + _ 匿名函數來完成的。該函數只采用兩個參數(密鑰和值),并返回將兩者合并所產生的結果(一個 String 和一個 Int)。接著以文本文件的形式發送該值(到輸出目錄)。


清單 9. Scala/Spark 中的 MapReduce (SparkTest.scala)
              
import spark.SparkContext
import SparkContext._

object SparkTest {

  def main( args: Array[String]) {

    if (args.length == 0) {
      System.err.println("Usage: SparkTest 




       




         [




        




         ]")
      System.exit(1)
    }

    val spark = new SparkContext(args(0), "SparkTest")
    val slices = if (args.length > 1) args(1).toInt else 2

    val myFile = spark.textFile("test.txt")
    val counts = myFile.flatMap(line => line.split(" "))
                        .map(word => (word, 1))
                        .reduceByKey(_ + _)

    counts.saveAsTextFile("out.txt")

  }

}

SparkTest.main(args)




        




       

要執行您的腳本,只需要執行以下命令:

$ scala SparkTest.scala local[1]             

您可以在輸出目錄中找到 MapReduce 測試文件(如 output/part-00000)。

其他的大數據分析框架

自從開發了 Hadoop 后,市場上推出了許多值得關注的其他大數據分析平臺。這些平臺范圍廣闊,從簡單的基于腳本的產品到與 Hadoop 類似的生產環境。

名為 bashreduce 的平臺是這些平臺中最簡單的平臺之一,顧名思義,它充許您在 Bash 環境中的多個機器上執行 MapReduce 類型的操作。bashreduce 依賴于您計劃使用的機器集群的 Secure Shell(無密碼),并以腳本的形式存在,通過它,您可以使用 UNIX®-style 工具(sortawknetcat 等)請求作業。

GraphLab 是另一個受人關注的 MapReduce 抽象實現,它側重于機器學習算法的并行實現。在 GraphLab 中,Map 階段會定義一些可單獨(在獨立主機上)執行的計算指令,而 Reduce 階段會對結果進行合并。

最后,大數據場景的一個新成員是來自 推ter 的 Storm(通過收購 BackType 獲得)。Storm 被定義為 “實時處理的 Hadoop”,它主要側重于流處理和持續計算(流處理可以得出計算的結果)。Storm 是用 Clojure 語言(Lisp 語言的一種方言)編寫的,但它支持用任何語言(比如 Ruby 和 Python)編寫的應用程序。推ter 于 2011 年 9 月以開源形式發布 Storm。

結束語

Spark 是不斷壯大的大數據分析解決方案家族中備受關注的新增成員。它不僅為分布數據集的處理提供一個有效框架,而且以高效的方式(通過簡潔的 Scala 腳本)處理分布數據集。Spark 和 Scala 都處在積極發展階段。不過,由于關鍵 Internet 屬性中采用了它們,兩者似乎都已從受人關注的開源軟件過渡成為基礎 Web 技術。

文章出處:IBM developerWorks

 本文由用戶 fmms 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!