Spark SQL編程指南(Python)
前言
Spark SQL允許我們在Spark環境中使用SQL或者Hive SQL執行關系型查詢。它的核心是一個特殊類型的Spark RDD:SchemaRDD。
SchemaRDD類似于傳統關系型數據庫的一張表,由兩部分組成:
Rows:數據行對象
Schema:數據行模式:列名、列數據類型、列可否為空等
Schema可以通過四種方式被創建:
(1)Existing RDD
(2)Parquet File
(3)JSON Dataset
(4)By running Hive SQL
考慮到Parquet File尚未在平臺開始使用,因此暫時僅討論其它三項。
注意:Spark SQL is currently an alpha component.
SQLContext(HiveContext)
Spark SQL的入口點為SQLContext,SQLContext的初始化依賴于SparkContext,代碼示例如下:

SQLContext目前僅僅使用一個簡單的SQL解析器,功能有限,而且目前很多的數據倉庫是建立在Hive之上的,因此Spark為我們提供了另一個選擇:HiveContext。
HiveContext使用相對比較完善的HiveQL解析器,可以使用HiveUDF,可以訪問現有Hive數據倉庫中的數據,且適配SQLContext的所有數據源,推薦使用。
HiveContext初始化過程相似,如下:

數據源
Spark SQL(SchemaRDD)的數據源可以簡單理解為就是普通的Spark RDD,所有可以應用于Spark RDD的操作均可以應用于SchemaRDD;此外,SchemaRDD還可以“注冊”為一張臨時表,然后通過SQL(Hive SQL)分析其中的數據(實際就是Spark RDD關聯的數據)。
SchemaRDD
SchemaRDD的數據源實際就是Spark RDD,但是Spark RDD與SchemaRDD還是有區別的,Spark RDD相對于SchemaRDD而言缺失“Schema”,因此Spark提供兩種方式完成Spark RDD到SchemaRDD的轉換,實際就是為Spark RDD應用“Schema”。
(1)使用反射推斷Schema
如果一個Spark RDD的數據類型為Row,則Spark可以通過反射推斷出該Spark RDD的Schema,并將其轉換為一個SchemaRDD。
Spark使用反射推斷某個Spark RDD的Schema時,僅僅使用這個Spark RDD的第一條數據(Row),因此必須保證這條數據的完整性。
Row的構建過程需要一個鍵值對列表,
Row(id = 1, name = "a", age = 28)
這個鍵值對列表已經明確定義出數據行的列名、列值,推斷僅作用于列類型。
代碼示例


處理邏輯可以分為以下幾步:
a. 創建一個字符串列表datas,用于模擬數據源;
b. 對datas執行“parallelize”操作,將其轉換為Spark RDD source,數據類型為字符串;
c. 將Spark RDD source中的每一條數據進行切片(split)后轉換為Spark RDD rows,數據類型為Row;
至此Spark RDD rows已經具備轉換為SchemaRDD的條件:它的數據類型為Row。
d. 使用HiveContext推斷rows的Schema,將其轉換為SchemaRDD people;
通過people.printSchema(),我們可以查看推斷Schema的結果:

e. 將SchemaRDD people注冊為一張臨時表“people”;
f. 執行SQL查詢語句:select * from people where age > 28 and age < 30,并將查詢結果保存至Spark RDD results,通過results.printSchema()的輸出結果:

可以看出Spark RDD results實際也是SchemaRDD,因此我們可以繼續將其注冊為一張臨時表;
g. 將SchemaRDD results注冊為一張臨時表“people”,并執行SQL查詢語句:select name from people2,并將查詢結果保存至 Spark RDD results2,通過f我們可以知道results2實際也是SchemaRDD,results2.printSchema()的輸出結果:

SchemaRDD results2的數據類型為Row,受到查詢語句(select name)的影響,其僅包含一列數據,列名為name。
h. SchemaRDD也可以執行所有Spark RDD的操作,這里我們通過map將results2中的name值轉換為大寫形式,最終的輸出結果:

