用Apache Spark進行大數據處理——第一部分:入門介紹

jopen 9年前發布 | 62K 次閱讀 分布式/云計算/大數據 Apache Spark

什么是Spark

Apache Spark是一個圍繞速度、易用性和復雜分析構建的大數據處理框架。最初在2009年由加州大學伯克利分校的AMPLab開發,并于2010年成為Apache的開源項目之一。

與Hadoop和Storm等其他大數據和MapReduce技術相比,Spark有如下優勢。

首先,Spark為我們提供了一個全面、統一的框架用于管理各種有著不同性質(文本數據、圖表數據等)的數據集和數據源(批量數據或實時的流數據)的大數據處理的需求。

 

Spark可以將Hadoop集群中的應用在內存中的運行速度提升100倍,甚至能夠將應用在磁盤上的運行速度提升10倍。

Spark讓開發者可以快速的用Java、Scala或Python編寫程序。它本身自帶了一個超過80個高階操作符集合。而且還可以用它在shell中以交互式地查詢數據。

除了Map和Reduce操作之外,它還支持SQL查詢,流數據,機器學習和圖表數據處理。開發者可以在一個數據管道用例中單獨使用某一能力或者將這些能力結合在一起使用。

在這個Apache Spark文章系列的第一部分中,我們將了解到什么是Spark,它與典型的MapReduce解決方案的比較以及它如何為大數據處理提供了一套完整的工具。

Hadoop和Spark

Hadoop這項大數據處理技術大概已有十年歷史,而且被看做是首選的大數據集合處理的解決方案。MapReduce是一路計算的優秀解決方案,不 過對于需要多路計算和算法的用例來說,并非十分高效。數據處理流程中的每一步都需要一個Map階段和一個Reduce階段,而且如果要利用這一解決方案, 需要將所有用例都轉換成MapReduce模式。

在下一步開始之前,上一步的作業輸出數據必須要存儲到分布式文件系統中。因此,復制和磁盤存儲會導致這種方式速度變慢。另外Hadoop解決方案中 通常會包含難以安裝和管理的集群。而且為了處理不同的大數據用例,還需要集成多種不同的工具(如用于機器學習的Mahout和流數據處理的Storm)。

如果想要完成比較復雜的工作,就必須將一系列的MapReduce作業串聯起來然后順序執行這些作業。每一個作業都是高時延的,而且只有在前一個作業完成之后下一個作業才能開始啟動。

而Spark則允許程序開發者使用有向無環圖(DAG)開發復雜的多步數據管道。而且還支持跨有向無環圖的內存數據共享,以便不同的作業可以共同處理同一個數據。

Spark運行在現有的Hadoop分布式文件系統基礎之上(HDFS)提供額外的增強功能。它支持將Spark應用部署到現存的Hadoop v1集群(with SIMR – Spark-Inside-MapReduce)或Hadoop v2 YARN集群甚至是Apache Mesos之中。

我們應該將Spark看作是Hadoop MapReduce的一個替代品而不是Hadoop的替代品。其意圖并非是替代Hadoop,而是為了提供一個管理不同的大數據用例和需求的全面且統一的解決方案。

Spark特性

Spark通過在數據處理過程中成本更低的洗牌(Shuffle)方式,將MapReduce提升到一個更高的層次。利用內存數據存儲和接近實時的處理能力,Spark比其他的大數據處理技術的性能要快很多倍。

Spark還支持大數據查詢的延遲計算,這可以幫助優化大數據處理流程中的處理步驟。Spark還提供高級的API以提升開發者的生產力,除此之外還為大數據解決方案提供一致的體系架構模型。

Spark將中間結果保存在內存中而不是將其寫入磁盤,當需要多次處理同一數據集時,這一點特別實用。Spark的設計初衷就是既可以在內存中又可 以在磁盤上工作的執行引擎。當內存中的數據不適用時,Spark操作符就會執行外部操作。Spark可以用于處理大于集群內存容量總和的數據集。

