事實上,Spark是一項非常值得學習的技術

jopen 8年前發布 | 35K 次閱讀 Spark

2013年年底,我第一次接觸到Spark,當時我對Spark所使用的Scala語言產生了較大的興趣。一段時間后,我做了一個預測泰坦尼克號船上人員生存概率的數據科學項目。事實證明這是一個更深入了解Spark概念和編程框架的絕佳途徑。 我強烈建議任何希望學習Spark的開發者都尋找一個項目入手。

事實上,Spark是一項非常值得學習的技術

如今,諸如亞馬遜、eBay和雅虎等公司都開始采用Spark技術。許多機構將Spark部署在上千個節點的集群中。據Spark FAQ中記錄,已知的最大集群節點個數已超過8000。 事實上,Spark是一項非常值得學習的技術。

本文主要介紹了Spark概念及一些實例。這些信息主要從Apache Spark網站和相關書籍中獲取。

什么是Apache Spark?

正如廣告所提到的,Spark是一個運算速度快如閃電的Apache項目。它有一個逐漸壯大的開源社區,同時它還是現今最熱門的Apache項目。

Spark提供了一個運算速度快的一般化數據處理平臺。Spark可以讓你的程序提高100倍的內存計算速度,或者10倍的磁盤計算速度(Hadoop)。去年的Daytona GraySort比賽中,Spark只用了Hadoop十分之一數量的機器就實現了其三倍多的速度。Spark已經成了處理PB級別數據運算速度最快的開源工具。

Spark還可以更加便捷地完成你的項目,為了更好地說明這個問題,我們首先看下如何實現大數據中的“Hello World!”案例。用Java語言編寫的MapReduce過程需要大約50行的代碼,而利用Spark你只需要以下幾行代碼:

sparkContext.textFile("hdfs://...") 
    .flatMap(line => line.split(" ")) 
    map(word => (word, 1)).reduceByKey(_+_) 
    .saveAsTextFile("hdfs://...") 

學習如何使用Apache Spark時,另一個重要的東西是其提供了脫機的交互式shell(REPL)。利用REPL,我們可以逐行檢測代碼是否有誤。考慮到Spark較為簡短的代碼風格,這使得即時數據分析任務成了容易實現的事情。

此外,Spark還提供了其他的一些特性:

  • 當前Spark提供了Scala、Java、Python和R的API接口
  • 較好地整合了Hadoop生態系統和數據儲存系統(HDFS, Amazon S3, HIVE, HBase, Cassandra等)
  • 既可以在Hadoop YARN或者Apache Mesos等集群上運行,也可以單機運行。

Spark核心組件可以和其他一些高效的軟件庫無縫連接使用。這些軟件庫包括SparkSQL, Spark Streming, MLlib(機器學習專用)和GraphX,下文將詳細介紹這些組件。其他一些軟件庫和擴展功能目前正處于開發過程中。

事實上,Spark是一項非常值得學習的技術

Spark核心組件

Spark是處理大規模數據的并行分布式基礎引擎。它主要負責以下幾個功能:

  • 內存管理和故障恢復
  • 制定并管理集群中的任務
  • 和數據儲存系統交互

Spark引入了RDD(Resilient Distributed Dataset)的概念,RDD是一個抽象的數據集,它提供對數據并行和容錯的處理。我們可以通過加載外部數據集或從驅動程序集中切分得到一個可以包含任意類型項目的RDD。

RDD支持兩種類型的運算:

  • 數據轉換(數據映射、過濾、合并等)在一個RDD上執行,而其結果被儲存到另外一個RDD中。
  • 數據運算(降維、計數等)則是通過在RDD中計算后才返回相應的結果。

Spark的數據轉換過程并不是實時返回運算結果。實際上,該過程知識記錄下需要執行的操作過程和相應的數據集。只有當執行數據運算過程且結果已經返回到驅動程序中時,Spark才執行數據轉換進程。該設計使得Spark可以更高效地執行任務。例如,如果一個大型數據集被轉換成許多子集并被傳輸到第一步的數據運算過程中,那么此時Spark只能處理并返回第一步的運算結果,并無法處理整個數據集的運算過程。

默認設定下,任何一個處理數據轉換過程的RDD將會在每次處理完數據運算后被還原。然而,你也可以使用高速緩存的方法將RDD保存下來,此時Spark會將內容儲存在集群中以便于下次更快捷地調用。

Spark SQL

SparkSQL是Spark的一個組件,它可以利用SQL或者Hive查詢語法來查詢數據。它起先被視為MapReduce的替代方案,現今SparkSQL已被整合到Spark堆棧中。為了提供對更多數據類型的支持,它將SQL語句納入系統中,這使其成為一個非常強大的工具。以下是Hive兼容查詢語句的實例:

// sc is an existing SparkContext 
val sqlContext = new org.apache.spark.sql.hive.HiveCONTEXT(sc) 
 
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") 
sqlContext.sql("LOAD DATA LOCAL INPATH 'examoles/src/main/resources/kvl.txt' INTO TABLE src") 
 
// Queries are expressed in HiveQL 
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println) 

Spark Streaming

