Hadoop:分布式計算平臺初探
Hadoop是一個開發和運行處理大規模數據的軟件平臺,是Appach的一個用java語言實現開源軟件框架,實現在大量計算機組成的集群中對海 量數據進行分布式計算。Hadoop框架中最核心設計就是:MapReduce和HDFS。MapReduce提供了對數據的計算,HDFS提供了海量數 據的存儲。
MapReduce
MapReduce的思想是由Google的一篇論文所提及而被廣為流傳的,簡單的一句話解釋MapReduce就是“任務的分解與結果的匯總”。 HDFS是Hadoop分布式文件系統(Hadoop Distributed File System)的縮寫,為分布式計算存儲提供了底層支持。
MapReduce從它名字上來看就大致可以看出個緣由,兩個動詞Map和Reduce,“Map(展開)”就是將一個任務分解成為多個任 務,“Reduce”就是將分解后多任務處理的結果匯總起來,得出最后的分析結果。不論是現實社會,還是在程序設計中,一項工作往往可以被拆分成為多個任 務,任務之間的關系可以分為兩種:一種是不相關的任務,可以并行執行;另一種是任務之間有相互的依賴,先后順序不能夠顛倒,這類任務是無法并行處理的。
在分布式系統中,機器集群就可以看作硬件資源池,將并行的任務拆分,然后交由每一個空閑機器資源去處理,能夠極大地提高計算效率,同時這種資源無關 性,對于計算集群的擴展無疑提供了最好的設計保證。任務分解處理以后,那就需要將處理以后的結果再匯總起來,這就是Reduce要做的工作。
上圖是論文里給出的流程圖。一切都是從最上方的User Program開始的,User Program鏈接了MapReduce庫,實現了最基本的Map函數和Reduce函數。圖中執行的順序都用數字標記了。
- MapReduce庫先把user program的輸入文件劃分為M份(M為用戶定義),每一份通常有16MB到64MB,如圖左方所示分成了split0~4;然后使用fork將用戶進程拷貝到集群內其它機器上。
- user program的副本中有一個稱為master,其余稱為worker,master是負責調度的,為空閑worker分配作業(Map作業或者Reduce作業),worker的數量也是可以由用戶指定的。
- 被分配了Map作業的worker,開始讀取對應分片的輸入數據,Map作業數量是由M決定的,和split一一對應;Map作業從輸入數據中抽取出鍵值對,每一個鍵值對都作為參數傳遞給map函數,map函數產生的中間鍵值對被緩存在內存中。
- 緩存的中間鍵值對會被定期寫入本地磁盤,而且被分為R個區,R的大小是由用戶定義的,將來每個區會對應一個Reduce作業;這些中間鍵值對的位置會被通報給master,master負責將信息轉發給Reduce worker。
- master通知分配了Reduce作業的worker它負責的分區在什么位置(肯定不止一個地方,每個Map作業產生的中間鍵值對都可能映射到 所有R個不同分區),當Reduce worker把所有它負責的中間鍵值對都讀過來后,先對它們進行排序,使得相同鍵的鍵值對聚集在一起。因為不同的鍵可能會映射到同一個分區也就是同一個 Reduce作業(誰讓分區少呢),所以排序是必須的。
- reduce worker遍歷排序后的中間鍵值對,對于每個唯一的鍵,都將鍵與關聯的值傳遞給reduce函數,reduce函數產生的輸出會添加到這個分區的輸出文件中。
- 當所有的Map和Reduce作業都完成了,master喚醒正版的user program,MapReduce函數調用返回user program的代碼。 </ol>
- kvoffsets緩沖區,也叫偏移量索引數組,用于保存key/value信息在位置索引kvindices中的偏移量。當kvoffsets 的使用率超過io.sort.spill.percent(默認為80%)后,便會觸發一次SpillThread線程的“溢寫”操作,也就是開始一次 Spill階段的操作。
- kvindices緩沖區,也叫位置索引數組,用于保存key/value在數據緩沖區kvbuffer中的起始位置。
- kvbuffer即數據緩沖區,用于保存實際的key/value的值。默認情況下該緩沖區最多可以使用io.sort.mb的95%,當 kvbuffer使用率超過io.sort.spill.percent(默認為80%)后,便會出發一次SpillThread線程的“溢寫”操作,也 就是開始一次Spill階段的操作。 </ul>
- 對于整個集群有單一的命名空間。
- 數據一致性。適合一次寫入多次讀取的模型,客戶端在文件沒有被成功創建之前無法看到文件存在。
- 文件會被分割成多個文件塊,每個文件塊被分配存儲到數據節點上,而且根據配置會由復制文件塊來保證數據的安全性。 </ol>
- Client向NameNode發起文件寫入的請求。
- NameNode根據文件大小和文件塊配置情況,返回給Client它所管理部分DataNode的信息。
- Client將文件劃分為多個Block,根據DataNode的地址信息,按順序寫入到每一個DataNode塊中。 </ol>
- Client向NameNode發起文件讀取的請求。
- NameNode返回文件存儲的DataNode的信息。
- Client讀取文件信息。 </ol>
- NameNode發現部分文件的Block不符合最小復制數或者部分DataNode失效。
- 通知DataNode相互復制Block。
- DataNode開始直接相互復制。 </ol>
- Block的放置:默認不配置。一個Block會有三份備份,一份放在NameNode指定的DataNode,另一份放在與指定 DataNode非同一Rack上的DataNode,最后一份放在與指定DataNode同一Rack上的DataNode上。備份無非就是為了數據安 全,考慮同一Rack的失敗情況以及不同Rack之間數據拷貝性能問題就采用這種配置方式。
- 心跳檢測DataNode的健康狀況,如果發現問題就采取數據備份的方式來保證數據的安全性。
- 數據復制(場景為DataNode失敗、需要平衡DataNode的存儲利用率和需要平衡DataNode數據交互壓力等情況):這里先說一下, 使用HDFS的balancer命令,可以配置一個Threshold來平衡每一個DataNode磁盤利用率。例如設置了Threshold為10%, 那么執行balancer命令的時候,首先統計所有DataNode的磁盤利用率的均值,然后判斷如果某一個DataNode的磁盤利用率超過這個均值 Threshold以上,那么將會把這個DataNode的block轉移到磁盤利用率低的DataNode,這對于新節點的加入來說十分有用。
- 數據交驗:采用CRC32作數據交驗。在文件Block寫入的時候除了寫入數據還會寫入交驗信息,在讀取的時候需要交驗后再讀入。
- NameNode是單點:如果失敗的話,任務處理信息將會紀錄在本地文件系統和遠端的文件系統中。
- 數據管道性的寫入:當客戶端要寫入文件到DataNode上,首先客戶端讀取一個Block然后寫到第一個DataNode上,然后由第一個 DataNode傳遞到備份的DataNode上,一直到所有需要寫入這個Block的NataNode都成功寫入,客戶端才會繼續開始寫下一個 Block。
- 安全模式:在分布式文件系統啟動的時候,開始的時候會有安全模式,當分布式文件系統處于安全模式的情況下,文件系統中的內容不允許修改也不允許刪 除,直到安全模式結束。安全模式主要是為了系統啟動的時候檢查各個DataNode上數據塊的有效性,同時根據策略必要的復制或者刪除部分數據塊。運行期 通過命令也可以進入安全模式。在實踐過程中,系統啟動的時候去修改和刪除文件也會有安全模式不允許修改的出錯提示,只需要等待一會兒即可。 </ol>
- 高可靠性。Hadoop按位存儲和處理數據的能力值得人們信賴。
- 高擴展性。Hadoop是在可用的計算機集簇間分配數據并完成計算任務的,這些集簇可以方便地擴展到數以千計的節點中。
- 高效性。Hadoop能夠在節點之間動態地移動數據,并保證各個節點的動態平衡,因此處理速度非常快。
- 高容錯性。Hadoop能夠自動保存數據的多個副本,并且能夠自動將失敗的任務重新分配。
- 低成本。與一體機、商用數據倉庫以及QlikView、Yonghong Z-Suite等數據集市相比,hadoop是開源的,項目的軟件成本因此會大大降低。 </ul>
所有執行完畢后,MapReduce輸出放在了R個分區的輸出文件中(分別對應一個Reduce作業)。用戶通常并不需要合并這R個文件,而是將其 作為輸入交給另一個MapReduce程序處理。整個過程中,輸入數據是來自底層分布式文件系統(HDFS)的,中間數據是放在本地文件系統的,最終輸出 數據是寫入底層分布式文件系統(HDFS)的。而且我們要注意Map/Reduce作業和map/reduce函數的區別:Map作業處理一個輸入數據的 分片,可能需要調用多次map函數來處理每個輸入鍵值對;Reduce作業處理一個分區的中間鍵值對,期間要對每個不同的鍵調用一次reduce函 數,Reduce作業最終也對應一個輸出文件。
上訴流程分為三個階段。第一階段是準備階段,包括1、2,主角是MapReduce庫,完成拆分作業和拷貝用戶程序等任務;第二階段是運行階段,包 括3、4、5、6,主角是用戶定義的map和reduce函數,每個小作業都獨立運行著;第三階段是掃尾階段,這時作業已經完成,作業結果被放在輸出文件 里,就看用戶想怎么處理這些輸出了。
在Map前還可能會對輸入的數據有Split(分割)的過程,保證任務并行效率,在Map之后還會有Shuffle(混合)的過程,對于提高 Reduce的效率以及減小數據傳輸的壓力有很大的幫助。Shuffle過程是MapReduce的核心,也被稱為奇跡發生的地方。要想理解 MapReduce,Shuffle是必須要了解的。
MapReduce的Shuffle過程介紹
Shuffle的本義是洗牌、混洗,把一組有一定規則的數據盡量轉換成一組無規則的數據,越隨機越好。MapReduce中的Shuffle更像是 洗牌的逆過程,把一組無規則的數據盡量轉換成一組具有一定規則的數據。為什么MapReduce計算模型需要Shuffle過程?我們都知道 MapReduce計算模型一般包括兩個重要的階段:Map是映射,負責數據的過濾分發;Reduce是規約,負責數據的計算歸并。Reduce的數據來 源于Map,Map的輸出即是Reduce的輸入,Reduce需要通過Shuffle來獲取數據。
可能大家更熟悉的是Java API里的Collections.shuffle(List)方法,它會隨機地打亂參數list里的元素順序。如果你不知道MapReduce里Shuffle是什么,那么請看這張圖:
實際上,從Map Task任務中的map()方法中的最后一步調用即輸出中間數據開始,一直到Reduce Task任務開始執行reduce()方法結束,這一中間處理過程就被稱為MapReduce的Shuffle。Shuffle過程分為兩個階段:Map 端的shuffle階段和Reduce端的Shuffle階段。
從Map輸出到Reduce輸入的整個過程可以廣義地稱為Shuffle。Shuffle橫跨Map端和Reduce端,在Map端包括Spill過程,在Reduce端包括copy和sort過程,如圖所示:
Spill過程包括輸出、排序、溢寫、合并等步驟,如圖所示:
1、Collect階段
每個Map任務不斷地以
Kvbuffer的存放指針bufindex是一直悶著頭地向上增長,比如bufindex初始值為0,一個Int型的key寫完之后,bufindex增長為4,一個Int型的value寫完之后,bufindex增長為8。
索引是對
Kvbuffer的大小雖然可以通過參數設置,但是總共就那么大,
關于Spill觸發的條件,也就是Kvbuffer用到什么程度開始Spill,還是要講究一下的。如果把Kvbuffer用得死死得,一點縫都不 剩的時候再開始Spill,那Map任務就需要等Spill完成騰出空間之后才能繼續寫數據;如果Kvbuffer只是滿到一定程度,比如80%的時候就 開始Spill,那在Spill的同時,Map任務還能繼續寫數據,如果Spill夠快,Map可能都不需要為空閑空間而發愁。兩利相衡取其大,一般選擇 后者。
Spill這個重要的過程是由Spill線程承擔,Spill線程從Map任務接到“命令”之后就開始正式干活,干的活叫SortAndSpill,原來不僅僅是Spill,在Spill之前還有個頗具爭議性的Sort。
在Map Task任務的業務處理方法map()中,最后一步通過OutputCollector.collect(key,value)或 context.write(key,value)輸出Map Task的中間處理結果,在相關的collect(key,value)方法中,會調用Partitioner.getPartition(K2 key, V2 value, int numPartitions)方法獲得輸出的key/value對應的分區號(分區號可以認為對應著一個要執行Reduce Task的節點),然后將
當緩沖區中的數據使用率達到一定閥值后,觸發一次Spill操作,將環形緩沖區中的部分數據寫到磁盤上,生成一個臨時的Linux本地數據的 spill文件;然后在緩沖區的使用率再次達到閥值后,再次生成一個spill文件。直到數據處理完畢,在磁盤上會生成很多的臨時文件。
MapOutputBuffer內部存數的數據采用了兩個索引結構,涉及三個環形內存緩沖區。下來看一下兩級索引結構:
三個環形緩沖區:
2、Sort階段
先把Kvbuffer中的數據按照partition值和key兩個關鍵字升序排序,移動的只是索引數據,排序結果是Kvmeta中數據按照partition為單位聚集在一起,同一partition內的按照key有序。
3、Spill階段
當緩沖區的使用率達到一定閥值后,觸發一次“溢寫”操作,將環形緩沖區中的部分數據寫到Linux的本地磁盤。需要特別注意的是,在將數據寫磁盤之 前,先要對要寫磁盤的數據進行一次排序操作,先按
Spill線程為這次Spill過程創建一個磁盤文件:從所有的本地目錄中輪訓查找能存儲這么大空間的目錄,找到之后在其中創建一個類似于 “spill12.out”的文件。Spill線程根據排過序的Kvmeta挨個partition的把
所有的partition對應的數據都放在這個文件里,雖然是順序存放的,但是怎么直接知道某個partition在這個文件中存放的起始位置呢? 強大的索引又出場了。有一個三元組記錄某個partition對應的數據在這個文件中的索引:起始位置、原始數據長度、壓縮之后的數據長度,一個 partition對應一個三元組。然后把這些索引信息存放在內存中,如果內存中放不下了,后續的索引信息就需要寫到磁盤文件中了:從所有的本地目錄中輪 訓查找能存儲這么大空間的目錄,找到之后在其中創建一個類似于“spill12.out.index”的文件,文件中不光存儲了索引數據,還存儲了 crc32的校驗數據。(spill12.out.index不一定在磁盤上創建,如果內存(默認1M空間)中能放得下就放在內存中,即使在磁盤上創建 了,和spill12.out文件也不一定在同一個目錄下。)
每一次Spill過程就會最少生成一個out文件,有時還會生成index文件,Spill的次數也烙印在文件名中。索引文件和數據文件的對應關系如下圖所示:
話分兩端,在Spill線程如火如荼的進行SortAndSpill工作的同時,Map任務不會因此而停歇,而是一無既往地進行著數據輸出。Map 還是把數據寫到kvbuffer中,那問題就來了:
Map任務總要把輸出的數據寫到磁盤上,即使輸出數據量很小在內存中全部能裝得下,在最后也會把數據刷到磁盤上。
4、Combine階段
待Map Task任務的所有數據都處理完后,會對任務產生的所有中間數據文件做一次合并操作,以確保一個Map Task最終只生成一個中間數據文件。
5、Copy階段。
Reduce任務通過HTTP向各個Map任務拖取它所需要的數據。每個節點都會啟動一個常駐的HTTP server,其中一項服務就是響應Reduce拖取Map數據。當有MapOutput的HTTP請求過來的時候,HTTP server就讀取相應的Map輸出文件中對應這個Reduce部分的數據通過網絡流輸出給Reduce。
Reduce任務拖取某個Map對應的數據,如果在內存中能放得下這次數據的話就直接把數據寫到內存中。Reduce要向每個Map去拖取數據,在 內存中每個Map對應一塊數據,當內存中存儲的Map數據占用空間達到一定程度的時候,開始啟動內存中merge,把內存中的數據merge輸出到磁盤上 一個文件中。
如果在內存中不能放得下這個Map的數據的話,直接把Map數據寫到磁盤上,在本地目錄創建一個文件,從HTTP流中讀取數據然后寫到磁盤,使用的 緩存區大小是64K。拖一個Map數據過來就會創建一個文件,當文件數量達到一定閾值時,開始啟動磁盤文件merge,把這些文件合并輸出到一個文件。
有些Map的數據較小是可以放在內存中的,有些Map的數據較大需要放在磁盤上,這樣最后Reduce任務拖過來的數據有些放在內存中了有些放在磁盤上,最后會對這些來一個全局合并。
默認情況下,當整個MapReduce作業的所有已執行完成的Map Task任務數超過Map Task總數的5%后,JobTracker便會開始調度執行Reduce Task任務。然后Reduce Task任務默認啟動mapred.reduce.parallel.copies(默認為5)個MapOutputCopier線程到已完成的Map Task任務節點上分別copy一份屬于自己的數據。 這些copy的數據會首先保存的內存緩沖區中,當內沖緩沖區的使用率達到一定閥值后,則寫到磁盤 上。
5、Merge階段
在遠程copy數據的同時,Reduce Task在后臺啟動了兩個后臺線程對內存和磁盤上的數據文件做合并操作,以防止內存使用過多或磁盤生的文件過多。
Map任務如果輸出數據量很大,可能會進行好幾次Spill,out文件和Index文件會產生很多,分布在不同的磁盤上。最后把這些文件進行合并的merge過程閃亮登場。
Merge過程怎么知道產生的Spill文件都在哪了呢?從所有的本地目錄上掃描得到產生的Spill文件,然后把路徑存儲在一個數組里。 Merge過程又怎么知道Spill的索引信息呢?沒錯,也是從所有的本地目錄上掃描得到Index文件,然后把索引信息存儲在一個列表里。到這里,又遇 到了一個值得納悶的地方。在之前Spill過程中的時候為什么不直接把這些信息存儲在內存中呢,何必又多了這步掃描的操作?特別是Spill的索引數據, 之前當內存超限之后就把數據寫到磁盤,現在又要從磁盤把這些數據讀出來,還是需要裝到更多的內存中。之所以多此一舉,是因為這時kvbuffer這個內存 大戶已經不再使用可以回收,有內存空間來裝這些數據了。(對于內存空間較大的土豪來說,用內存來省卻這兩個io步驟還是值得考慮的。)
然后為merge過程創建一個叫file.out的文件和一個叫file.out.Index的文件用來存儲最終的輸出和索引。
一個partition一個partition的進行合并輸出。對于某個partition來說,從索引列表中查詢這個partition對應的所 有索引信息,每個對應一個段插入到段列表中。也就是這個partition對應一個段列表,記錄所有的Spill文件中對應的這個partition那段 數據的文件名、起始位置、長度等等。
然后對這個partition對應的所有的segment進行合并,目標是合并成一個segment。當這個partition對應很多個 segment時,會分批地進行合并:先從segment列表中把第一批取出來,以key為關鍵字放置成最小堆,然后從最小堆中每次取出最小 的
最終的索引數據仍然輸出到Index文件中。
Map端的Shuffle過程到此結束。
6、Merge Sort階段
在合并的同時,也會做排序操作。由于各個Map Task已經實現對數據做過局部排序,故此Reduce Task只需要做一次歸并排序即可保證copy數據的整體有序性。執行完合并與排序操作后,Reduce Task會將數據交給reduce()方法處理。
這里使用的Merge和Map端使用的Merge過程一樣。Map的輸出數據已經是有序的,Merge進行一次合并排序,所謂Reduce端的 sort過程就是這個合并的過程。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的。
Reduce端的Shuffle過程至此結束。
HDFS
HDFS是分布式計算的存儲基石,Hadoop的分布式文件系統和其他分布式文件系統有很多類似的特質。分布式文件系統基本的幾個特點:
上圖中展現了整個HDFS三個重要角色:NameNode、DataNode和Client。NameNode可以看作是分布式文件系統中的管理 者,主要負責管理文件系統的命名空間、集群配置信息和存儲塊的復制等。NameNode會將文件系統的Meta-data存儲在內存中,這些信息主要包括 了文件信息、每一個文件對應的文件塊的信息和每一個文件塊在DataNode的信息等。DataNode是文件存儲的基本單元,它將Block存儲在本地 文件系統中,保存了Block的Meta-data,同時周期性地將所有存在的Block信息發送給NameNode。Client就是需要獲取分布式文 件系統文件的應用程序。這里通過三個操作來說明他們之間的交互關系。
文件寫入:
文件讀取:
文件Block復制:
最后再說一下HDFS的幾個設計特點(對于框架設計值得借鑒):
Hadoop的優勢
Hadoop是一個能夠讓用戶輕松架構和使用的分布式計算平臺。用戶可以輕松地在Hadoop上開發和運行處理海量數據的應用程序。它主要有以下幾個優點:
Hadoop得以在大數據處理應用中廣泛應用得益于其自身在數據提取、變形和加載(ETL)方面上的天然優勢。Hadoop的分布式架構,將大數據 處理引擎盡可能的靠近存儲,對例如像ETL這樣的批處理操作相對合適,因為類似這樣操作的批處理結果可以直接走向存儲。Hadoop的MapReduce 功能實現了將單個任務打碎,并將碎片任務發送(Map)到多個節點上,之后再以單個數據集的形式加載(Reduce)到數據倉庫里。
參考資料