Sqoop Developer’s Guide v1.4.6 (Sqoop開發者指南,中文版)
1.介紹
如果你是一個開發者或者應用程序員,想要修改Sqoop或者使用Sqoop內部API構建一個擴展,你應該閱讀本文檔。以下章節描述了每個API的目的,哪里用到了內部API,實現其他數據庫的支持需要哪些API。
2.支持的發行版
本文檔適用于Sqoop v1.4.6。
3.Sqoop發行版
Apache Sqoop是Apache Software Foundation的一款開源軟件產品。Sqoop的開發位于:http://sqoop.apache.org/。在這個網址上,你可以獲取:
- Sqoop的新發行版以及最新的源碼
- 問題跟蹤器(issue tracker)
- 包含Sqoop文檔的wiki
4.前提
Sqoop需要以下前置知識:
- Java開發
- 熟悉JDBC
- 熟悉Hadoop的API(包括新的0.20+的MapReduce API)
- 關系數據庫管理系統和SQL </ul>
- 使用Sqoop及其公開庫生成的類
- 編寫Sqoop擴展(即,新的ConnManager實現以支持與更多的數據庫交互)
- 修改Sqoop的內核
- parse()方法,用于解釋分隔文本字段
- toString()方法,用于保留用戶選擇的分隔符
- 通過使用可控的分隔符和引號字符,RecordParser類會將一行文本解析成一個字段清單。
- 靜態類FieldFormatter提供了一個處理引號和轉義字符的方法,這個方法會在SqoopRecord.toString()的實現里用到。
- ResultSet和PreparedStatement對象和SqoopRecords的編組數據由JdbcWritableBridge實現。
- BigDecimalSerializer包含一對方法,用于實現BigDecimal對象的序列號。
- 數據必須放置于HBase的一個(或多個)表中。
- 輸入數據的列必須放置于一個列族(column family)中。
- 值必須序列化為字節數組,以放入cells中。
- hive - 用于導入數據到Hive。
- io - java.io.*接口(即,OutputStream和Writer)的實現。
- lib - 外部公開API(前面介紹過了)。
- manager - ConnManager和ManagerFactory接口和它們的實現。
- mapreduce - 與new MapReduce API(0.20+)的連接類。
- orm - 代碼自動生成。
- tool - SqoopTool的實現類。
- util - 工具類
- ClassLoaderStack管理一組當前線程使用的ClassLoader實例。主要用于,在local(standalone)模式下運行MapReduce時,把自動生成的代碼加載到當前線程。
- DirectImportUtils包含HDFS直接寫入器(direct HDFS writer)的便捷方法。
- Executor啟動外部進程,并把它們連接到AsyncSink(請看下面的詳細描述)生成的流處理器。
- ExportException在導出失敗時,ConnManagers拋出該異常。
- ImportException在導入失敗時,ConnManagers拋出該異常。
- JdbcUrl處理連接字符串的解析,連接字符串是類URL(URL-like)的,但并不完全一致。(特別是,JDBC連接字符串可能有multi:part:scheme://組件。)
- PerfCounters用于估算傳輸率,展示給用戶。
- ResultSetPrinter可以優化打印ResultSet。
本文檔假定你使用Linux或類Linux環境。如果你使用Windows,你可以使用cygwin完成大部分任務。如果你使用Mac OS X,你應該會看到很少(如果有的話)兼容性錯誤。Sqoop主要在Linux上操作和測試。
5.編譯Sqoop源碼
你可以使用以下命令獲取Sqoop源碼:git clone https://git-wip-us.apache.org/repos/asf/sqoop.git
Sqoop源碼存在一個git倉庫里。從倉庫檢出源碼的說明:TODO。編譯說明在COMPILING.txt文件中。
6. 開發者API reference
本節介紹為集成Sqoop或者修改Sqoop的應用開發者提供的API。
下面三個小節按照以下用例編寫:
每一小節都會比上一小節更深入的描述。
6.1. 外部API
Sqoop自動生成要導入HDFS(Hadoop Distributed File System)的表對應的類。被導入表的每一列對應類中的一個成員變量;該類的一個實例對應表的一行。生成的類實現Hadoop的序列化API,即Writable和DBWritable接口。也包含下列其他便于使用的方法:
自動生成類必須實現的方法集在抽象類com.cloudera.sqoop.lib.SqoopRecord中聲明。
SqoopRecord實例可能依賴于Sqoop的public API。也就是com.cloudera.sqoop.lib包里的所有類。下面會簡要介紹。雖然Sqoop生成的類會依賴這些類,但是Sqoop客戶端不應該直接與這些類交互。因此,這些API被認為是公開的,而且改進它們的時候需要小心。
Public API的完整描述參見Sqoop開發wiki:SIP-4
6.2. 擴展API
本節介紹用于Sqoop擴展的API和基礎類,由此Sqoop可以兼容更多的數據庫提供商。
盡管Sqoop使用JDBC和DataDrivenDBInputFormat讀數據庫,但是不同數據庫廠商以及JDBC元數據支持的SQL使得我們必須為不同數據庫編寫不同的代碼。Sqoop的解決方法是引入ConnManager API(com.cloudera.sqoop.manager.ConnManager)。
ConnManager是一個抽象類,定義了與數據庫交互的所有方法。ConnManager的大多數實現會繼承自com.cloudera.sqoop.manager.SqlManager抽象類(而非直接繼承ConnManager),SqlManager類使用標準SQL實現多數操作。子類必須實現getConnection()方法,該方法返回與數據庫的實際的JDBC連接。子類也可以覆蓋任何其他方法。SqlManager類暴露了一個受保護的API,允許程序員選擇性的覆蓋。(譯者注:應該類似于模板方法設計模式)例如,getColNamesQuery()方法的存在,使得可以在不重寫getColNames()方法的大部分代碼的情況下,修改getColNames()方法使用的SQL查詢。
ConnManager的實現從SqoopOptions類獲取很多配置。SqoopOptions類是可變的。SqoopOptions不保存具體的配置。而是持有一個Configuration的引用,這個引用是由Tool.getConf()在GenericOptionsParser解析命令行參數后返回的。允許使用擴展參數“-D any.specific.param=any.vaule”,而且不需要SqoopOptions的選項解析或者修改。這個Configuration形成了傳給任意工作流調用的MapReduce Job的Configuration的基礎,因此用戶可以在命令行中設置任何需要的定制Hadoop狀態。
所有已有的ConnManager實現都是無狀態的。因此,實例化ConnManagers的系統在Sqoop的生命周期中可能實現同一個ConnManager的多個實例。目前認為實例化一個ConnManager是一個輕量級的操作,而且發生頻率相對較低。因此,ConnManagers在各個操作之間并沒有緩存。
ConnManagers目前是由抽象類ManagerFactory(參見:http://issues.apache.org/jira/browse/MAPREDUCE-750)的實例來創建的。ManagerFactory的一個實現目前支持Sqoop的所有情況:com.cloudera.sqoop.manager.DefaultManagerFactory。擴展不應該修改DefaultManagerFactory。相反,一個ManagerFactory的擴展實現應該提供新的ConnManager。ManagerFactory只有一個方法,accept()。這個方法判斷是否可以給用戶的SqoopOptions實例化一個ConnManager。若可以,則返回ConnManager實例,否則,返回null。
采用哪個ManagerFactory實現由sqoop-site.xml配置文件中的sqoop.connection.factories配置項指定的。擴展庫用戶可以安裝包含一個新ManagerFactory和ConnManager(s)的第三方庫,并配置sqoop-site.xml來使用這個新的ManagerFactory。DefaultManagerFactory主要是通過解析存儲在SqoopOptions中的連接字符串來區分數據庫。
擴展提供者可能利用com.cloudera.sqoop.io,mapreduce和util包里面的類,來輔助實現。這些包和類在下面的章節中詳細介紹。
6.2.1. HBase序列化擴展
Sqoop支持從數據庫導入到Hbase。當向HBase復制數據時,必須將其轉換成HBase支持的格式。特別是:
所有這些都是通過HBase客戶端API的Put語句完成的。Sqoop與HBase的交互在com.cloudera.sqoop.hbase包里實現。記錄從數據庫中反序列化,并從mapper發出。OutputFormat負責把結果插入HBase。這是通過PutTransformer接口完成的。PutTransformer有一個getPutCommand()方法,輸入一個Map<String,Object>代表數據集的字段。返回一個List<Put>描述如何把cells插入HBase。默認的PutTransformer實現是ToStringPutTransformer,它使用每個字段的基于字符串的表示,來將字段序列化進HBase。
你可以通過實現PutTransformer接口來覆蓋這個實現,并把它加入到classpath用于map任務(例如,使用-libjars選項)。要讓Sqoop使用你的實現,需要設置sqoop.hbase.insert.put.transformer.class屬性來用-D標識你的類。
在你的PutTransformer實現中,row key column和column family可以通過getRowKeyColumn()和getColumnFamily()方法獲取。你可以自行添加額外的Put操作;例如,注入額外的行表示一個二級索引。Sqoop會對--hbase-table指定的表執行所有Put操作。
6.3. Sqoop內核
本節描述Sqoop的內部架構。
Sqoop是由com.cloudera.sqoop.Sqoop主類驅動。同一個包里還有另外幾個類;SqoopOptions(前面介紹過了)和ConnFactory(操作ManagerFactory實例)。
6.3.1. 一般程序流程
一般程序流程如下:
com.cloudera.sqoop.Sqoop是主類,并且實現了Tool。新實例由ToolRunner啟動。Sqoop的第一個參數是標識要運行的SqoopTool的名字的字符串。這個SqoopTool來執行用于請求的操作(例如,import,export,codegen,等等)。
SqoopTool API在SIP-1中完整描述。
選中的SqoopTool會解析參數中剩余部分,把相應的字段設置到SqoopOptions類中。接下來執行其方法體。
在SqoopTool的run()方法里,import、export或者其他操作會被執行。緊接著,基于SqoopOptions中的數據實例化一個ConnManager實例。ConnFactory用于從ManagerFactory獲取ConnManager;前面的章節已經描述過這個機制。導入、導出和其他大量數據的移動任務通常是執行一個MapReduce job在表上進行并行、可靠的操作。導入(import)操作不必通過MapReduce job執行;ConnManager.importTable()方法用于判斷怎樣執行import最好。每一個主要的操作實際上都是由ConnManager控制的,除了代碼生成,它是由CompilationManager和ClassWriter完成的。(二者都在com.cloudera.sqoop.orm包中。)導入Hive的操作,在importTable()完成后,由com.cloudera.sqoop.hive.HiveImport接管。與使用的ConnManager實現無關。
ConnManager的importTable()方法接收一個ImportJobContext類型的參數,它包含了該方法的參數。這個類將來可能會擴展更多的參數,進一步管理import操作。類似的,exportTable()方法接收一個ExportJobContext類型的參數。這些類包含要導入或導出的表的名稱,一個SqoopOptions對象的引用,和其他相關數據。
6.3.2 子包
com.cloudera.sqoop包下面包含以下子包:
io包中包含OutputStream和BufferedWriter實現類,用于HDFS的直接寫入器(direct writers)。SplittableBufferedWriter允許對一個client打開一個BufferedWriter,當達到了目標的閾值,這個client會連續的寫入多個文件。這允許不可分割的壓縮庫(例如,gzip)與Sqoop import一起使用,同時仍然允許隨后的MapReduce作業使用每個數據集的多個輸入分片(splits)。大對象文件存儲(參見SIP-3)系統的代碼也位于io包里。
mapreduce包中包含與Hadoop MapReduce直接交互的代碼。這個包的內容在下一節中詳細描述。
orm包中包含類生成的代碼。它依賴于JDK的tools.jar,該jar包里提供了com.sun.tools.javac包。
util包中包含Sqoop使用的各種工具:
有幾個地方,Sqoop從外部進程讀取stdout。最直接的例子是LocalMySQLManager和DirectPostgresqlManager實現的direct-mode導入。在Runtime.exec()產生一個進程后,必須處理進程的stdout(Process.getInputStream())和可能的sdterr(Process.getErrorStream())。若從這兩個流中讀取足夠的數據失敗,外部進程寫入更多數據前,會導致外部進程阻塞。因此,二者必須都被處理,而且最好是異步的。
在Sqoop用語中,“async sink”是一個線程,持有一個InputStream并讀取直至結束。由AsyncSink的實現類來完成。com.cloudera.sqoop.util.AsyncSink抽象類定義了這個工廠必須實現的操作。processStream()會創建另一個線程,立即開始處理從InputStream參數讀取的數據;必須讀取這個流直至結束。join()方法允許外部線程等待,知道這個處理完成。
默認提供了一些AsyncSink的實現:LoggingAsyncSink處理諸如log4j INFO等的InputStream。NullAsyncSink消耗所有的輸入,但不做其他處理。
利用外部進程的各種ConnManagers以內部類的形式實現了各自的AsyncSink實現類,這些實現類從數據庫工具讀取數據,并把數據發送到HDFS,可能還同時進行了格式轉換。
6.3.3 與MapReduce的接口
Sqoop調度MapReduce 作業(jobs)進行導入和導出。MapReduce作業的配置和執行按照一些通用步驟進行(配置InputFormat;配置OutputFormat;設置Mapper實現;等等...)。這些步驟在com.cloudera.sqoop.mapreduce.JobBase類中體現。JobBase允許用戶指定要使用哪些InputFormat,OutputFormat和Mapper。
JobBase本身是ImportJobBase和ExportJobBase的子類,為import或export相關作業通用的配置步驟分別提供了更好的支持。ImportJobBase.runImport()會調用配置步驟,并運行一個作業來把一個表導入HDFS。
這些基類也存在一些子類。例如,DataDrivenImportJob使用DataDrivenDBImportJob來執行導入。這是各種可用ConnManager實現最常用到的導入類型。MySQL使用不同的類(MySQLDumpImportJob)來運行direct-mode導入。它的定制Mapper和InputFormat實現也位于這個包里。
英文原文:http://sqoop.apache.org/docs/1.4.6/SqoopDevGuide.html
相關文章:Sqoop中文用戶手冊,http://www.zihou.me/html/2014/01/28/9114.html
來自: http://blog.csdn.net//kingzone_2008/article/details/50298143