Spark Streaming支持實時流式數據處理,比如Web服務器日志文件、推ter等社交網絡數據和類似Kafka的信息數據。Spark中,Spark Streaming接收輸入流數據并將其劃分成小子集。接下來,如下圖所示,這些數據被Spark引擎所處理并被整合成最終的結果。

事實上,Spark是一項非常值得學習的技術

Spark Streaming的API接口和Spark核心組件非常匹配,因此所有的編程人員可以輕易地處理流式數據。

MLlibMLlib是一個機器學習庫,它提供了為大規模集群計算所設計的分類、回歸、聚類和協同過濾等機器學習算法。其中一部分算法也適用于處理流式數據,比如普通線性二乘回歸估計和k均值聚類算法。值得注意的是,Apache Mahout(Hadoop的機器學習算法軟件庫)已經脫離MapReduce陣營轉而投向Spark MLlib中。

GraphX

事實上,Spark是一項非常值得學習的技術

GraphX是用于繪圖和執行繪圖并行計算的軟件庫,它為ETL(探索性分析和反復的繪圖計算)提供了一套統一的工具。除了繪圖操作技巧,它還提供了類似于PageRank的一般性繪圖算法。

如何使用Apache Spark?

推terUtils.createStream(...) 
    .filter(_.getText.contains("earthquake") || _.getText.contains("shaking")) 

現在我們已經回答了“什么是Apache Spark?”這個問題,接下來讓我們思考下Spark可以用來處理啥樣的問題呢?

最近,我偶然看到一篇文章。文章中提到利用推ter流式數據來檢測地震。有趣的是,實驗表明該技術可以比日本氣象局更快地告知百姓地震的情況。即使他們在文章使用了不同的方法,但我認為這是一個很好的例子,它可以用來檢驗我們如何利用簡化的片段代碼而不使用粘接代碼將Spark付諸實踐。

首先,我們必須將與“地震”或者“抖動”有關的推文過濾出來。我們可以非常輕易地利用Spark Streaming來實現該目標:

推terUtils.createStream(...) 
    .filter(_.getText.contains("earthquake") || _.getText.contains("shaking")) 

接下來我們可以對處理完的推文數據做一些語義分析,并判斷是否能反映出當前的地震情況。比如,類似于“地震”或者“現在正在晃動!”的推文將被視為具有正效應,而類似于“參加地震會議。”或者“昨天的地震真恐怖。”則被視為無影響效應。文章作者利用支持向量機模型來實現該目標,我們將在 此基礎上利用流式數據版本的模型來實現。下文是利用MLlib生成的代碼示例:

// We would prepare some earthquake tweet data and load it in LIBSVM format. 
val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt") 
 
// Split data into training (60%) and test (40%). 
val splits = data.randomSplit(Array(0.6,0.4),seed = 11L) 
val training = split(0).cache() 
val test = splits(1) 
 
// Run training algorithm to build the model 
val numIterations = 100 
val model = SVMWithSGD.train(training, numIterations) 
 
// Clear the default threshold. 
model.clearThreshold() 
 
// Compute raw scores on the test set. 
val scoreAndLabels = test.map{point =>  
    val score = model.predict(point.features) 
    (score, point.label)} 
 
// Get evaluation metrics. 
val metric = new BinaryClassificationMetrics(scoreAndLabels) 
val auROC = metrics.areaUnderROC() 
 
println("Area under ROC =" + auROC) 

如果我們關注模型的預測準確率,那么我們可以進一步對檢測到地震做出反應。需要注意的是,對于包含地理信息的推文,我們還可以獲取震源位置。利用這個信息,我們可以通過SparkSQL從現有的Hive table(儲存需要接收地震提醒的用戶信息)中提取出他們的郵箱地址并發送一封私人電子郵件:

//sc is an existing SparkContext. 
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 
// sendEmail is a custom function 
sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email") 
    .collect().foreach(sendEmail) 

Apache Spark的其他應用

除檢測地震情況外,Spark還有許多潛在的應用。

以下是Spark在大數據中的部分應用:

1.在游戲領域中,從實時的潛在游戲事件中迅速地挖掘出有價值的模式可以創造出巨大的商業利益,比如用戶返回率情況、如何制定定向廣告以及如何自動調整游戲的復雜度等。

2.在電子商務領域中,實時交易數據將被傳遞到k均值算法或者ALS等協同過濾流算法中。這些運算結果將和顧客評論等非結構化數據結合起來,用于不斷改進交易模式以適應新趨勢的發展。

3.在金融或證券領域中,Spark堆棧技術可以被應用到信用詐騙和風險管控系統中。通過獲取大量的歷史數據和其他一些外泄數據以及一些連接/請求信息(IP地理信息或時間信息),我們可以取得非常好的模型結果。

結論

總而言之,Spark幫助人們簡化了處理大規模數據的步驟流程。不管是處理結構化還是非結構化數據,Spark將許多復雜的功能(比如機器學習算法和圖算法)無縫地結合起來。Spark使得大量的從業者都可以進行大數據分析,讓我們一探究竟吧!

原文作者:RADEK OSTROWSKI

譯者:Fibears

來自: http://developer.51cto.com/art/201602/505664.htm

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!