Hadoop InputFormat淺析(轉)
隨著越來越多的公司采用Hadoop,它所處理的問題類型也變得愈發多元化。隨著Hadoop適用場景數量的不斷膨脹,控制好怎樣執行以及何處執行map任務顯得至關重要。實現這種控制的方法之一就是自定義InputFormat實現。
InputFormat 類是Hadoop Map Reduce框架中的基礎類之一。該類主要用來定義兩件事情:
- 數據分割(Data splits)
- 記錄讀取器(Record reader)
數據分割 是Hadoop Map Reduce框架中的基礎概念之一,它定義了單個Map任務的大小及其可能的執行服務器信息。記錄讀取器 主要負責從輸入文件實際讀取數據并將它們(以鍵值對的形式)提交給mapper。盡管有不少文章介紹過怎樣實現自定義的記錄讀取器(例如,參考文章[1]),但是關于如何進行分割(split)的介紹卻相當粗略。這里我們將會解釋什么是分割,并介紹怎樣實現自定義分割來完成特定任務。
剖析分割
任何分割操作的實現都繼承自Apache抽象基類——InputSplit,它定義了分割的長度及位置。分割長度 是指分割數據的大小(以字節為單位),而分割位置 是分割所在的機器結點名稱組成的列表,其中待分割的數據都會于本地存在。分割位置可以方便調度器決定在哪個機器上執行此次分割。簡化后的[1]作業跟蹤器(job tracker)工作流程如下:
- 接受來自某個任務跟蹤器(task tracker)的心跳通信,得到該位置map的可用情況。
- 為隊列等候中的分割任務找到可用的“本地”結點。
- 向任務跟蹤器提交分割請求以待執行。
數據局部性(Locality)的相關程度因存儲機制和整體的執行策略的不同而不同。例如,在Hadoop分布式文件系統(HDFS)中,分割通常對應一個物理數據塊大小以及該數據塊物理定位所在的一系列機器(其中機器總數由復制因子定義)的位置。這就是FileInputFormat 計算分割的過程。
而HBase的實現則采用了另外一套方法。在HBase中,分割 對應于一系列屬于某個表區域(table region)的表鍵(table keys),而位置則為正在運行區域服務器的機器。
計算密集型應用
Map Reduce應用中有一類特殊的應用叫做計算密集型應用(Compute-Intensive application)。這類應用的特點在于Mapper.map()函數執行的時間要遠遠長于數據訪問的時間,且至少要差一個數量級。從技術角度來說,雖然這類應用仍然可以使用“標準”輸入格式的實現,但是它會帶來數據存放結點過少而集群內剩余結點沒能充分利用的問題(見圖1)。
(點擊圖片進行放大)
圖1:數據局部性情況下的結點使用圖
圖1中顯示了針對計算密集型應用,使用“標準”數據局部性導致的結點使用率上的巨大差異——有些結點(紅色標注)被過度使用,而其他結點(黃色和淺綠色標注)則使用不足。由此可見,在針對計算密集型應用時,需要重新思考對“局部性”概念的認識。在這種情況下,“局部性”意味著所有可用結點之間map任務的均勻分布——即最大化地使用集群機器的計算能力。
使用自定義InputFormat改變“局部性”
假定源數據以文件序列的形式存在,那么一個簡單的ComputeIntensiveSequenceFileInputFormat 類(見清單1)便可以實現將分割生成的結果均勻地分布在集群中的所有服務器上。
package com.navteq.fr.mapReduce.InputFormat;import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
public class ComputeIntensiveSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {
private static final double SPLIT_SLOP = 1.1; // 10% slop static final String NUM_INPUT_FILES = "mapreduce.input.num.files";
/** * Generate the list of files and make them into FileSplits. */ @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job);
// get servers in the cluster String[] servers = getActiveServersList(job); if(servers == null) return null; // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus>files = listStatus(job); int currentServer = 0; for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if ((length != 0) && isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { splits.add(new FileSplit(path, length-bytesRemaining, splitSize, new String[] {servers[currentServer]})); currentServer = getNextServer(currentServer, servers.length); bytesRemaining -= splitSize; }
if (bytesRemaining != 0) { splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, new String[] {servers[currentServer]})); currentServer = getNextServer(currentServer, servers.length); } } else if (length != 0) { splits.add(new FileSplit(path, 0, length, new String[] {servers[currentServer]})); currentServer = getNextServer(currentServer, servers.length); } else { //Create empty hosts array for zero length files splits.add(new FileSplit(path, 0, length, new String[0])); } }
// Save the number of input files in the job-conf job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
return splits; }
private String[] getActiveServersList(JobContext context){
String [] servers = null; try { JobClient jc = new JobClient((JobConf)context.getConfiguration()); ClusterStatus status = jc.getClusterStatus(true); Collection<String> atc = status.getActiveTrackerNames(); servers = new String[atc.size()]; int s = 0; for(String serverInfo : atc){ StringTokenizer st = new StringTokenizer(serverInfo, ":"); String trackerName = st.nextToken(); StringTokenizer st1 = new StringTokenizer(trackerName, "_"); st1.nextToken(); servers[s++] = st1.nextToken(); } }catch (IOException e) { e.printStackTrace(); }
return servers;
}
private static int getNextServer(int current, int max){
current++; if(current >= max) current = 0; return current; } }
清單1:ComputeIntensiveSequenceFileInputFormat類
該類繼承自 SequenceFileInputFormat 并重寫了getSplits()方法,雖然計算分割的過程與FileInputFormat完全一樣,但是它為分割指定了“局部性”,以便從集群中找出可用的服務器。getSplits()利用到了以下兩個方法:
- getActiveServersList() 方法,返回集群中當前可用的服務器(名稱)組成的數組。
- getNextServer() 方法,返回服務器數組中下一個服務器索引,且當服務器數組中元素全部用盡時,返回數組頭部重新開始。
雖然上述實現(見清單1)將map任務的執行均勻地分布在了集群中的所有服務器上,但是它卻完全忽略了數據的實際位置。稍微好點的getSplits方法實現(見清單2)可以試圖將兩種策略結合在一起:既盡量多地放置針對數據的本地作業,且保持剩余作業在集群上的良好平衡。[2]
public List<InputSplit> getSplits(JobContext job) throws IOException {// get splits List<InputSplit> originalSplits = super.getSplits(job);
// Get active servers String[] servers = getActiveServersList(job); if(servers == null) return null; // reassign splits to active servers List<InputSplit> splits = new ArrayList<InputSplit>(originalSplits.size()); int numSplits = originalSplits.size(); int currentServer = 0; for(int i = 0; i < numSplits; i++, currentServer = i>getNextServer(currentServer, servers.length)){ String server = servers[currentServer]; // Current server boolean replaced = false; // For every remaining split for(InputSplit split : originalSplits){ FileSplit fs = (FileSplit)split; // For every split location for(String l : fs.getLocations()){ // If this split is local to the server if(l.equals(server)){ // Fix split location splits.add(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), new String[] {server})); originalSplits.remove(split); replaced = true; break; } } if(replaced) break; } // If no local splits are found for this server if(!replaced){ // Assign first available split to it FileSplit fs = (FileSplit)splits.get(0); splits.add(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), new String[] {server})); originalSplits.remove(0); } } return splits; }
清單2:優化過的getSplits方法
在此實現中,我們首先使用父類(FileInputSplit)來得到包含位置計算在內的分割以確保數據局部性。然后我們計算出可用的服務器列表,并為每一個存在的服務器嘗試分配與其同處本地的分割。
延遲公平調度
雖然清單1和清單2中的代碼都正確地計算出了分割位置,但當我們試圖在Hadoop集群上運行代碼時,就會發現結果與服務器之間產生均勻分布相去甚遠。參考文章[2]中很好的描述了我們觀察到的這個問題,并為該問題描述了一種解決方案——即延遲公平調度。
假設已經設置好了公平調度程序,那么下面的程序段應當加入到mapred-site.xml文件中以激活某個延遲調度程序[3]:
<property> <name>mapred.fairscheduler.locality.delay</name> <value>360000000</value> <property>
適當借助延遲公平調度程序,作業執行將可以利用整個集群(見圖2)。此外,根據我們的實驗,這種情況下的執行時間相比“數據局部性”的做法要節省約30%。
(點擊圖片進行放大)
圖2:執行局部性情況下的結點使用圖
其他注意事項
用于測試的計算作業共使用了96個split和mapper任務。測試集群擁有19個數據結點,其中每個數據結點擁有8個mapper槽,因此該集群共有152個可用槽。當該作業運行時,它并沒有充分利用集群中的所有槽。
Ganglia的兩份截圖都展示了我們所使用的測試集群,其中前三個結點為控制結點,而第四個結點為邊緣結點,主要用來啟動作業。圖中展示了中央處理器/機器的負載情況。在圖1中,有一些結點被過度使用(紅色顯示),而集群中的其他結點則未得到充分利用。在圖2中,雖然我們得到了更加平衡的分布,然而集群仍然未被充分利用。用于測試的作業也可以運行多線程,這么做會增加中央處理器的負載,但同時也會降低在每次Mapper.map()迭代上的整體時間花費。正如圖3所示,通過增加線程數量,我們可以更好地利用集群資源,并進一步減少完成作業所花費的時間。通過改變作業區域性,我們可以在不犧牲性能的情況下更好地利用群集處理遠程作業數據。
(點擊圖片進行放大)
圖3:使用多線程Map作業的執行區域性情況下的結點使用圖
即使機器中央處理器處于高負荷狀態,它仍然可以允許其他磁盤I/O密集型作業運行在開放槽中,要注意的是,這么做會帶來些許的性能下降。
自定義分割
本文中提到的方法對大文件非常適用,但是對于小文件而言,并沒有足夠的分割來讓其使用集群中的多臺機器。一種可行的方法是使用更小的分割塊,但是這么做會給集群命名結點帶來更多的負擔(內存需求方面)。一種更好的做法是修改清單1中的代碼,以使用自定義的塊大小(而不是文件塊大小)。這種方法可以計算出所需的分割塊數量,而不用理會實際的文件大小。
總結
在這篇文章中,我們已經展示了如何利用自定義的InputFormats來更緊密地控制Map Reduce中的map任務在可用服務器間的分布。這種控制對于一類特殊應用——計算密集型應用非常重要,控制過程將Hadoop Map Reduce做為通用的并行執行框架使用。
關于作者
Boris Lublinsky是NAVTEQ公司的首席架構師,在這家公司中他的工作是為大型數據管理、處理以及SOA定義架構愿景,并且實施各種NAVTEQ的項目。他還是InfoQ的SOA編輯,OASIS的SOA RA工作組的參與者。Boris是一位作者并經常發表演講,他最新的一本書叫做《Applied SOA》。
Michael Segel在過去二十多年里一直不斷與客戶合作,幫助他們發現并解決業務上的問題。Michael做過許多不同類型的工作,也在不同的行業圈摸打滾爬過。他是一位獨立顧問,并且總是期望能夠解決所有具有挑戰性的問題。Michael擁有俄亥俄州立大學的軟件工程學位。
參考
1. Boris Lublinsky, Mike Segel. Custom XML Records Reader.
[1] 這是一個簡化后的解釋。真實的調度算法要復雜得多;考慮更多的參數而不僅僅是分割的位置。
[2] 雖然我們將其列作一個選項,但是如果花費在Mapper.map()方法上的時間高與遠程訪問數據時間的一個或多個數量級時,清單1中的代碼將不會有任何性能上的提升。但盡管如此,它也許會帶來網絡利用率上的略微提高。
[3] 請注意這里的延遲以毫秒為單位,在改變該值之后需要重新啟動作業跟蹤器。