Spark SQL 代碼簡要閱讀(基于Spark 1.1.0)

jopen 10年前發布 | 19K 次閱讀 Spark SQL 分布式/云計算/大數據

    Spark SQL允許相關的查詢如SQL,HiveQL或Scala運行在spark上。其核心組件是一個新的RDD:SchemaRDD,SchemaRDDs由 行對象組成,并包含一個描述此行對象的每一列的數據類型的schema。SchemaRDD和傳統關系型數據庫的表類似。SchemaRDD可以通過已有 的RDD、Parquet(列式存儲格式)類型文件、JSON數據集,或通過運行HiveQL獲取存儲在Apache Hive中的數據。社區文檔介紹:https://spark.apache.org/docs/latest/sql-programming-guide.html

一個簡單的例子

下面是一個使用Spark SQL的簡單例子。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.

import sqlContext.createSchemaRDD

// Define the schema using a case class.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,

// you can use custom classes that implement the Product interface.

case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))

people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.

// The columns of a row in the result can be accessed by ordinal.

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

上述例子中定義了一個People類,通過加載people.txt文件并轉換為一個SchemaRDD,然后調用registerTempTable方法注冊為一個Table。后續便可以通過SQL對該表查詢并生成新的SchemaRDDSchemaRDD繼承于RDD,于是可以進行轉換和Actions操作。

 

Spark SQL代碼流程

從Spark SQL到RDD的DAG關系主要可以分為以下五步。

  1. 初始化,包括sqlContext,sqlContext包括Spark SQL執行的上下文與流程;定義并注冊Table,定義Table的字段與類型,然后注冊,注冊實際上就是把Table的元數據存儲在內存SimpleCatalog對象中。
  2. 解析SQL,并生成LogicalPlan(邏輯計劃)。代碼調用流程為:

    SQLContext.sql

    SQLContext .parseSql

    catalyst.SqlParser

    SqlLexical. Scanner

    最終通過SqlLexical. Scanner完成詞法語法的解析并生成LogicalPlan。

     

  3. 由邏輯計劃LogicalPlan生成QueryExecution。代碼調用流程為:

    New SchemaRDDLike

    sqlContext.executePlan(baseLogicalPlan)

    生成QueryExecution

     

  4. QueryExecution轉換為物理計劃SparkPlan,代碼調用流程為:

    SparkContext.runJob

    RDD.getDependencies

    SQLContext .QueryExecution.toRDD

    QueryExecution.prepareForExecution

    RuleExecutor. Apply

    Exchange.AddExchange. apply => SparkPlan

     

  5. 物理計劃SparkPlan轉換為RDD,通過調用SparkPlan.execute把樹形結果的物理計劃轉換為RDD的DAG關系。

     

Spark SQL關鍵類圖

其中右側的LogicalPlan為邏輯計劃,左邊的SparkPlan為物理計劃相關的類。

Spark SQL 代碼簡要閱讀(基于Spark 1.1.0)

來自:http://www.cnblogs.com/shenh062326/p/4133501.html

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!