Spark會嘗試在內存中存儲盡可能多的數據然后將其寫入磁盤。它可以將某個數據集的一部分存入內存而剩余部分存入磁盤。開發者需要根據數據和用例評估對內存的需求。Spark的性能優勢得益于這種內存中的數據存儲。

Spark的其他特性包括:

  • 支持比Map和Reduce更多的函數。
  • 優化任意操作算子圖(operator graphs)。
  • 可以幫助優化整體數據處理流程的大數據查詢的延遲計算。
  • 提供簡明、一致的Scala,Java和Python API。
  • 提供交互式Scala和Python Shell。目前暫不支持Java。

Spark是用Scala程序設計語言編寫而成,運行于Java虛擬機(JVM)環境之上。目前支持如下程序設計語言編寫Spark應用:

  • Scala
  • Java
  • Python
  • Clojure
  • R

Spark生態系統

除了Spark核心API之外,Spark生態系統中還包括其他附加庫,可以在大數據分析和機器學習領域提供更多的能力。

這些庫包括:

  • Spark Streaming:
    • Spark Streaming基于微批量方式的計算和處理,可以用于處理實時的流數據。它使用DStream,簡單來說就是一個彈性分布式數據集(RDD)系列,處理實時數據。
    </li>
  • Spark SQL:
    • Spark SQL可以通過JDBC API將Spark數據集暴露出去,而且還可以用傳統的BI和可視化工具在Spark數據上執行類似SQL的查詢。用戶還可以用Spark SQL對不同格式的數據(如JSON,Parquet以及數據庫等)執行ETL,將其轉化,然后暴露給特定的查詢。
    • </ul> </li>
    • Spark MLlib:
      • MLlib是一個可擴展的Spark機器學習庫,由通用的學習算法和工具組成,包括二元分類、線性回歸、聚類、協同過濾、梯度下降以及底層優化原語。
      • </ul> </li>
      • Spark GraphX:
        • GraphX是用于圖計算和并行圖計算的 新的(alpha)Spark API。通過引入彈性分布式屬性圖(Resilient Distributed Property Graph),一種頂點和邊都帶有屬性的有向多重圖,擴展了Spark RDD。為了支持圖計算,GraphX暴露了一個基礎操作符集合(如subgraph,joinVertices和aggregateMessages) 和一個經過優化的Pregel API變體。此外,GraphX還包括一個持續增長的用于簡化圖分析任務的圖算法和構建器集合。
        • </ul> </li> </ul>

          除了這些庫以外,還有一些其他的庫,如BlinkDB和Tachyon。

          BlinkDB是一個近似查詢引擎,用于在海量數據上執行交互式SQL查詢。BlinkDB可以通過犧牲數據精度來提升查詢響應時間。通過在數據樣本上執行查詢并展示包含有意義的錯誤線注解的結果,操作大數據集合。

          Tachyon是一個以內存為中心的 分布式文件系統,能夠提供內存級別速度的跨集群框架(如Spark和MapReduce)的可信文件共享。它將工作集文件緩存在內存中,從而避免到磁盤中 加載需要經常讀取的數據集。通過這一機制,不同的作業/查詢和框架可以以內存級的速度訪問緩存的文件。
          此外,還有一些用于與其他產品集成的適配器,如Cassandra(Spark Cassandra 連接器)和R(SparkR)。Cassandra Connector可用于訪問存儲在Cassandra數據庫中的數據并在這些數據上執行數據分析。

          下圖展示了在Spark生態系統中,這些不同的庫之間的相互關聯。

          用Apache Spark進行大數據處理——第一部分:入門介紹

          圖1. Spark框架中的庫

          我們將在這一系列文章中逐步探索這些Spark庫

          Spark體系架構

          Spark體系架構包括如下三個主要組件:

          • 數據存儲
          • API
          • 管理框架

          接下來讓我們詳細了解一下這些組件。

          數據存儲:

          Spark用HDFS文件系統存儲數據。它可用于存儲任何兼容于Hadoop的數據源,包括HDFS,HBase,Cassandra等。

          API

          利用API,應用開發者可以用標準的API接口創建基于Spark的應用。Spark提供Scala,Java和Python三種程序設計語言的API。

          下面是三種語言Spark API的網站鏈接。

          資源管理:

          Spark既可以部署在一個單獨的服務器也可以部署在像Mesos或YARN這樣的分布式計算框架之上。

          下圖2展示了Spark體系架構模型中的各個組件。

          用Apache Spark進行大數據處理——第一部分:入門介紹

          圖2 Spark體系架構

          彈性分布式數據集

          彈性分布式數據集(基于Matei的研究論文)或RDD是Spark框架中的核心概念。可以將RDD視作數據庫中的一張表。其中可以保存任何類型的數據。Spark將數據存儲在不同分區上的RDD之中。

          RDD可以幫助重新安排計算并優化數據處理過程。

          此外,它還具有容錯性,因為RDD知道如何重新創建和重新計算數據集。

          RDD是不可變的。你可以用變換(Transformation)修改RDD,但是這個變換所返回的是一個全新的RDD,而原有的RDD仍然保持不變。

          RDD支持兩種類型的操作:

          • 變換(Transformation)
          • 行動(Action)

          變換:變換的返回值是一個新的RDD集合,而不是單個值。調用一個變換方法,不會有任何求值計算,它只獲取一個RDD作為參數,然后返回一個新的RDD。

          變換函數包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。

          行動:行動操作計算并返回一個新的值。當在一個RDD對象上調用行動函數時,會在這一時刻計算全部的數據處理查詢并返回結果值。

          行動操作包括:reduce,collect,count,first,take,countByKey以及foreach。

          如何安裝Spark

          安裝和使用Spark有幾種不同方式。你可以在自己的電腦上將Spark作為一個獨立的框架安裝或者從諸如Cloudera,HortonWorks或MapR之類的供應商處獲取一個Spark虛擬機鏡像直接使用。或者你也可以使用在云端環境(如Databricks Cloud)安裝并配置好的Spark。

          在本文中,我們將把Spark作為一個獨立的框架安裝并在本地啟動它。最近Spark剛剛發布了1.2.0版本。我們將用這一版本完成示例應用的代碼展示。

          如何運行Spark

          當你在本地機器安裝了Spark或使用了基于云端的Spark后,有幾種不同的方式可以連接到Spark引擎。

          下表展示了不同的Spark運行模式所需的Master URL參數。

          用Apache Spark進行大數據處理——第一部分:入門介紹

          如何與Spark交互

          Spark啟動并運行后,可以用Spark shell連接到Spark引擎進行交互式數據分析。Spark shell支持Scala和Python兩種語言。Java不支持交互式的Shell,因此這一功能暫未在Java語言中實現。

          可以用spark-shell.cmd和pyspark.cmd命令分別運行Scala版本和Python版本的Spark Shell。

          Spark網頁控制臺

          不論Spark運行在哪一種模式下,都可以通過訪問Spark網頁控制臺查看Spark的作業結果和其他的統計數據,控制臺的URL地址如下:

          http://localhost:4040

          Spark控制臺如下圖3所示,包括Stages,Storage,Environment和Executors四個標簽頁

          (點擊查看大圖)

          用Apache Spark進行大數據處理——第一部分:入門介紹

          圖3. Spark網頁控制臺

          共享變量

          Spark提供兩種類型的共享變量可以提升集群環境中的Spark程序運行效率。分別是廣播變量和累加器。

          廣播變量:廣播變量可以在每臺機器上緩存只讀變量而不需要為各個任務發送該變量的拷貝。他們可以讓大的輸入數據集的集群拷貝中的節點更加高效。

          下面的代碼片段展示了如何使用廣播變量。

          //
          // Broadcast Variables
          //
          val broadcastVar = sc.broadcast(Array(1, 2, 3))
          broadcastVar.value

          累加器:只有在使用相關操作時才會添加累加器,因此它可以很好地支持并行。累加器可用于實現計數(就像在MapReduce中那樣)或求和。可以用add方法將運行在集群上的任務添加到一個累加器變量中。不過這些任務無法讀取變量的值。只有驅動程序才能夠讀取累加器的值。

          下面的代碼片段展示了如何使用累加器共享變量:

          //
          // Accumulators
          //

          val accum = sc.accumulator(0, "My Accumulator")

          sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

          accum.value</pre>

          Spark應用示例

          本篇文章中所涉及的示例應用是一個簡單的字數統計應用。這與學習用Hadoop進行大數據處理時的示例應用相同。我們將在一個文本文件上執行一些數 據分析查詢。本示例中的文本文件和數據集都很小,不過無須修改任何代碼,示例中所用到的Spark查詢同樣可以用到大容量數據集之上。

          為了讓討論盡量簡單,我們將使用Spark Scala Shell。

          首先讓我們看一下如何在你自己的電腦上安裝Spark。

          前提條件:

          • 為了讓Spark能夠在本機正常工作,你需要安裝Java開發工具包(JDK)。這將包含在下面的第一步中。
          • 同樣還需要在電腦上安裝Spark軟件。下面的第二步將介紹如何完成這項工作。

          注:下面這些指令都是以Windows環境為例。如果你使用不同的操作系統環境,需要相應的修改系統變量和目錄路徑已匹配你的環境。

          I. 安裝JDK

          1)從Oracle網站上下載JDK。推薦使用JDK 1.7版本

          將JDK安裝到一個沒有空格的目錄下。對于Windows用戶,需要將JDK安裝到像c:\dev這樣的文件夾下,而不能安裝到“c: \Program Files”文件夾下。“c:\Program Files”文件夾的名字中包含空格,如果軟件安裝到這個文件夾下會導致一些問題。

          注:不要在“c:\Program Files”文件夾中安裝JDK或(第二步中所描述的)Spark軟件。

          2)完成JDK安裝后,切換至JDK 1.7目錄下的”bin“文件夾,然后鍵入如下命令,驗證JDK是否正確安裝:

          java -version

          如果JDK安裝正確,上述命令將顯示Java版本。

          II. 安裝Spark軟件:

          Spark網站上下載最新版本 的Spark。在本文發表時,最新的Spark版本是1.2。你可以根據Hadoop的版本選擇一個特定的Spark版本安裝。我下載了與Hadoop 2.4或更高版本匹配的Spark,文件名是spark-1.2.0-bin-hadoop2.4.tgz。

          將安裝文件解壓到本地文件夾中(如:c:\dev)。

          為了驗證Spark安裝的正確性,切換至Spark文件夾然后用如下命令啟動Spark Shell。這是Windows環境下的命令。如果使用Linux或Mac OS,請相應地編輯命令以便能夠在相應的平臺上正確運行。

          c:
          cd c:\dev\spark-1.2.0-bin-hadoop2.4
          bin\spark-shell

          如果Spark安裝正確,就能夠在控制臺的輸出中看到如下信息。

          ….
          15/01/17 23:17:46 INFO HttpServer: Starting HTTP Server
          15/01/17 23:17:46 INFO Utils: Successfully started service 'HTTP class server' on port 58132.
          Welcome to
                __              
               / /   __/ /
              \ \/  \/ _ `/ /  '/
             /__/ ._/\,// //\\   version 1.2.0
                /_/

          Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) Type in expressions to have them evaluated. Type :help for more information. …. 15/01/17 23:17:53 INFO BlockManagerMaster: Registered BlockManager 15/01/17 23:17:53 INFO SparkILoop: Created spark context.. Spark context available as sc.</pre>

          可以鍵入如下命令檢查Spark Shell是否工作正常。

          sc.version

          (或)

          sc.appName

          完成上述步驟之后,可以鍵入如下命令退出Spark Shell窗口:

          :quit

          如果想啟動Spark Python Shell,需要先在電腦上安裝Python。你可以下載并安裝Anaconda,這是一個免費的Python發行版本,其中包括了一些比較流行的科學、數學、工程和數據分析方面的Python包。

          然后可以運行如下命令啟動Spark Python Shell:

          c:
          cd c:\dev\spark-1.2.0-bin-hadoop2.4
          bin\pyspark

          Spark示例應用

          完成Spark安裝并啟動后,就可以用Spark API執行數據分析查詢了。

          這些從文本文件中讀取并處理數據的命令都很簡單。我們將在這一系列文章的后續文章中向大家介紹更高級的Spark框架使用的用例。

          首先讓我們用Spark API運行流行的Word Count示例。如果還沒有運行Spark Scala Shell,首先打開一個Scala Shell窗口。這個示例的相關命令如下所示:

          import org.apache.spark.SparkContext
          import org.apache.spark.SparkContext._

          val txtFile = "README.md" val txtData = sc.textFile(txtFile) txtData.cache()</pre>

          我們可以調用cache函數將上一步生成的RDD對象保存到緩存中,在此之后Spark就不需要在每次數據查詢時都重新計算。需要注意的 是,cache()是一個延遲操作。在我們調用cache時,Spark并不會馬上將數據存儲到內存中。只有當在某個RDD上調用一個行動時,才會真正執 行這個操作。

          現在,我們可以調用count函數,看一下在文本文件中有多少行數據。

          txtData.count()

          然后,我們可以執行如下命令進行字數統計。在文本文件中統計數據會顯示在每個單詞的后面。

          val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey( + )

          wcData.collect().foreach(println)</pre>

          如果想查看更多關于如何使用Spark核心API的代碼示例,請參考網站上的Spark文檔

          后續計劃

          在后續的系列文章中,我們將從Spark SQL開始,學習更多關于Spark生態系統的其他部分。之后,我們將繼續了解Spark Streaming,Spark MLlib和Spark GraphX。我們也會有機會學習像Tachyon和BlinkDB等框架。

          小結

          在本文中,我們了解了Apache Spark框架如何通過其標準API幫助完成大數據處理和分析工作。我們還對Spark和傳統的MapReduce實現(如Apache Hadoop)進行了比較。Spark與Hadoop基于相同的HDFS文件存儲系統,因此如果你已經在Hadoop上進行了大量投資和基礎設施建設,可 以一起使用Spark和MapReduce。

          此外,也可以將Spark處理與Spark SQL、機器學習以及Spark Streaming結合在一起。關于這方面的內容我們將在后續的文章中介紹。

          利用Spark的一些集成功能和適配器,我們可以將其他技術與Spark結合在一起。其中一個案例就是將Spark、Kafka和Apache Cassandra結合在一起,其中Kafka負責輸入的流式數據,Spark完成計算,最后Cassandra NoSQL數據庫用于保存計算結果數據。

          不過需要牢記的是,Spark生態系統仍不成熟,在安全和與BI工具集成等領域仍然需要進一步的改進。

          參考文獻

          關于作者

          用Apache Spark進行大數據處理——第一部分:入門介紹Srini Penchikala目 前是一家金融服務機構的軟件架構師,這個機構位于德克薩斯州的奧斯汀。他在軟件系統架構、設計和開發方面有超過20年的經驗。Srini目前正在撰寫一本 關于NoSQL數據庫模式的書。他還是曼寧出版社出版的《Spring Roo in Action》一書的合著者(http://www.manning.com/SpringRooinAction)。他還曾經出席各種會議,如 JavaOne,SEI Architecture Technology Conference(SATURN),IT Architect Conference(ITARC),No Fluff Just Stuff,NoSQL Now和Project World Conference等。Srini還在InfoQ,The ServerSide,OReilly Network(ONJava),DevX Java,java.net以及JavaWorld等網站上發表過很多關于軟件系統架構、安全和風險管理以及NoSQL數據庫等方面的文章。他還是InfoQ NoSQL數據庫社區的責任編輯

           

          查看英文原文:Big Data Processing with Apache Spark – Part 1: Introduction

          來自:http://www.infoq.com/cn/articles/apache-spark-introduction

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!