用Apache Spark進行大數據處理——第二部分:Spark SQL
在Apache Spark文章系列的前一篇文章中,我們學習了什么是Apache Spark框架,以及如何用該框架幫助組織處理大數據處理分析的需求。
Spark SQL,作為Apache Spark大數據框架的一部分,主要用于結構化數據處理和對Spark數據執行類SQL的查詢。通過Spark SQL,可以針對不同格式的數據執行ETL操作(如JSON,Parquet,數據庫)然后完成特定的查詢操作。
在這一文章系列的第二篇中,我們將討論Spark SQL庫,如何使用Spark SQL庫對存儲在批處理文件、JSON數據集或Hive表中的數據執行SQL查詢。
Spark大數據處理框架目前最新的版本是上個月發布的Spark 1.3。這一版本之前,Spark SQL模塊一直處于“Alpha”狀態,現在該團隊已經從Spark SQL庫上將這一標簽移除。這一版本中包含了許多新的功能特性,其中一部分如下:
- 數據框架(DataFrame):Spark新版本中提供了可以作為分布式SQL查詢引擎的程序化抽象DataFrame。
- 數據源(Data Sources):隨著數據源API的增加,Spark SQL可以便捷地處理以多種不同格式存儲的結構化數據,如Parquet,JSON以及Apache Avro庫。
- JDBC服務器(JDBC Server):內置的JDBC服務器可以便捷地連接到存儲在關系型數據庫表中的結構化數據并利用傳統的商業智能(BI)工具進行大數據分析。 </ul>
- 已有的RDD
- 結構化數據文件
- JSON數據集
- Hive表
- 外部數據庫 </ul>
- Scala(https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.sql.package)
- Java(https://spark.apache.org/docs/1.3.0/api/java/index.html?org/apache/spark/sql/api/java/package-summary.html)
- Python(https://spark.apache.org/docs/1.3.0/api/python/pyspark.sql.html) </ul>
Spark SQL組件
使用Spark SQL時,最主要的兩個組件就是DataFrame和SQLContext。
首先,我們來了解一下DataFrame。
DataFrame
DataFrame是一個分布式的,按照命名列的形式組織的數據集合。DataFrame基于R語言中的data frame概念,與關系型數據庫中的數據庫表類似。
之前版本的Spark SQL API中的SchemaRDD已經更名為DataFrame。
通過調用將DataFrame的內容作為行RDD(RDD of Rows)返回的rdd方法,可以將DataFrame轉換成RDD。
可以通過如下數據源創建DataFrame:
Spark SQL和DataFrame API已經在下述幾種程序設計語言中實現:
本文中所涉及的Spark SQL代碼示例均使用Spark Scala Shell程序。
SQLContext
Spark SQL提供SQLContext封裝Spark中的所有關系型功能。可以用之前的示例中的現有SparkContext創建SQLContext。下述代碼片段展示了如何創建一個SQLContext對象。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
此外,Spark SQL中的HiveContext可以提供SQLContext所提供功能的超集。可以在用HiveQL解析器編寫查詢語句以及從Hive表中讀取數據時使用。
在Spark程序中使用HiveContext無需既有的Hive環境。
JDBC數據源
Spark SQL庫的其他功能還包括數據源,如JDBC數據源。
JDBC數據源可用于通過JDBC API讀取關系型數據庫中的數據。相比于使用JdbcRDD,應該將JDBC數據源的方式作為首選,因為JDBC數據源能夠將結果作為DataFrame對象返回,直接用Spark SQL處理或與其他數據源連接。
Spark SQL示例應用
在上一篇文章中,我們學習了如何在本地環境中安裝Spark框架,如何啟動Spark框架并用Spark Scala Shell與其交互。如需安裝最新版本的Spark,可以從Spark網站下載該軟件。
對于本文中的代碼示例,我們將使用相同的Spark Shell執行Spark SQL程序。這些代碼示例適用于Windows環境。
為了確保Spark Shell程序有足夠的內存,可以在運行spark-shell命令時,加入driver-memory命令行參數,如下所示:
spark-shell.cmd --driver-memory 1G
Spark SQL應用
Spark Shell啟動后,就可以用Spark SQL API執行數據分析查詢。
在第一個示例中,我們將從文本文件中加載用戶數據并從數據集中創建一個DataFrame對象。然后運行DataFrame函數,執行特定的數據選擇查詢。
文本文件customers.txt中的內容如下:
100, John Smith, Austin, TX, 78727 200, Joe Johnson, Dallas, TX, 75201 300, Bob Jones, Houston, TX, 77028 400, Andy Davis, San Antonio, TX, 78227 500, James Williams, Austin, TX, 78727
下述代碼片段展示了可以在Spark Shell終端執行的Spark SQL命令。
// 首先用已有的Spark Context對象創建SQLContext對象 val sqlContext = new org.apache.spark.sql.SQLContext(sc)// 導入語句,可以隱式地將RDD轉化成DataFrame import sqlContext.implicits._
// 創建一個表示客戶的自定義類 case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)
// 用數據集文本文件創建一個Customer對象的DataFrame val dfCustomers = sc.textFile("data/customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))).toDF()
// 將DataFrame注冊為一個表 dfCustomers.registerTempTable("customers")
// 顯示DataFrame的內容 dfCustomers.show()
// 打印DF模式 dfCustomers.printSchema()
// 選擇客戶名稱列 dfCustomers.select("name").show()
// 選擇客戶名稱和城市列 dfCustomers.select("name", "city").show()
// 根據id選擇客戶 dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()
// 根據郵政編碼統計客戶數量 dfCustomers.groupBy("zip_code").count().show()</pre>
在上一示例中,模式是通過反射而得來的。我們也可以通過編程的方式指定數據集的模式。這種方法在由于數據的結構以字符串的形式編碼而無法提前定義定制類的情況下非常實用。
如下代碼示例展示了如何使用新的數據類型類StructType,StringType和StructField指定模式。
// // 用編程的方式指定模式 //// 用已有的Spark Context對象創建SQLContext對象 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 創建RDD對象 val rddCustomers = sc.textFile("data/customers.txt")
// 用字符串編碼模式 val schemaString = "customer_id name city state zip_code"
// 導入Spark SQL數據類型和Row import org.apache.spark.sql._
import org.apache.spark.sql.types._;
// 用模式字符串生成模式對象 val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// 將RDD(rddCustomers)記錄轉化成Row。 val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim,p(1),p(2),p(3),p(4)))
// 將模式應用于RDD對象。 val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)
// 將DataFrame注冊為表 dfCustomers.registerTempTable("customers")
// 用sqlContext對象提供的sql方法執行SQL語句。 val custNames = sqlContext.sql("SELECT name FROM customers")
// SQL查詢的返回結果為DataFrame對象,支持所有通用的RDD操作。 // 可以按照順序訪問結果行的各個列。 custNames.map(t => "Name: " + t(0)).collect().foreach(println)
// 用sqlContext對象提供的sql方法執行SQL語句。 val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")
// SQL查詢的返回結果為DataFrame對象,支持所有通用的RDD操作。 // 可以按照順序訪問結果行的各個列。 customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)</pre>
除了文本文件之外,也可以從其他數據源中加載數據,如JSON數據文件,Hive表,甚至可以通過JDBC數據源加載關系型數據庫表中的數據。
如上所示,Spark SQL提供了十分友好的SQL接口,可以與來自多種不同數據源的數據進行交互,而且所采用的語法也是團隊熟知的SQL查詢語法。這對于非技術類的項目成員,如數據分析師以及數據庫管理員來說,非常實用。
總結
本文中,我們了解到Apache Spark SQL如何用熟知的SQL查詢語法提供與Spark數據交互的SQL接口。Spark SQL是一個功能強大的庫,組織中的非技術團隊成員,如業務分析師和數據分析師,都可以用Spark SQL執行數據分析。
下一篇文章中,我們將討論可用于處理實時數據或流數據的Spark Streaming庫。Spark Streaming庫是任何一個組織的整體數據處理和管理生命周期中另外一個重要的組成部分,因為流數據處理可為我們提供對系統的實時觀察。這對于欺詐檢測、在線交易系統、事件處理解決方案等用例來說至關重要。
參考文獻
- Spark主站
- Spark SQL網站
- Spark SQL程序設計指南
- 用Apache Spark進行大數據處理——第一部分:入門介紹 </ul>
關于作者
Srini Penchikala目 前是一家金融服務機構的軟件架構師,這個機構位于德克薩斯州的奧斯汀。他在軟件系統架構、設計和開發方面有超過20年的經驗。Srini目前正在撰寫一本 關于NoSQL數據庫模式的書。他還是曼寧出版社出版的《Spring Roo in Action》一書的合著者(http://www.manning.com/SpringRooinAction)。他還曾經出席各種會議,如 JavaOne,SEI Architecture Technology Conference(SATURN),IT Architect Conference(ITARC),No Fluff Just Stuff,NoSQL Now和Project World Conference等。Srini還在InfoQ,The ServerSide,OReilly Network(ONJava),DevX Java,java.net以及JavaWorld等網站上發表過很多關于軟件系統架構、安全和風險管理以及NoSQL數據庫等方面的文章。他還是 InfoQ NoSQL數據庫社區的責任編輯(http://www.infoq.com/author/Srini-Penchikala)。
查看英文原文:Big Data Processing with Apache Spark - Part 2: Spark SQL
來自: