Sqoop Developer’s Guide v1.4.6 (Sqoop開發者指南,中文版)

jopen 8年前發布 | 34K 次閱讀 Java開發

1.介紹

如果你是一個開發者或者應用程序員,想要修改Sqoop或者使用Sqoop內部API構建一個擴展,你應該閱讀本文檔。以下章節描述了每個API的目的,哪里用到了內部API,實現其他數據庫的支持需要哪些API。

2.支持的發行版

本文檔適用于Sqoop v1.4.6。

3.Sqoop發行版

Apache Sqoop是Apache Software Foundation的一款開源軟件產品。Sqoop的開發位于:http://sqoop.apache.org/。在這個網址上,你可以獲取:

  • Sqoop的新發行版以及最新的源碼
  • 問題跟蹤器(issue tracker)
  • 包含Sqoop文檔的wiki

4.前提

Sqoop需要以下前置知識:

  • Java開發
    • 熟悉JDBC
    • 熟悉Hadoop的API(包括新的0.20+的MapReduce API)
    </li>
  • 關系數據庫管理系統和SQL
  • </ul>

    本文檔假定你使用Linux或類Linux環境。如果你使用Windows,你可以使用cygwin完成大部分任務。如果你使用Mac OS X,你應該會看到很少(如果有的話)兼容性錯誤。Sqoop主要在Linux上操作和測試。

    5.編譯Sqoop源碼

    你可以使用以下命令獲取Sqoop源碼:git clone https://git-wip-us.apache.org/repos/asf/sqoop.git

    Sqoop源碼存在一個git倉庫里。從倉庫檢出源碼的說明:TODO。編譯說明在COMPILING.txt文件中。

    6. 開發者API reference

    本節介紹為集成Sqoop或者修改Sqoop的應用開發者提供的API

    下面三個小節按照以下用例編寫:

    • 使用Sqoop及其公開庫生成的類
    • 編寫Sqoop擴展(即,新的ConnManager實現以支持與更多的數據庫交互)
    • 修改Sqoop的內核

    每一小節都會比上一小節更深入的描述。

    6.1. 外部API

    Sqoop自動生成要導入HDFSHadoop Distributed File System)的表對應的類。被導入表的每一列對應類中的一個成員變量;該類的一個實例對應表的一行。生成的類實現Hadoop的序列化API,即WritableDBWritable接口。也包含下列其他便于使用的方法:

    • parse()方法,用于解釋分隔文本字段
    • toString()方法,用于保留用戶選擇的分隔符

    自動生成類必須實現的方法集在抽象類com.cloudera.sqoop.lib.SqoopRecord中聲明。

    SqoopRecord實例可能依賴于Sqooppublic API。也就是com.cloudera.sqoop.lib包里的所有類。下面會簡要介紹。雖然Sqoop生成的類會依賴這些類,但是Sqoop客戶端不應該直接與這些類交互。因此,這些API被認為是公開的,而且改進它們的時候需要小心。

    • 通過使用可控的分隔符和引號字符,RecordParser類會將一行文本解析成一個字段清單。
    • 靜態類FieldFormatter提供了一個處理引號和轉義字符的方法,這個方法會在SqoopRecord.toString()的實現里用到。
    • ResultSetPreparedStatement對象和SqoopRecords的編組數據由JdbcWritableBridge實現。
    • BigDecimalSerializer包含一對方法,用于實現BigDecimal對象的序列號。

    Public API的完整描述參見Sqoop開發wikiSIP-4

    6.2. 擴展API

    本節介紹用于Sqoop擴展的API和基礎類,由此Sqoop可以兼容更多的數據庫提供商。

    盡管Sqoop使用JDBCDataDrivenDBInputFormat讀數據庫,但是不同數據庫廠商以及JDBC元數據支持的SQL使得我們必須為不同數據庫編寫不同的代碼。Sqoop的解決方法是引入ConnManager APIcom.cloudera.sqoop.manager.ConnManager)。

    ConnManager是一個抽象類,定義了與數據庫交互的所有方法。ConnManager的大多數實現會繼承自com.cloudera.sqoop.manager.SqlManager抽象類(而非直接繼承ConnManager),SqlManager類使用標準SQL實現多數操作。子類必須實現getConnection()方法,該方法返回與數據庫的實際的JDBC連接。子類也可以覆蓋任何其他方法。SqlManager類暴露了一個受保護的API,允許程序員選擇性的覆蓋。(譯者注:應該類似于模板方法設計模式)例如,getColNamesQuery()方法的存在,使得可以在不重寫getColNames()方法的大部分代碼的情況下,修改getColNames()方法使用的SQL查詢。

    ConnManager的實現從SqoopOptions類獲取很多配置。SqoopOptions類是可變的。SqoopOptions不保存具體的配置。而是持有一個Configuration的引用,這個引用是由Tool.getConf()在GenericOptionsParser解析命令行參數后返回的。允許使用擴展參數“-D any.specific.param=any.vaule”,而且不需要SqoopOptions的選項解析或者修改。這個Configuration形成了傳給任意工作流調用的MapReduce Job的Configuration的基礎,因此用戶可以在命令行中設置任何需要的定制Hadoop狀態。

    所有已有的ConnManager實現都是無狀態的。因此,實例化ConnManagers的系統在Sqoop的生命周期中可能實現同一個ConnManager的多個實例。目前認為實例化一個ConnManager是一個輕量級的操作,而且發生頻率相對較低。因此,ConnManagers在各個操作之間并沒有緩存。

    ConnManagers目前是由抽象類ManagerFactory(參見:http://issues.apache.org/jira/browse/MAPREDUCE-750)的實例來創建的。ManagerFactory的一個實現目前支持Sqoop的所有情況:com.cloudera.sqoop.manager.DefaultManagerFactory。擴展不應該修改DefaultManagerFactory。相反,一個ManagerFactory的擴展實現應該提供新的ConnManager。ManagerFactory只有一個方法,accept()。這個方法判斷是否可以給用戶的SqoopOptions實例化一個ConnManager。若可以,則返回ConnManager實例,否則,返回null。

    采用哪個ManagerFactory實現由sqoop-site.xml配置文件中的sqoop.connection.factories配置項指定的。擴展庫用戶可以安裝包含一個新ManagerFactory和ConnManager(s)的第三方庫,并配置sqoop-site.xml來使用這個新的ManagerFactory。DefaultManagerFactory主要是通過解析存儲在SqoopOptions中的連接字符串來區分數據庫。

    擴展提供者可能利用com.cloudera.sqoop.io,mapreduce和util包里面的類,來輔助實現。這些包和類在下面的章節中詳細介紹。

    6.2.1. HBase序列化擴展

     Sqoop支持從數據庫導入到Hbase。當向HBase復制數據時,必須將其轉換成HBase支持的格式。特別是:

    • 數據必須放置于HBase的一個(或多個)表中。
    • 輸入數據的列必須放置于一個列族(column family)中。
    • 值必須序列化為字節數組,以放入cells中。

    所有這些都是通過HBase客戶端API的Put語句完成的。Sqoop與HBase的交互在com.cloudera.sqoop.hbase包里實現。記錄從數據庫中反序列化,并從mapper發出。OutputFormat負責把結果插入HBase。這是通過PutTransformer接口完成的。PutTransformer有一個getPutCommand()方法,輸入一個Map<String,Object>代表數據集的字段。返回一個List<Put>描述如何把cells插入HBase。默認的PutTransformer實現是ToStringPutTransformer,它使用每個字段的基于字符串的表示,來將字段序列化進HBase。

    你可以通過實現PutTransformer接口來覆蓋這個實現,并把它加入到classpath用于map任務(例如,使用-libjars選項)。要讓Sqoop使用你的實現,需要設置sqoop.hbase.insert.put.transformer.class屬性來用-D標識你的類。

    在你的PutTransformer實現中,row key column和column family可以通過getRowKeyColumn()和getColumnFamily()方法獲取。你可以自行添加額外的Put操作;例如,注入額外的行表示一個二級索引。Sqoop會對--hbase-table指定的表執行所有Put操作。

    6.3. Sqoop內核

    本節描述Sqoop的內部架構。

    Sqoop是由com.cloudera.sqoop.Sqoop主類驅動。同一個包里還有另外幾個類;SqoopOptions(前面介紹過了)和ConnFactory(操作ManagerFactory實例)。

    6.3.1. 一般程序流程

     一般程序流程如下:

    com.cloudera.sqoop.Sqoop是主類,并且實現了Tool。新實例由ToolRunner啟動。Sqoop的第一個參數是標識要運行的SqoopTool的名字的字符串。這個SqoopTool來執行用于請求的操作(例如,import,export,codegen,等等)。

    SqoopTool API在SIP-1中完整描述。

    選中的SqoopTool會解析參數中剩余部分,把相應的字段設置到SqoopOptions類中。接下來執行其方法體。

    在SqoopTool的run()方法里,import、export或者其他操作會被執行。緊接著,基于SqoopOptions中的數據實例化一個ConnManager實例。ConnFactory用于從ManagerFactory獲取ConnManager;前面的章節已經描述過這個機制。導入、導出和其他大量數據的移動任務通常是執行一個MapReduce job在表上進行并行、可靠的操作。導入(import)操作不必通過MapReduce job執行;ConnManager.importTable()方法用于判斷怎樣執行import最好。每一個主要的操作實際上都是由ConnManager控制的,除了代碼生成,它是由CompilationManager和ClassWriter完成的。(二者都在com.cloudera.sqoop.orm包中。)導入Hive的操作,在importTable()完成后,由com.cloudera.sqoop.hive.HiveImport接管。與使用的ConnManager實現無關。

    ConnManager的importTable()方法接收一個ImportJobContext類型的參數,它包含了該方法的參數。這個類將來可能會擴展更多的參數,進一步管理import操作。類似的,exportTable()方法接收一個ExportJobContext類型的參數。這些類包含要導入或導出的表的名稱,一個SqoopOptions對象的引用,和其他相關數據。

    6.3.2 子包

    com.cloudera.sqoop包下面包含以下子包:

    • hive - 用于導入數據到Hive。
    • io - java.io.*接口(即,OutputStream和Writer)的實現。
    • lib - 外部公開API(前面介紹過了)。
    • manager - ConnManager和ManagerFactory接口和它們的實現。
    • mapreduce - 與new MapReduce API(0.20+)的連接類。
    • orm - 代碼自動生成。
    • tool - SqoopTool的實現類。
    • util - 工具類

    io包中包含OutputStream和BufferedWriter實現類,用于HDFS的直接寫入器(direct writers)。SplittableBufferedWriter允許對一個client打開一個BufferedWriter,當達到了目標的閾值,這個client會連續的寫入多個文件。這允許不可分割的壓縮庫(例如,gzip)與Sqoop import一起使用,同時仍然允許隨后的MapReduce作業使用每個數據集的多個輸入分片(splits)。大對象文件存儲(參見SIP-3)系統的代碼也位于io包里。

    mapreduce包中包含與Hadoop MapReduce直接交互的代碼。這個包的內容在下一節中詳細描述。

    orm包中包含類生成的代碼。它依賴于JDK的tools.jar,該jar包里提供了com.sun.tools.javac包。

    util包中包含Sqoop使用的各種工具:

    • ClassLoaderStack管理一組當前線程使用的ClassLoader實例。主要用于,在local(standalone)模式下運行MapReduce時,把自動生成的代碼加載到當前線程。
    • DirectImportUtils包含HDFS直接寫入器(direct HDFS writer)的便捷方法。
    • Executor啟動外部進程,并把它們連接到AsyncSink(請看下面的詳細描述)生成的流處理器。
    • ExportException在導出失敗時,ConnManagers拋出該異常。
    • ImportException在導入失敗時,ConnManagers拋出該異常。
    • JdbcUrl處理連接字符串的解析,連接字符串是類URL(URL-like)的,但并不完全一致。(特別是,JDBC連接字符串可能有multi:part:scheme://組件。)
    • PerfCounters用于估算傳輸率,展示給用戶。
    • ResultSetPrinter可以優化打印ResultSet。

    有幾個地方,Sqoop從外部進程讀取stdout。最直接的例子是LocalMySQLManager和DirectPostgresqlManager實現的direct-mode導入。在Runtime.exec()產生一個進程后,必須處理進程的stdout(Process.getInputStream())和可能的sdterr(Process.getErrorStream())。若從這兩個流中讀取足夠的數據失敗,外部進程寫入更多數據前,會導致外部進程阻塞。因此,二者必須都被處理,而且最好是異步的。

    在Sqoop用語中,“async sink”是一個線程,持有一個InputStream并讀取直至結束。由AsyncSink的實現類來完成。com.cloudera.sqoop.util.AsyncSink抽象類定義了這個工廠必須實現的操作。processStream()會創建另一個線程,立即開始處理從InputStream參數讀取的數據;必須讀取這個流直至結束。join()方法允許外部線程等待,知道這個處理完成。

    默認提供了一些AsyncSink的實現:LoggingAsyncSink處理諸如log4j INFO等的InputStream。NullAsyncSink消耗所有的輸入,但不做其他處理。

    利用外部進程的各種ConnManagers以內部類的形式實現了各自的AsyncSink實現類,這些實現類從數據庫工具讀取數據,并把數據發送到HDFS,可能還同時進行了格式轉換。

    6.3.3 與MapReduce的接口

    Sqoop調度MapReduce 作業(jobs)進行導入和導出。MapReduce作業的配置和執行按照一些通用步驟進行(配置InputFormat;配置OutputFormat;設置Mapper實現;等等...)。這些步驟在com.cloudera.sqoop.mapreduce.JobBase類中體現。JobBase允許用戶指定要使用哪些InputFormat,OutputFormat和Mapper。

    JobBase本身是ImportJobBase和ExportJobBase的子類,為import或export相關作業通用的配置步驟分別提供了更好的支持。ImportJobBase.runImport()會調用配置步驟,并運行一個作業來把一個表導入HDFS。

    這些基類也存在一些子類。例如,DataDrivenImportJob使用DataDrivenDBImportJob來執行導入。這是各種可用ConnManager實現最常用到的導入類型。MySQL使用不同的類(MySQLDumpImportJob)來運行direct-mode導入。它的定制Mapper和InputFormat實現也位于這個包里。


    英文原文:http://sqoop.apache.org/docs/1.4.6/SqoopDevGuide.html

    相關文章:Sqoop中文用戶手冊,http://www.zihou.me/html/2014/01/28/9114.html

    來自: http://blog.csdn.net//kingzone_2008/article/details/50298143

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!