上述示例說明以下三點:
a. 我們可以將一個數據類型為Row的Spark RDD轉換為一個SchemaRDD;
b. SchemaRDD可以注冊為一張臨時表執行SQL查詢語句,其查詢結果也是一個SchemaRDD;
c. SchemaRDD可以執行所有Spark RDD的操作。
(2)通過編碼指定Schema
使用反射推斷Schema的方式要求我們必須能夠構建一個數據類型為Row的Spark RDD,然后再將其轉換為SchemaRDD;某些情況下我們可能需要更為靈活的方式控制SchemaRDD構建過程,這正是通過編碼指定Schema的意義所在。
通過編碼指定Schema分為三步:
a. 構建一個數據類型為tuple或list的Spark RDD;
b. 構建Schema,需要匹配a中的tuple或list;
c.將b中的Schema應用于a中的Spark RDD。
代碼示例


代碼處理邏輯正好對應著上述三步,最終的輸出結果:

其中需要注意id、age的數據類型被聲明為IntegerType,因此數據源(字符串)中的數據需要做強制類型轉換處理。
JSON Datasets
Spark能夠自動推斷出Json數據集的“數據模式”(Schema),并將它加載為一個SchemaRDD實例。這種“自動”的行為是通過下述兩種方法實現的:
jsonFile:從一個文件目錄中加載數據,這個目錄中的文件的每一行均為一個JSON字符串(如果JSON字符串“跨行”,則可能導致解析錯誤);
jsonRDD:從一個已經存在的RDD中加載數據,這個RDD中的每一個元素均為一個JSON字符串;
代碼示例

可以得出以下兩點:
a. 如果數據輸入是JSON字符串的文本文件,我們可以直接使用jsonFile構建Spark RDD,實際就是SchemaRDD;
b. 如果某個Spark RDD的數據類型是字符串,且字符串均是JSON格式的字符串形式,則可以使用jsonRDD將其轉換為一個SchemaRDD。
Hive Tables
Hive Tables已經是“表”,因此我們無需創建或轉換,直接使用SQL查詢即可。
官方代碼示例

Hive UDF(Register Function)
Spark SQL使用HiveContext時可以支持Hive UDF,這里的UFD包含Hive本身內建的UDF,也包括我們自己擴展的UDF(實測Spark-1.2.0-cdh5.3.2版本下無法正常使用自己擴展的UDF(Permanent Function),已通過擴展源碼修復)。
這里重點介紹Spark SQL的Register Function,也就是說可以動態創建函數用于SQL查詢,其實際作用類似于Hive UDF。
代碼示例


代碼的處理邏輯與前大體類似,即首先通過編碼創建SchemaRDD people,然后將其注冊為一張表(注意這里使用了另一種方式:HiveContext registerRDDAsTable),最后執行查詢語句并打印結果。
特別的是查詢語句中使用到了一個名為“myfunc”的自定義SQL函數,而這個函數并不是預先存在的(如Hive UDF),它是在我們應用的運行期間被動態創建并注冊的,注冊過程使用到了HiveContext registerFunction。
對于Python而言,自定義函數的創建過程實際可分為兩步:
(1)定義Python Function;
(2)將(1)中定義好的Python Function注冊為SQL函數,注冊時的命名可與Function的名稱不同。
也可以使用Lambda表達式將定義Function與注冊過程同時完成,如上述示例。
我們自定義的SQL函數可以與Hive UDF共同使用,如下示例:

其中func.iptolocationbysina是Hive UDF(Permanent Function),mychange是自定義SQL函數。
從上面的兩個示例可以看出,自定義SQL函數遠比Hive UDF靈活。Hive UDF的創建過程比較復雜,需要使用Java語言完成編碼并部署為jar,且在使用函數之前需要以temporaty function或permanent function的形式存在,每一次Hive UDF的更新都需要重新編碼并更新jar;而自定義SQL函數是運行期間動態創建的,而使用Python編碼時Function的創建及更新非常簡便,推薦使用。
總結
Spark SQL為我們提供了強大的數據分析能力,主要體現在以下三個方面:
(1)Spark RDD可以通過反射推斷Schema或編碼指定Schema的方式轉換為SchemaRDD,將SchemaRDD創建為“數據表”之后,允許我們以SQL語句的形式分析數據,節約大量編碼工作量;
(2)Spark SQL允許我們在應用運行期間根據需求動態創建自定義SQL函數,擴充SQL的數據處理能力;
(3)SchemaRDD可以執行所有Spark RDD的操作,如果SQL無法表述我們的計算邏輯時,我們可以通過Spark RDD豐富的API完成。
來自:http://diptech.sinaapp.com/
來自:http://diptech.sinaapp.com/
本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!