Spark SQL 初探: 使用大數據分析2000萬數據
目錄 [?]
- 安裝和配置Spark
- Spark初試
- 使用Spark SQL分析數據 </ol> </div>
- Spark讀取文件夾的文件
- Spark filter和map使用
- Spark sql語句調用
- 自定義Spark sql的函數 </ul>
<p>去年網上曾放出個2000W的開房記錄的數據庫, 不知真假。 最近在學習Spark, 所以特意從網上找來數據測試一下,
這是一個絕佳的大數據素材。 如果數據涉及到個人隱私,請盡快刪除, 本站不提供此類數據。你可以寫個隨機程序生成2000W的測試數據, 以CSV格式。</p>
Spark是UC Berkeley AMP lab所開源的類Hadoop MapReduce的通用的并行計算框架,Spark基于map reduce算法實現的分布式計算,擁有Hadoop MapReduce所具有的優點;但不同于MapReduce的是Job中間輸出和結果可以保存在內存中,從而不再需要讀寫HDFS,因此Spark能更 好地適用于數據挖掘與機器學習等需要迭代的map reduce的算法。
Spark是一個高效的分布式計算系統,相比Hadoop,它在性能上比Hadoop要高100倍。Spark提供比Hadoop更上層的API, 同樣的算法在Spark中實現往往只有Hadoop的1/10或者1/100的長度。Shark類似“SQL on Spark”,是一個在Spark上數據倉庫的實現,在兼容Hive的情況下,性能最高可以達到Hive的一百倍。
Apache Spark 是在 Scala 語言中實現的,它將 Scala 用作其應用程序框架。與 Hadoop 不同,Spark 和 Scala 能夠緊密集成,其中的 Scala 可以像操作本地集合對象一樣輕松地操作分布式數據集。
2014年處, Apache 基金會宣布旗下的 Apache Spark 項目成為基金會的頂級項目,擁有頂級域名 http://spark.apache.org/。 Spark 的用戶包括:阿里巴巴、Cloudera、Databricks、IBM、英特爾和雅虎等知名廠商。
Spark SQL是支持在Spark中使用Sql、HiveSql、Scaca中的關系型查詢表達式。它的核心組件是一個新增的RDD類型SchemaRDD,它把 行對象用一個Schema來描述行里面的所有列的數據類型,它就像是關系型數據庫里面的一張表。它可以從原有的RDD創建,也可以是Parquet文件, 最重要的是它可以支持用HiveQL從hive里面讀取數據。
在2014年7月1日的Spark Summit上,Databricks宣布終止對Shark的開發,將重點放到Spark SQL上。在會議上,Databricks表示,Shark更多是對Hive的改造,替換了Hive的物理執行引擎,因此會有一個很快的速度。然而,不容 忽視的是,Shark繼承了大量的Hive代碼,因此給優化和維護帶來了大量的麻煩。隨著性能優化和先進分析整合的進一步加深,基于MapReduce設 計的部分無疑成為了整個項目的瓶頸。 詳細內容請參看 Shark, Spark SQL, Hive on Spark, and the future of SQL on Spark
當前Spark SQL還處于alpha階段,一些API在將將來的版本中可能會有所改變。
我也翻譯幾篇重要的Spark文檔,你可以在我的網站找到。 Spark翻譯文檔
本文主要介紹了下面幾個知識點:
提前講一下,我也是最近才學習Spark及其相關的技術如Scala,下面的例子純粹為了驗證性的試驗, 相信例子代碼很很多優化的地方。
安裝和配置Spark
當前最新的Spark版本為1.1.1, 因為我們以Standalone方式運行Spark,所以直接隨便挑一個版本, 比如spark-1.1.1-bin-hadoop2.4.tgz, 解壓到你的機器上。
我使用的CentOS 6.4。 具體來講,它是我筆記本的一個虛擬機, 4個核, 4G內存。
在/opt解壓它, 命令行中進入解壓后的目錄/opt/spark-1.1.1-bin-hadoop2.4。
運行./bin/spark-shell
就可以啟動一個交互式的spark shell控制臺, 在其中可以執行scala代碼。
Spark初試
因為我們以本地單機的形式測試Spark, 你需要配置以下你的spark, 否則在分析大數據時很容易出現內存不夠的問題。
在我的機器上, conf文件夾下復制一份spark-defaults.conf,將使用的內存增大一些:
spark.executor.memory 2g
spark.driver.memory 2g
啟動shark-shell的時候設置使用4個核。
[root@colobu conf]# ./bin/spark-shell --master local[4]
根據 Spark 快速入門 中的介紹運行個例子測試一下:
scala> val textFile = sc.textFile("README.md")
14/12/11 13:52:00 INFO MemoryStore: ensureFreeSpace(163705) called with curMem=0, maxMem=1111794647
14/12/11 13:52:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 159.9 KB, free 1060.1 MB)
textFile: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:12
scala> textFile.count()
這個例子從Spark解壓目錄下的README.md文件創建一個RDD,并統計此文件有多少行。
再看一個拋針法計算PI值的例子。
val NUM_SAMPLES=1000000
val count = sc.parallelize(1 to NUM_SAMPLES).map{i =>
val x = Math.random()
val y = Math.random()
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi 值大約為 " + 4.0 * count / NUM_SAMPLES)
結果為:
Pi 值大約為 3.141408
到目前為止,我們搭建好了一個Spark環境, 并簡單進行了測試。 下一步我們使用Spark SQL分析前面所說的數據。
使用Spark SQL分析數據
這一步,我們使用Spark SQL按照星座對2000W數據進行分組統計, 看看哪個星座的人最喜歡開房。
當然, 使用純Spark也可以完成我們的分析, 因為實際Spark SQL最終是利用Spark來完成的。
實際測試中發現這些數據并不是完全遵守一個schema, 有些數據的格式是不對的, 有些數據的數據項也是錯誤的。 在代碼中我們要剔除那么干擾數據。
反正我們用這個數據測試者玩, 并沒有嚴格的要求去整理哪些錯誤數據。
先看代碼:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
case class Customer(name: String, gender: String, ctfId: String, birthday: String, address: String)
val customer = sc.textFile("/mnt/share/2000W/*.csv").map(_.split(",")).filter(line => line.length > 7).map(p => Customer(p(0), p(5), p(4), p(6), p(7))).distinct()
customer.registerTempTable("customer")
def toInt(s: String):Int = {
try {
s.toInt
} catch {
case e:Exception => 9999
}
}
def myfun(birthday: String) : String = {
var rt = "未知"
if (birthday.length == 8) {
val md = toInt(birthday.substring(4))
if (md >= 120 & md <= 219)
rt = "水瓶座"
else if (md >= 220 & md <= 320)
rt = "雙魚座"
else if (md >= 321 & md <= 420)
rt = "白羊座"
else if (md >= 421 & md <= 521)
rt = "金牛座"
else if (md >= 522 & md <= 621)
rt = "雙子座"
else if (md >= 622 & md <= 722)
rt = "巨蟹座"
else if (md >= 723 & md <= 823)
rt = "獅子座"
else if (md >= 824 & md <= 923)
rt = "處女座"
else if (md >= 924 & md <= 1023)
rt = "天秤座"
else if (md >= 1024 & md <= 1122)
rt = "天蝎座"
else if (md >= 1123 & md <= 1222)
rt = "射手座"
else if ((md >= 1223 & md <= 1231) | (md >= 101 & md <= 119))
rt = "摩蝎座"
else
rt = "未知"
}
rt
}
sqlContext.registerFunction("constellation", (x:String) => myfun(x))
var result = sqlContext.sql("SELECT constellation(birthday), count(constellation(birthday)) FROM customer group by constellation(birthday)")
result.collect().foreach(println)
為了使用spark sql,你需要引入 sqlContext.createSchemaRDD
. Spark sql一個核心對象就是SchemaRDD
。 上面的import
可以隱式的將一個RDD轉換成SchemaRDD。
接著定義了Customer
類,用來映射每一行的數據, 我們只使用每一行很少的信息, 像地址,email等都沒用到。
接下來從2000W文件夾中讀取所有的csv文件, 創建一個RDD并注冊表customer。
因為沒有一個內建的函數可以將出生一起映射為星座, 所以我們需要定義一個映射函數myfun
,
并把它注冊到SparkContext中。 這樣我們就可以在sql語句中使用這個函數。 類似地,字符串的length函數當前也不支持,
你可以增加一個這樣的函數。 因為有的日期不正確,所有特別增加了一個”未知”的星座。 錯誤數據可能有兩種, 一是日期出錯,
而是此行格式不對,將其它字段映射成了出生日期。 我們在分析的時候忽略它們好了。
然后執行一個分組的sql語句。這個sql語句查詢結果類型為SchemaRDD, 也繼承了RDD所有的操作。
最后將結果打印出來。
[雙子座,1406018]
[雙魚座,1509839]
[摩蝎座,2404812]
[金牛座,1406225]
[水瓶座,1635358]
[巨蟹座,1498077]
[處女座,1666009]
[天秤座,1896544]
[白羊座,1409838]
[射手座,1614915]
[未知,160483]
[獅子座,1613529]
看起來魔蝎座的人最喜歡開房了, 明顯比其它星座的人要多。
我們也可以分析一下開房的男女比例:
......
result = sqlContext.sql("SELECT gender, count(gender) FROM customer where gender = 'F' or gender = 'M' group by gender")
result.collect().foreach(println)
結果顯示男女開房的人數大約是2:1
[F,6475461] [M,12763926]來自:http://colobu.com/2014/12/11/spark-sql-quick-start/