Hadoop Outputformat淺析(轉)
http://www.infoq.com/cn/articles/HadoopOutputFormat
Hadoop常常被用作大型數據處理生態系統中的一部分。它的優勢在于能夠批量地處理大量數據,并將結果以最好的方式與其他系統相集成。從高層次角度來看,整個過程就是Hadoop接收輸入文件、使用自定義轉換(Map-Reduce步驟)獲得內容流,以及將輸出文件的結果寫回磁盤。上個月InfoQ展示了怎樣在第一個步驟中,使用InputFormat類來更好地對接收輸入文件進行控制。而在本文中,我們將同大家一起探討怎樣自定義最后一個步驟——即怎樣寫入輸出文件。OutputFormat將Map/Reduce作業的輸出結果轉換為其他應用程序可讀的方式,從而輕松實現與其他系統的互操作。為了展示OutputFormts的實用性,我們將用兩個例子進行討論:如何拆分作業結果到不同目錄以及如何為提供快速鍵值查找的服務寫入文件。
OutputFormats是做什么的?
OutputFormt接口決定了在哪里以及怎樣持久化作業結果。Hadoop為不同類型的格式提供了一系列的類和接口,實現自定義操作只要繼承其中的某個類或接口即可。你可能已經熟悉了默認的OutputFormat,也就是TextOutputFormat,它是一種以行分隔,包含制表符界定的鍵值對的文本文件格式。盡管如此,對多數類型的數據而言,如再常見不過的數字,文本序列化會浪費一些空間,由此帶來的結果是運行時間更長且資源消耗更多。為了避免文本文件的弊端,Hadoop提供了SequenceFileOutputformat,它將對象表示成二進制形式而不再是文本文件,并將結果進行壓縮。下面是Hadoop提供的類層次結構:
- FileOutputFormat(實現OutputFormat接口)—— 所有OutputFormats的基類
- MapFileOutputFormat —— 一種使用部分索引鍵的格式
- SequenceFileOutputFormat —— 二進制鍵值數據的壓縮格式
- SequenceFileAsBinaryOutputFormat —— 原生二進制數據的壓縮格式
- TextOutputFormat —— 以行分隔、包含制表符定界的鍵值對的文本文件格式
- MultipleOutputFormat —— 使用鍵值對參數寫入文件的抽象類
- MultipleTextOutputFormat —— 輸出多個以標準行分割、制表符定界格式的文件
- MultipleSequenceFileOutputFormat —— 輸出多個壓縮格式的文件
OutputFormat提供了對RecordWriter的實現,從而指定如何序列化數據。 RecordWriter類可以處理包含單個鍵值對的作業,并將結果寫入到OutputFormat中準備好的位置。RecordWriter的實現主要包括兩個函數:“write”和“close”。“write”函數從Map/Reduce作業中取出鍵值對,并將其字節寫入磁盤。LineRecordWriter是默認使用的RecordWriter,它是前面提到的TextOutputFormat的一部分。它寫入的內容包括:
- 鍵(key)的字節 (由getBytes()函數返回)
- 一個用以定界的制表符
- 值(value)的字節(同樣由getBytes()函數返回)
- 一個換行符
“close”函數會關閉Hadoop到輸出文件的數據流。
我們已經討論了輸出數據的格式,下面我們關心的問題是數據存儲在何處?同樣,你或許看到過某個作業的輸出結果會以多個“部分”文件的方式存儲在輸出目錄中,如下:
|-- output-directory | |-- part-00000 | |-- part-00001 | |-- part-00002 | |-- part-00003 | |-- part-00004 '-- part-00005
默認情況下,當需要寫入數據時,每個進程都會在輸出目錄創建自己的文件。數據由reducers在作業結束時寫入(如果沒有reducers會由mapper寫入)。即使在本文后面提到的創建自定義輸出目錄時,我們仍會保持寫入“部分”文件,這么做可以讓多個進程同時寫入同一個目錄而互不干擾。
自定義OutputFormat
從前面我們已經看到,OutputFormat類的主要職責是決定數據的存儲位置以及寫入的方式。那么為什么要自定義這些行為呢?自定義數據位置的原因之一是為了將Map/Reduce作業輸出分離到不同的目錄。例如,假設需要處理一個包含世界范圍內的搜索請求的日志文件,并希望計算出每個國家的搜索頻度。你想要在不牽涉其他國家的前提下能夠查看某個特定國家的結果。也許以后在你的數據管道中,會用不同的進程來處理不同的國家,或者想要把某個特定國家的結果復制一份到該國的數據中心去。使用默認的OutputFormat時,所有的數據都會存儲在同一目錄下,這樣在不瀏覽的情況下是無從知曉“部分”文件的內容的。而通過使用自定義的OutputFormat,你可以為每個國家創建一個子目錄的布局,如下:
|-- output-directory | |-- France | | |-- part-00000 | | |-- part-00001 | | '-- part-00002 ... | | '-- Zimbabwe | |-- part-00000 | |-- part-00001 | '-- part-00002
其中每個部分文件都具有鍵值對(“搜索詞匯”=>頻度)。現在只要簡單地指定某個國家數據所在的路徑,就可以只讀取該國家的數據了。下面我們將看到怎樣繼承MultipleTextOutputFormat類,以獲得所需的行為。
自定義OutputFormat還有一些其他的原因,以名為ElephantDB的項目為例, 它將數據以一種面向消費應用程序的“本地”形式進行存儲。這個項目的設立是為了讓Map/Reduece作業結果可以像分布式服務一樣被查詢。ElephantDB寫入的并不是文本文件,而是使用自定義的OutputFormat將結果寫成BerkeleyDB文件,其中這些文件使用作業輸出的鍵進行索引。之后使用某個服務加載BerkeleyDB文件,可以提供低延滯的任意鍵查找。類似的系統還有HBase和Voldemort,它們可以存儲Hadoop生成的鍵值數據。ElephantDB重點關注的是怎樣與Hadoop批量式更新進行簡易緊密的集成。
多路輸出
為了解決上面的搜索日志的問題,我們繼承了MultipleTextOutputFormat類,并根據被寫入的鍵值來選擇輸出目錄。我們的Map/Reduce作業將會為搜索請求所在國家生成一個鍵,并為搜索詞匯及該搜索的頻度產生一個值。由于MultipleTextOutputFormat已經知道如何寫入文本文件,因此并不需要為OutputFormat實現序列化功能。清單1實現了該類:
1 package oddjob.hadoop; 2 3 import org.apache.hadoop.fs.Path; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; 6 7 public class MultipleTextOutputFormatByKey extends MultipleTextOutputFormat<Text, Text> { 8 9 /** 10 * Use they key as part of the path for the final output file. 11 */ 12 @Override 13 protected String generateFileNameForKeyValue(Text key, Text value, String leaf) { 14 return new Path(key.toString(), leaf).toString(); 15 } 16 17 /** 18 * When actually writing the data, discard the key since it is already in 19 * the file path. 20 */ 21 @Override 22 protected Text generateActualKey(Text key, Text value) { 23 return null; 24 } 25 }
清單1:MultipleTextOutputFormat子類樣例
MultipleTextOutputFormatByKey類的generateActualFileNameForKeyValue方法指定了作業輸出的存儲位置(第13行)。對于每組由Map/Reduce作業生成的鍵值對,該類會把鍵加入到路徑名稱中作為輸出。“leaf”參數就是我們之前看到的“part-0000”,它在每個reducer中都是獨一無二的,這樣可以允許不同進程同時寫入到輸出目錄而互不影響。例如,由第一個reducer產生的鍵為“France”、值為“soccer 5000”的結果會被寫入到“output-directory/France/part-00000”內的某個文件中。
要使用這個類,需確保Hadoop包含了這個自定義類的jar,并使用完整的類名作為“-outputformat”的參數:
hadoop jar hadoop-streaming.jar -libjars CustomOutputFormats.jar \ -outputformat oddjob.hadoop.MultipleTextOutputFormatByKey \ -input search-logs \ -output search-frequency-by-country \ -mapper parse-logs.py \ -reducer count-searches.py
清單1是oddjob項目中某個類的Java實現。oddjob是一個開源庫,提供了多種MultipleTextOutputFormat。雖然這個庫面向的是Hadoop的流特性,但是它也可以用在產生文本鍵值輸出的其他作業中。
為服務準備輸出
在我們的下一個例子中,必須實現兩個接口來自定義數據序列化以及文件存放的目錄結構,以使結果可被ElephantDB服務加載。正如前面所討論的,序列化部分會由RecordWriter的實現來處理。在LineRecordWriter類將字節流寫入輸出文件的同時,ElephantRecordWriter還包含了專門的邏輯用來選擇要寫入的文件以及使用第三方庫來格式化磁盤上的數據。
1 public class ElephantRecordWriter implements RecordWriter<IntWritable, ElephantRecordWritable> { 2 3 FileSystem _fs; 4 Args _args; 5 Map<Integer, LocalPersistence> _lps = new HashMap<Integer, LocalPersistence>(); 6 Progressable _progressable; 7 LocalElephantManager _localManager; 8 9 int _numWritten = 0; 10 long _lastCheckpoint = System.currentTimeMillis(); 11 12 public ElephantRecordWriter(Configuration conf, Args args, Progressable progressable) throws IOException { 13 _fs = Utils.getFS(args.outputDirHdfs, conf); 14 _args = args; 15 _progressable = progressable; 16 _localManager = new LocalElephantManager(_fs, args.spec, args.persistenceOptions, LocalElephantManager.getTmpDirs(conf)); 17 } 18 19 private String remoteUpdateDirForShard(int shard) { 20 if(_args.updateDirHdfs==null) return null; 21 else return _args.updateDirHdfs + "/" + shard; 22 } 23 24 public void write(IntWritable shard, ElephantRecordWritable record) throws IOException { 25 LocalPersistence lp = null; 26 LocalPersistenceFactory fact = _args.spec.getLPFactory(); 27 Map<String, Object> options = _args.persistenceOptions; 28 if(_lps.containsKey(shard.get())) { 29 lp = _lps.get(shard.get()); 30 } else { 31 String updateDir = remoteUpdateDirForShard(shard.get()); 32 String localShard = _localManager.downloadRemoteShard("" + shard.get(), updateDir); 33 lp = fact.openPersistenceForAppend(localShard, options); 34 _lps.put(shard.get(), lp); 35 progress(); 36 } 37 38 _args.updater.updateElephant(lp, record.key, record.val); 39 40 _numWritten++; 41 if(_numWritten % 25000 == 0) { 42 long now = System.currentTimeMillis(); 43 long delta = now - _lastCheckpoint; 44 _lastCheckpoint = now; 45 LOG.info("Wrote last 25000 records in " + delta + " ms"); 46 _localManager.progress(); 47 } 48 } 49 50 public void close(Reporter reporter) throws IOException { 51 for(Integer shard: _lps.keySet()) { 52 String lpDir = _localManager.localTmpDir("" + shard); 53 LOG.info("Closing LP for shard " + shard + " at " + lpDir); 54 _lps.get(shard).close(); 55 LOG.info("Closed LP for shard " + shard + " at " + lpDir); 56 progress(); 57 String remoteDir = _args.outputDirHdfs + "/" + shard; 58 if(_fs.exists(new Path(remoteDir))) { 59 LOG.info("Deleting existing shard " + shard + " at " + remoteDir); 60 _fs.delete(new Path(remoteDir), true); 61 LOG.info("Deleted existing shard " + shard + " at " + remoteDir); 62 } 63 LOG.info("Copying " + lpDir + " to " + remoteDir); 64 _fs.copyFromLocalFile(new Path(lpDir), new Path(remoteDir)); 65 LOG.info("Copied " + lpDir + " to " + remoteDir); 66 progress(); 67 } 68 _localManager.cleanup(); 69 } 70 71 private void progress() { 72 if(_progressable!=null) _progressable.progress(); 73 } 74 }
清單2:從ElephantDB中摘錄的某個RecordWriter子類
ElephantDB的工作方式是通過跨越若干個LocalPersistence對象(BerkeleyDB文件)來對數據進行分片(劃分)。ElephantRecordWriter類中的write函數拿到分片ID,并檢查該分片是否已經打開(第28行),如果沒有則打開并創建一個新的本地文件(第33行)。第38行的updateElephant調用將作業輸出的鍵值對寫入到BerkeleyDB文件。
當關閉ElephantRecordWriter時,該類在第64行會復制BerkeleyDB文件到HDFS中,且可以隨意選擇是否覆蓋舊文件。接下去的progress方法調用會通知Hadoop當前的RecordWriter正在按計劃進行,這有點類似于真實Map/Reduce作業中的狀態或計數器更新。
下一步是利用ElephantRecordWriter來實現OutputFormat。要理解此清單中的代碼,重點是了解Hadoop JobConf對象封裝了什么。顧名思義,JobConf對象包含了某項作業的全部設置,包括輸入輸出目錄,作業名稱以及mapper和reducer類。清單3展示了兩個自定義類是如何共同工作的:
1 public class ElephantOutputFormat implements OutputFormat<IntWritable, ElephantRecordWritable> { 2 public static Logger LOG = Logger.getLogger(ElephantOutputFormat.class); 3 4 public RecordWriter<IntWritable, ElephantRecordWritable> getRecordWriter(FileSystem fs, JobConf conf, String string, Progressable progressable) throws IOException { 5 return new ElephantRecordWriter(conf, (Args) Utils.getObject(conf, ARGS_CONF), progressable); 6 } 7 8 public void checkOutputSpecs(FileSystem fs, JobConf conf) throws IOException { 9 Args args = (Args) Utils.getObject(conf, ARGS_CONF); 10 fs = Utils.getFS(args.outputDirHdfs, conf); 11 if(conf.getBoolean("mapred.reduce.tasks.speculative.execution", true)) { 12 throw new InvalidJobConfException("Speculative execution should be false"); 13 } 14 if(fs.exists(new Path(args.outputDirHdfs))) { 15 throw new InvalidJobConfException("Output dir already exists " + args.outputDirHdfs); 16 } 17 if(args.updateDirHdfs!=null && !fs.exists(new Path(args.updateDirHdfs))) { 18 throw new InvalidJobConfException("Shards to update does not exist " + args.updateDirHdfs); 19 } 20 } 21 }
清單3:從ElephantDB中摘錄的某個OutputFormat實現
正如前面所看到的,OutputFormat有兩個職責,分別是決定數據的存儲位置以及數據寫入的方式。ElephantOutputFormat的數據存儲位置是通過檢查JobConf以及在第14和17行檢查確保該位置是一個合法目標位置后來決定的。至于數據的寫入方式,則是由getRecordWriter函數處理,它的返回結果是清單2中的ElephantRecordWriter對象。
從Hadoop的角度來看,當Map/Reduce作業結束并且每個reducer產生了鍵值對流的時候,這些類會派上用場。Hadoop會以作業配置為參數調用checkOutputSpecs。如果函數運行沒有拋出異常,它會接下去調用getRecordWriter以返回可以寫入流數據的對象。當所有的鍵值對都被寫入后,Hadoop會調用writer中的close函數,將數據提交到HDFS并結束該reducer的職責。
總結
OutputFormat是Hadoop框架中的重要組成部分。它們通過為目標消費應用程序產生合適的輸出來提供與其他系統和服務間的互操作。自定義作業輸出位置可以簡化并加速數據工作流;而自定義結果輸出方式可以讓其快速地工作于其他不同的環境下。雖然實現OutputFormat和覆寫幾個方法一樣簡單,但是它足夠靈活可以支持全新的磁盤上的數據格式。