Spark介紹

jopen 11年前發布 | 109K 次閱讀 Spark 分布式/云計算/大數據

  • Spark是UC Berkeley AMP lab所開源的類Hadoop MapReduce的通用的并行計算框架,Spark基于map reduce算法實現的分布式計算,擁有Hadoop MapReduce所具有的優點;但不同于MapReduce的是Job中間輸出和結果可以保存在內存中,從而不再需要讀寫HDFS,因此Spark能更好地適用于數據挖掘與機器學習等需要迭代的map reduce的算法。
BDAS</span>

提到Spark Streaming,我們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關于數據分析的軟件棧。從它的視角來看,目前的大數據處理可以分為如以下三個類型。    

</div> </blockquote>

    • 復雜的批量數據處理(batch data processing),通常的時間跨度在數十分鐘到數小時之間。
    • 基于歷史數據的交互式查詢(interactive query),通常的時間跨度在數十秒到數分鐘之間。
    • 基于實時數據流的數據處理(streaming data processing),通常的時間跨度在數百毫秒到數秒之間。        
    • </ul> </ul> </div>

      目 前已有很多相對成熟的開源軟件來處理以上三種情景,我們可以利用MapReduce來進行批量數據處理,可以用Impala來進行交互式查詢,對于流式數 據處理,我們可以采用Storm。對于大多數互聯網公司來說,一般都會同時遇到以上三種情景,那么在使用的過程中這些公司可能會遇到如下的不便。    


      </blockquote>

        • 三種情景的輸入輸出數據無法無縫共享,需要進行格式相互轉換。
        • 每一個開源軟件都需要一個開發和維護團隊,提高了成本。
        • 在同一個集群中對各個系統協調資源分配比較困難。        
        • </ul> </ul> </div>

          BDAS 就是以Spark為基礎的一套軟件棧,利用基于內存的通用計算模型將以上三種情景一網打盡,同時支持Batch、Interactive、 Streaming的處理,且兼容支持HDFS和S3等分布式文件系統,可以部署在YARN和Mesos等流行的集群資源管理器之上。BDAS的構架如圖 1所示,其中Spark可以替代MapReduce進行批處理,利用其基于內存的特點,特別擅長迭代式和交互式數據處理;Shark處理大規模數據的 SQL查詢,兼容Hive的HQL。本文要重點介紹的Spark    Streaming,在整個BDAS中進行大規模流式處理。
          </blockquote>





          </div> Spark與Hadoop的對比</span></span>

          • Spark的中間數據放到內存中,對于迭代運算效率更高
            • Spark更適合于迭代運算比較多的ML和DM運算。因為在Spark里面,有RDD的抽象概念。
            </li>

          • Spark比Hadoop更通用
            • Spark 提供的數據集操作類型有很多種,不像Hadoop只提供了Map和Reduce兩種操作。比如 map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues,sort,partionBy 等多種操作類型,Spark把這些操作稱為Transformations。同時還提供 Count,collect, reduce, lookup, save等多種actions操作。
            • 這些多種多樣的數據集操作類型,給給開發上層應用的用戶提供了方便。各個處理節點之間的通信模型不再像Hadoop那樣就是唯一的Data Shuffle一種模式。用戶可以命名,物化,控制中間結果的存儲、分區等。可以說編程模型比Hadoop更靈活。
            • 不過由于RDD的特性,Spark不適用那種異步細粒度更新狀態的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對于那種增量修改的應用模型不適合。
            • </ul> </li>

            • 容錯性
              • 在分布式數據集計算時通過checkpoint來實現容錯,而checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶可以控制采用哪種方式來實現容錯。
              • </ul> </li>

              • 可用性
                • Spark通過提供豐富的Scala, Java,Python API及交互式Shell來提高可用性。

                • </ul> </li> </ul> Spark與Hadoop的結合</span></span>

                  • Spark可以直接對HDFS進行數據的讀寫,同樣支持Spark on YARN。Spark可以與MapReduce運行于同集群中,共享存儲資源與計算,數據倉庫Shark實現上借用Hive,幾乎與Hive完全兼容。
                  Spark的適用場景</span></span>

                  • Spark是基于內存的迭代計算框架,適用于需要多次操作特定數據集的應用場合。需要反復操作的次數越多,所需讀取的數據量越大,受益越大,數據量小但是計算密集度較大的場合,受益就相對較小
                  • 由于RDD的特性,Spark不適用那種異步細粒度更新狀態的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對于那種增量修改的應用模型不適合。
                  • 總的來說Spark的適用面比較廣泛且比較通用。
                  • </ul> 運行模式</span></span>

                    • Local(多用于測試)
                    • Standalone模式
                    • Amazon EC2 
                    • Mesoes模式
                    • yarn模式
                    • </ul> Spark生態系統</span></span>

                      Spark介紹

                      • Shark ( Hive on Spark):

                        Shark基本上就是在Spark的框架基礎上提供和Hive一樣的HiveQL命令接口,為了最大程度的保持和Hive的兼容性,Shark使用了 Hive的API來實現query Parsing和 Logic Plan generation,最后的PhysicalPlan execution階段用Spark代替Hadoop MapReduce。通過配置Shark參數,Shark可以自動在內存中緩存特定的RDD,實現數據重用,進而加快特定數據集的檢索。同時,Shark 通過UDF用戶自定義函數實現特定的數據分析學習算法,使得SQL數據查詢和運算分析能結合在一起,最大化RDD的重復使用。</span></span></li>

                      • Spark streaming: 構 建在Spark上處理Stream數據的框架,基本的原理是將Stream數據分成小的時間片斷(幾秒),以類似batch批量處理的方式來處理這小部分 數據。Spark

                        Streaming構建在Spark上,一方面是因為Spark的低延遲執行引擎(100ms+)可以用于實時計算,另一方面相比基于Record的其它 處理框架(如Storm),RDD數據集更容易做高效的容錯處理。此外小批量處理的方式使得它可以同時兼容批量和實時數據處理的邏輯和算法。方便了一些需 要歷史數據和實時數據聯合分析的特定應用場合。</span></span></li>

                      • Bagel: Pregel on Spark,可以用Spark進行圖計算,這是個非常有用的小項目。Bagel自帶了一個例子,實現了Google的PageRank算法。
                      • </ul> 在業界的使用</span></span>

                        Spark項目在2009年啟動,2010年開源, 現在使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘寶等,豆瓣也在使用Spark的python克隆版Dpark。


                        </blockquote>

                        Spark核心概念

                        Resilient Distributed Dataset (RDD)彈性分布數據集

                        • RDD 是Spark的最基本抽象,是對分布式內存的抽象使用,實現了以操作本地集合的方式來操作分布式數據集的抽象實現。RDD是Spark最核心的東西,它表 示已被分區,不可變的并能夠被并行操作的數據集合,不同的數據集格式對應不同的RDD實現。RDD必須是可序列化的。RDD可以cache到內存中,每次 對RDD數據集的操作之后的結果,都可以存放到內存中,下一個操作可以直接從內存中輸入,省去了MapReduce大量的磁盤IO操作。這對于迭代運算比 較常見的機器學習算法, 交互式數據挖掘來說,效率提升比較大。
                        • RDD的特點:
                          • 它是在集群節點上的不可變的、已分區的集合對象。
                          • 通過并行轉換的方式來創建如(map, filter, join, etc)。
                          • 失敗自動重建。
                          • 可以控制存儲級別(內存、磁盤等)來進行重用。
                          • 必須是可序列化的。
                          • 是靜態類型的。
                          • </ul> </li>

                          • RDD的好處
                            • RDD只能從持久存儲或通過Transformations操作產生,相比于分布式共享內存(DSM)可以更高效實現容錯,對于丟失部分數據分區只需根據它的lineage就可重新計算出來,而不需要做特定的Checkpoint。
                            • RDD的不變性,可以實現類Hadoop MapReduce的推測式執行。
                            • RDD的數據分區特性,可以通過數據的本地性來提高性能,這與Hadoop MapReduce是一樣的。
                            • RDD都是可序列化的,在內存不足時可自動降級為磁盤存儲,把RDD存儲于磁盤上,這時性能會有大的下降但不會差于現在的MapReduce。
                            • </ul> </li>

                            • RDD的存儲與分區
                              • 用戶可以選擇不同的存儲級別存儲RDD以便重用。
                              • 當前RDD默認是存儲于內存,但當內存不足時,RDD會spill到disk。
                              • RDD在需要進行分區把數據分布于集群中時會根據每條記錄Key進行分區(如Hash 分區),以此保證兩個數據集在Join時能高效。
                              • </ul> </li>

                              • RDD的內部表示
                                在RDD的內部實現中每個RDD都可以使用5個方面的特性來表示:
                                • 分區列表(數據塊列表)
                                • 計算每個分片的函數(根據父RDD計算出此RDD)
                                • 對父RDD的依賴列表
                                • 對key-value RDD的Partitioner【可選】
                                • 每個數據分片的預定義地址列表(如HDFS上的數據塊的地址)【可選】
                                • </ul> </li>


                                • RDD的存儲級別
                                  RDD
                                  根據useDiskuseMemorydeserializedreplication四個參數的組合提供了11種存儲級別:
                                • </ul> </div>

                                      val NONE = new StorageLevel(false, false, false)
                                  </div>

                                      val DISK_ONLY = new StorageLevel(true, false, false)
                                  </div>

                                      val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
                                  </div>

                                      val MEMORY_ONLY = new StorageLevel(false, true, true)
                                  </div>

                                      val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)
                                  </div>

                                      val MEMORY_ONLY_SER = new StorageLevel(false, true, false)
                                  </div>

                                      val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)
                                  </div>

                                      val MEMORY_AND_DISK = new StorageLevel(true, true, true)
                                  </div>

                                      val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
                                  </div>

                                      val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
                                  </div>

                                      val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false,2)
                                  </div> </blockquote>

                                  •  RDD定義了各種操作,不同類型的數據由不同的RDD類抽象表示,不同的操作也由RDD進行抽實現。
                                  RDD的生成</span>
                                  </span>
                                  • RDD有兩種創建方式:</span>

                                    1、從Hadoop文件系統(或與Hadoop兼容的其它存儲系統)輸入(例如HDFS)創建。

                                    </div>

                                    2、從父RDD轉換得到新RDD。

                                    </li>

                                  • 下面來看一從Hadoop文件系統生成RDD的方式,如:val file = spark.textFile("hdfs://...")file變量就是RDD(實際是HadoopRDD實例),生成的它的核心代碼如下:


                                  • ·  1
                                    2
                                    3
                                    4
                                    5
                                    6
                                    7
                                    8
                                    9

                                    ·   // SparkContext根據文件/目錄及可選的分片數創建RDD, 這里我們可以看到SparkHadoop MapReduce很像 
                                        // 
                                    需要InputFormat, KeyValue的類型,其實Spark使用的HadoopInputFormat, Writable類型。 


                                        def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { 
                                            hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], 
                                            classOf[Text], minSplits) .map(pair => pair._2.toString) }
                                     
                                        // 
                                    根據Hadoop配置,及InputFormat等創建HadoopRDD  
                                        new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)    
                                     

                                    ·          

                                    1. RDD進行計算時,RDDHDFS讀取數據時與Hadoop MapReduce幾乎一樣的:

                                    ·  1
                                    2
                                    3
                                    4
                                    5
                                    6
                                    7
                                    8
                                    9
                                    10
                                    11
                                    12
                                    13
                                    14
                                    15
                                    16
                                    17

                                    ·      // 根據hadoop配置和分片從InputFormat中獲取RecordReader進行數據的讀取。 
                                        reader = fmt.getRecordReader(split.inputSplit.value, conf,Reporter.NULL)
                                     
                                        val key: K = reader.createKey()
                                        val value: V = reader.createValue()
                                     
                                        //
                                    使用Hadoop MapReduceRecordReader讀取數據,每個KeyValue對以元組返回。
                                        override def getNext() = {
                                        try {
                                          finished = !reader.next(key, value)
                                        } catch {
                                          case eof: EOFException =>
                                            finished = true
                                        }
                                          (key, value)
                                        }
                                     


                                  RDD的轉換與操作
                                  • 對于RDD可以有兩種計算方式:轉換(返回值還是一個RDD)與操作(返回值不是一個RDD)。
                                  • 轉 換(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說從一個RDD轉換生成另一個RDD的操作不是馬上執行,Spark在遇到 Transformations操作時只會記錄需要這樣的操作,并不會去執行,需要等到有Actions操作的時候才會真正啟動計算過程進行計算。
                                  • 操作(Actions) (如:count, collect, save等),Actions操作會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啟動計算的動因。
                                  下面使用一個例子來示例說明Transformations與Actions在Spark的使用。

                                  ·  1
                                  2
                                  3
                                  4
                                  5
                                  6
                                  7
                                  8
                                  9
                                  10
                                  11
                                  12
                                  13
                                  14

                                  ·      val sc = new SparkContext(master, "Example",System.getenv("SPARK_HOME"), 
                                          Seq(System.getenv("SPARK_TEST_JAR")))
                                   
                                      val rdd_A = sc.textFile(hdfs://.....)
                                      val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word,1))
                                   
                                      val rdd_C = sc.textFile(hdfs://.....)
                                      val rdd_D = rdd_C.map(line => (line.substring(10), 1))
                                      val rdd_E = rdd_D.reduceByKey((a, b) => a + b)
                                   
                                      val rdd_F = rdd_B.jion(rdd_E)
                                   
                                      rdd_F.saveAsSequenceFile(hdfs://....)


                                  Lineage(血統)

                                  • 利 用內存加快數據加載,在眾多的其它的In-Memory類數據庫或Cache類系統中也有實現,Spark的主要區別在于它處理分布式運算環境下的數據容 錯性(節點實效/數據丟失)問題時采用的方案。為了保證RDD中數據的魯棒性,RDD數據集通過所謂的血統關系(Lineage)記住了它是如何從其它 RDD中演變過來的。相比其它系統的細顆粒度的內存數據更新級別的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定數據轉換 (Transformation)操作(filter, map, join etc.)行為。當這個RDD的部分分區數據丟失時,它可以通過Lineage獲取足夠的信息來重新運算和恢復丟失的數據分區。這種粗顆粒的數據模型,限 制了Spark的運用場合,但同時相比細顆粒度的數據模型,也帶來了性能的提升。
                                  • RDD 在Lineage依賴方面分為兩種Narrow Dependencies與Wide Dependencies用來解決數據容錯的高效性。Narrow Dependencies是指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應于一個子RDD的分區或多個父RDD的分 區對應于一個子RDD的分區,也就是說一個父RDD的一個分區不可能對應一個子RDD的多個分區。Wide Dependencies是指子RDD的分區依賴于父RDD的多個分區或所有分區,也就是說存在一個父RDD的一個分區對應一個子RDD的多個分區。對與 Wide Dependencies,這種計算的輸入和輸出在不同的節點上,lineage方法對與輸入節點完好,而輸出節點宕機時,通過重新計算,這種情況下,這 種方法容錯是有效的,否則無效,因為無法重試,需要向上其祖先追溯看是否可以重試(這就是lineage,血統的意思),Narrow Dependencies對于數據的重算開銷要遠小于Wide Dependencies的數據重算開銷。
                                  容錯

                                  • 在 RDD計算,通過checkpint進行容錯,做checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶可以控制采用哪種方式來實現容錯,默認是logging the updates方式,通過記錄跟蹤所有生成RDD的轉換(transformations)也就是記錄每個RDD的lineage(血統)來重新計算生成 丟失的分區數據。
                                  資源管理與作業調度

                                  • Spark 對于資源管理與作業調度可以使用Standalone(獨立模式),Apache Mesos及Hadoop YARN來實現。 Spark on Yarn在Spark0.6時引用,但真正可用是在現在的branch-0.8版本。Spark on Yarn遵循YARN的官方規范實現,得益于Spark天生支持多種Scheduler和Executor的良好設計,對YARN的支持也就非常容 易,Spark on Yarn的大致框架圖。 
                                  • 讓Spark運行于YARN上與Hadoop共用集群資源可以提高資源利用率。

                                   編程接口
                                  • Spark 通過與編程語言集成的方式暴露RDD的操作,類似于DryadLINQ和FlumeJava,每個數據集都表示為RDD對象,對數據集的操作就表示成對 RDD對象的操作。Spark主要的編程語言是Scala,選擇Scala是因為它的簡潔性(Scala可以很方便在交互式下使用)和性能(JVM上的靜 態強類型語言)。
                                  • Spark 和Hadoop MapReduce類似,由Master(類似于MapReduce的Jobtracker)和Workers(Spark的Slave工作節點)組成。 用戶編寫的Spark程序被稱為Driver程序,Dirver程序會連接master并定義了對各RDD的轉換與操作,而對RDD的轉換與操作通過 Scala閉包(字面量函數)來表示,Scala使用Java對象來表示閉包且都是可序列化的,以此把對RDD的閉包操作發送到各Workers節點。 Workers存儲著數據分塊和享有集群內存,是運行在工作節點上的守護進程,當它收到對RDD的操作時,根據數據分片信息進行本地化數據操作,生成新的 數據分片、返回結果或把RDD寫入存儲系統。 
                                  Scala
                                  • Spark 使用Scala開發,默認使用Scala作為編程語言。編寫Spark程序比編寫Hadoop MapReduce程序要簡單的多,SparK提供了Spark-Shell,可以在Spark-Shell測試程序。寫SparK程序的一般步驟就是創 建或使用(SparkContext)實例,使用SparkContext創建RDD,然后就是對RDD進行操作。如:

                                   

                                  ·  1
                                  2
                                  3
                                  4

                                  ·      val sc = new SparkContext(master, appName, [sparkHome], [jars])
                                      val textFile = sc.textFile("hdfs://.....") 
                                      textFile.map(....).filter(.....).....
                                   

                                      

                                  Java

                                  • Spark支持Java編程,但對于使用Java就沒有了Spark-Shell這樣方便的工具,其它與Scala編程是一樣的,因為都是JVM上的語言,ScalaJava可以互操作,Java編程接口其實就是對Scala的封裝。如:

                                  ·  1
                                  2
                                  3
                                  4
                                  5
                                  6
                                  7
                                  8
                                  9
                                  10

                                  ·      JavaSparkContext sc = new JavaSparkContext(...);  
                                      JavaRDD lines = ctx.textFile("hdfs://..."); 
                                      JavaRDD words = lines.flatMap( 
                                        new FlatMapFunction<StringString>() { 
                                           public Iterable call(String s) { 
                                              return Arrays.asList(s.split(" ")); 
                                           } 
                                         } 
                                      );
                                      


                                  Python

                                  • 現在Spark也提供了Python編程接口,Spark使用py4j來實現pythonjava的互操作,從而實現使用python編寫Spark程序。Spark也同樣提供了pyspark,一個Sparkpython shell,可以以交互式的方式使用Python編寫Spark程序。 如:

                                  ·  1
                                  2
                                  3
                                  4
                                  5

                                  ·      from pyspark import SparkContext 
                                      sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) 
                                      words = sc.textFile("/usr/share/dict/words") 
                                      words.filter(lambda w: w.startswith("spar")).take(5)
                                   



                                   使用示例


                                  Standalone模式

                                  為方便Spark的推廣使用,Spark提供了Standalone模式,Spark一開始就設計運行于Apache Mesos資源管理框架上,這是非常好的設計,但是卻帶了部署測試的復雜性。為了讓Spark能更方便的部署和嘗試,Spark因此提供了Standalone運行模式,它由一個Spark Master和多個Spark worker組成,與Hadoop MapReduce1很相似,就連集群啟動方式都幾乎是一樣。

                                  Standalone模式運行Spark集群

                                  o    下載Scala2.9.3,并配置SCALA_HOME

                                  o    下載Spark代碼(可以使用源碼編譯也可以下載編譯好的版本)這里下載 編譯好的版本(http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz

                                  o    解壓spark-0.7.3-prebuilt-cdh4.tgz安裝包

                                  1. 修改配置(conf/* slaves: 配置工作節點的主機名 spark-env.sh:配置環境變量。

                                  ·  1
                                  2
                                  3
                                  4
                                  5
                                  6
                                  7
                                  8
                                  9
                                  10

                                  ·  SCALA_HOME=/home/spark/scala-2.9.3 
                                  JAVA_HOME=/home/spark/jdk1.6.0_45 
                                  SPARK_MASTER_IP=spark1             
                                  SPARK_MASTER_PORT=30111 
                                  SPARK_MASTER_WEBUI_PORT=30118 
                                  SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g 
                                  SPARK_WORKER_PORT=30333 
                                  SPARK_WORKER_WEBUI_PORT=30119 
                                  SPARK_WORKER_INSTANCES=1
                                   

                                  o     

                                  o    Hadoop配置copyconf目錄下

                                  o    master主機上對其它機器做ssh無密碼登錄

                                  o    把配置好的Spark程序使用scp copy到其它機器

                                  1. master啟動集群

                                  ·  1
                                  2

                                  ·  $SPARK_HOME/start-all.sh
                                   

                                  o     

                                  yarn模式

                                  ·         Spark-shell現在還不支持Yarn模式,使用Yarn模式運行,需要把Spark程序全部打包成一個jar包提交到Yarn上運行。目錄只有branch-0.8版本才真正支持Yarn

                                  ·         Yarn模式運行Spark

                                  1. 下載Spark代碼.


                                  ·  1
                                  2

                                  ·  git clone git://github.com/mesos/spark 
                                   

                                  o     

                                  1. 切換到branch-0.8


                                  ·  1
                                  2
                                  3

                                  ·  cd spark 
                                  git checkout -b yarn --track origin/yarn 
                                   

                                  o     

                                  1. 使用sbt編譯Spark

                                  ·  1
                                  2
                                  3
                                  4

                                  ·  $SPARK_HOME/sbt/sbt 
                                  package 
                                  > assembly
                                   

                                  o     

                                  o    Hadoop yarn配置copyconf目錄下

                                  1. 運行測試

                                  ·  1
                                  2
                                  3
                                  4

                                  ·  SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar \ 
                                  ./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \ 
                                  --class spark.examples.SparkPi --args yarn-standalone
                                      

                                  使用Spark-shell

                                  ·         Spark-shell使用很簡單,當SparkStandalon模式運行后,使用$SPARK_HOME/spark-shell進入shell即可,在Spark-shellSparkContext已經創建好了,實例名為sc可以直接使用,還有一個需要注意的是,在Standalone模式下,Spark默認使用的調度器的FIFO調度器而不是公平調度,而Spark-shell作為一個Spark程序一直運行在Spark上,其它的Spark程序就只能排隊等待,也就是說同一時間只能有一個Spark-shell在運行。

                                  1. Spark-shell上寫程序非常簡單,就像在Scala Shell上寫程序一樣。

                                  ·  1
                                  2
                                  3
                                  4
                                  5
                                  6
                                  7
                                  8
                                  9

                                  ·      scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data") 
                                      textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
                                   
                                      scala> textFile.count() // Number of items in this RDD
                                      res0: Long = 21374
                                   
                                      scala> textFile.first() // First item in this RDD
                                      res1: String = # Spark
                                   

                                  ·          

                                  編寫Driver程序

                                  1. SparkSpark程序稱為Driver程序,編寫Driver程序很簡單幾乎與在Spark-shell上寫程序是一樣的,不同的地方就是SparkContext需要自己創建。如WorkCount程序如下:

                                  ·  1
                                  2
                                  3
                                  4
                                  5
                                  6
                                  7
                                  8
                                  9
                                  10
                                  11
                                  12
                                  13
                                  14
                                  15
                                  16
                                  17
                                  18
                                  19
                                  20
                                  21
                                  22
                                  23
                                  24
                                  25
                                  26

                                  ·  import spark.SparkContext
                                  import SparkContext._
                                   
                                  object WordCount {
                                    def main(args: Array[String]) {
                                      if (args.length ==0 ){
                                        println("usage is org.test.WordCount <master>")
                                      }
                                      println("the args: ")
                                      args.foreach(println)
                                   
                                      val hdfsPath = "hdfs://hadoop1:8020"
                                   
                                      // create the SparkContext
                                   args(0)yarn傳入appMaster地址
                                      val sc = new SparkContext(args(0), "WrodCount",
                                      System.getenv("SPARK_HOME"),Seq(System.getenv("SPARK_TEST_JAR")))
                                   
                                      val textFile = sc.textFile(hdfsPath + args(1))
                                   
                                      val result = textFile.flatMap(line => line.split("\\s+"))
                                          .map(word => (word, 1)).reduceByKey(_ + _)
                                   
                                      result.saveAsTextFile(hdfsPath + args(2))
                                    }
                                  }
                                   

                                   本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
                                   轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
                                   本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!