Spark SQL 初探: 使用大數據分析2000萬數據

jopen 10年前發布 | 177K 次閱讀 Spark SQL 分布式/云計算/大數據

目錄 [?]

  1. 安裝和配置Spark
  2. Spark初試
  3. 使用Spark SQL分析數據
  4. </ol> </div>

        <p>去年網上曾放出個2000W的開房記錄的數據庫, 不知真假。 最近在學習Spark, 所以特意從網上找來數據測試一下, 
    

    這是一個絕佳的大數據素材。 如果數據涉及到個人隱私,請盡快刪除, 本站不提供此類數據。你可以寫個隨機程序生成2000W的測試數據, 以CSV格式。</p>

    Spark是UC Berkeley AMP lab所開源的類Hadoop MapReduce的通用的并行計算框架,Spark基于map reduce算法實現的分布式計算,擁有Hadoop MapReduce所具有的優點;但不同于MapReduce的是Job中間輸出和結果可以保存在內存中,從而不再需要讀寫HDFS,因此Spark能更 好地適用于數據挖掘與機器學習等需要迭代的map reduce的算法。

    Spark是一個高效的分布式計算系統,相比Hadoop,它在性能上比Hadoop要高100倍。Spark提供比Hadoop更上層的API, 同樣的算法在Spark中實現往往只有Hadoop的1/10或者1/100的長度。Shark類似“SQL on Spark”,是一個在Spark上數據倉庫的實現,在兼容Hive的情況下,性能最高可以達到Hive的一百倍。

    Apache Spark 是在 Scala 語言中實現的,它將 Scala 用作其應用程序框架。與 Hadoop 不同,Spark 和 Scala 能夠緊密集成,其中的 Scala 可以像操作本地集合對象一樣輕松地操作分布式數據集。

    2014年處, Apache 基金會宣布旗下的 Apache Spark 項目成為基金會的頂級項目,擁有頂級域名 http://spark.apache.org/。 Spark 的用戶包括:阿里巴巴、Cloudera、Databricks、IBM、英特爾和雅虎等知名廠商。

    Spark SQL是支持在Spark中使用Sql、HiveSql、Scaca中的關系型查詢表達式。它的核心組件是一個新增的RDD類型SchemaRDD,它把 行對象用一個Schema來描述行里面的所有列的數據類型,它就像是關系型數據庫里面的一張表。它可以從原有的RDD創建,也可以是Parquet文件, 最重要的是它可以支持用HiveQL從hive里面讀取數據。

    在2014年7月1日的Spark Summit上,Databricks宣布終止對Shark的開發,將重點放到Spark SQL上。在會議上,Databricks表示,Shark更多是對Hive的改造,替換了Hive的物理執行引擎,因此會有一個很快的速度。然而,不容 忽視的是,Shark繼承了大量的Hive代碼,因此給優化和維護帶來了大量的麻煩。隨著性能優化和先進分析整合的進一步加深,基于MapReduce設 計的部分無疑成為了整個項目的瓶頸。 詳細內容請參看 Shark, Spark SQL, Hive on Spark, and the future of SQL on Spark

    當前Spark SQL還處于alpha階段,一些API在將將來的版本中可能會有所改變。

    我也翻譯幾篇重要的Spark文檔,你可以在我的網站找到。 Spark翻譯文檔

    本文主要介紹了下面幾個知識點:

    • Spark讀取文件夾的文件
    • Spark filter和map使用
    • Spark sql語句調用
    • 自定義Spark sql的函數
    • </ul>

      提前講一下,我也是最近才學習Spark及其相關的技術如Scala,下面的例子純粹為了驗證性的試驗, 相信例子代碼很很多優化的地方。

      安裝和配置Spark

      當前最新的Spark版本為1.1.1, 因為我們以Standalone方式運行Spark,所以直接隨便挑一個版本, 比如spark-1.1.1-bin-hadoop2.4.tgz, 解壓到你的機器上。
      我使用的CentOS 6.4。 具體來講,它是我筆記本的一個虛擬機, 4個核, 4G內存。

      在/opt解壓它, 命令行中進入解壓后的目錄/opt/spark-1.1.1-bin-hadoop2.4。

      運行./bin/spark-shell就可以啟動一個交互式的spark shell控制臺, 在其中可以執行scala代碼。

      Spark初試

      因為我們以本地單機的形式測試Spark, 你需要配置以下你的spark, 否則在分析大數據時很容易出現內存不夠的問題。
      在我的機器上, conf文件夾下復制一份spark-defaults.conf,將使用的內存增大一些:

      spark.executor.memory 2g
      spark.driver.memory 2g

      啟動shark-shell的時候設置使用4個核。

      [root@colobu conf]# ./bin/spark-shell --master local[4]

      根據 Spark 快速入門 中的介紹運行個例子測試一下:

      scala> val textFile = sc.textFile("README.md")
      14/12/11 13:52:00 INFO MemoryStore: ensureFreeSpace(163705) called with curMem=0, maxMem=1111794647
      14/12/11 13:52:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 159.9 KB, free 1060.1 MB)
      textFile: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:12
      scala> textFile.count()

      這個例子從Spark解壓目錄下的README.md文件創建一個RDD,并統計此文件有多少行。

      再看一個拋針法計算PI值的例子。

      val NUM_SAMPLES=1000000
      val count = sc.parallelize(1 to NUM_SAMPLES).map{i =>
      val x = Math.random()
      val y = Math.random()
      if (x*x + y*y < 1) 1 else 0
      }.reduce(_ + _)
      println("Pi 值大約為 " + 4.0 * count / NUM_SAMPLES)

      結果為:

      Pi 值大約為 3.141408

      到目前為止,我們搭建好了一個Spark環境, 并簡單進行了測試。 下一步我們使用Spark SQL分析前面所說的數據。

      使用Spark SQL分析數據

      這一步,我們使用Spark SQL按照星座對2000W數據進行分組統計, 看看哪個星座的人最喜歡開房。
      當然, 使用純Spark也可以完成我們的分析, 因為實際Spark SQL最終是利用Spark來完成的。
      實際測試中發現這些數據并不是完全遵守一個schema, 有些數據的格式是不對的, 有些數據的數據項也是錯誤的。 在代碼中我們要剔除那么干擾數據。
      反正我們用這個數據測試者玩, 并沒有嚴格的要求去整理哪些錯誤數據。

      先看代碼:

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.createSchemaRDD
      case class Customer(name: String, gender: String, ctfId: String, birthday: String, address: String)
      val customer = sc.textFile("/mnt/share/2000W/*.csv").map(_.split(",")).filter(line => line.length > 7).map(p => Customer(p(0), p(5), p(4), p(6), p(7))).distinct()
      customer.registerTempTable("customer")
      def toInt(s: String):Int = {
      try {
      s.toInt
      } catch {
      case e:Exception => 9999
      }
      }
      def myfun(birthday: String) : String = {
      var rt = "未知"
      if (birthday.length == 8) {
      val md = toInt(birthday.substring(4))
      if (md >= 120 & md <= 219)
      rt = "水瓶座"
      else if (md >= 220 & md <= 320)
      rt = "雙魚座"
      else if (md >= 321 & md <= 420)
      rt = "白羊座"
      else if (md >= 421 & md <= 521)
      rt = "金牛座"
      else if (md >= 522 & md <= 621)
      rt = "雙子座"
      else if (md >= 622 & md <= 722)
      rt = "巨蟹座"
      else if (md >= 723 & md <= 823)
      rt = "獅子座"
      else if (md >= 824 & md <= 923)
      rt = "處女座"
      else if (md >= 924 & md <= 1023)
      rt = "天秤座"
      else if (md >= 1024 & md <= 1122)
      rt = "天蝎座"
      else if (md >= 1123 & md <= 1222)
      rt = "射手座"
      else if ((md >= 1223 & md <= 1231) | (md >= 101 & md <= 119))
      rt = "摩蝎座"
      else
      rt = "未知"
      }
      rt
      }
      sqlContext.registerFunction("constellation", (x:String) => myfun(x))
      var result = sqlContext.sql("SELECT constellation(birthday), count(constellation(birthday)) FROM customer group by constellation(birthday)")
      result.collect().foreach(println)

      為了使用spark sql,你需要引入 sqlContext.createSchemaRDD. Spark sql一個核心對象就是SchemaRDD。 上面的import可以隱式的將一個RDD轉換成SchemaRDD。
      接著定義了Customer類,用來映射每一行的數據, 我們只使用每一行很少的信息, 像地址,email等都沒用到。
      接下來從2000W文件夾中讀取所有的csv文件, 創建一個RDD并注冊表customer。
      因為沒有一個內建的函數可以將出生一起映射為星座, 所以我們需要定義一個映射函數myfun, 并把它注冊到SparkContext中。 這樣我們就可以在sql語句中使用這個函數。 類似地,字符串的length函數當前也不支持, 你可以增加一個這樣的函數。 因為有的日期不正確,所有特別增加了一個”未知”的星座。 錯誤數據可能有兩種, 一是日期出錯, 而是此行格式不對,將其它字段映射成了出生日期。 我們在分析的時候忽略它們好了。

      然后執行一個分組的sql語句。這個sql語句查詢結果類型為SchemaRDD, 也繼承了RDD所有的操作。
      最后將結果打印出來。

      [雙子座,1406018]
      [雙魚座,1509839]
      [摩蝎座,2404812]
      [金牛座,1406225]
      [水瓶座,1635358]
      [巨蟹座,1498077]
      [處女座,1666009]
      [天秤座,1896544]
      [白羊座,1409838]
      [射手座,1614915]
      [未知,160483]
      [獅子座,1613529]

      看起來魔蝎座的人最喜歡開房了, 明顯比其它星座的人要多。

      我們也可以分析一下開房的男女比例:

      ......
      result = sqlContext.sql("SELECT gender, count(gender) FROM customer where gender = 'F' or gender = 'M' group by gender")
      result.collect().foreach(println)

      結果顯示男女開房的人數大約是2:1

      [F,6475461]
      [M,12763926]
      來自:http://colobu.com/2014/12/11/spark-sql-quick-start/

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!