MapReduce論文學習筆記

jopen 11年前發布 | 33K 次閱讀 MapReduce 分布式/云計算/大數據

一、簡介

* 大數據計算問題的提出:如何處理并行計算、如何分發數據、如何處理錯誤等等。這些問題合起來使得大數據處理變得復雜
* 為了解決這些問題,需要設計新的計算抽象模型:只要表述想要執行的運算,而屏蔽了并行計算、容錯、數據分發、負載均衡等復雜細節,這些細節被封裝在一個庫里
* 抽象模型設計的靈感來自于函數式語言的Map和Reduce原語
    - Map:對輸入數據應用Map操作得出一個中間<key, value>對集合
    - Reduce:對具有相同key的value集合上應用Reduce操作合并中間結果

* MapReduce框架模型:通過簡單接口(用戶只要實現Map和Reduce函數)實現大規模數據的分布式計算,實現在大量普通機器上的高性能計算

二、編程模型

1、例子

* 計算一個大文檔集合中每個單詞的出現次數(MapReduce編程的HelloWorld了),偽代碼:

map(string key, string value)

    for each word w in value

        emitIntermediate(w, 1)

reduce(string key, Iterator values)

    int result = 0

    for each v in values

        result += ParseInt(v);

    Emit(AsString(result))

* map函數輸出文檔每個詞(key),及這個詞的出現次數(value,這里為1)

* reduce函數將每個詞的出現次數累加起來輸出

2、類型

* 在概念上,Map和Reduce函數有相關聯的輸入輸出類型

    map(k1, v1) => list(k2, v2)

    reduce(k2, list(v2)) => list(v2)

3、更多的例子

* 分布式grep,分布式排序,計算URL訪問頻率,轉置網絡鏈接圖等等

三、實現

* MapReduce模型有多種不同實現方式,取決于具體環境,如小型的共享內存機器、大型NUMA架構多處理器主機、大型網絡連接集群等

1、執行概括

* Map階段概括:輸入數據自動分割為M個片段集合,Map調用因此分到多臺機器上并行處理。

* Reduce階段概括:使用分區函數將Map輸出的key值分成R個分區(如hash(key) mod R),使得Reduce調用也被分到多臺機器并行處理。這里分區數R和分區函數作為一個重要指標,由用戶來指定

1.jpg

a、用戶調用MapReduce庫將輸入文件分為M個數據片段(即split,一般為16~64MB)

b、用戶程序有一個Master(即JobTracker ),其他都為Worker(即TaskTracker),由Master負責任務分配。

c、被分配了Map任務的Map Worker讀取數據片段,將其解析出<key, value>對傳遞給用戶map函數處理,最后輸出中間<key, value>對到內存

d、緩存的<key, value>對通過分區函數分成R個分區,周期性寫入本地磁盤,并將位置信息上傳給Master。Master再將這些存儲位置傳給Reduce Worker

e、Reduce Worker接收到信息后,使用RPC將落地的<key, value>對讀取到本地,對Key排序后聚合相同Key值的數據,然后將<key, list<value>>對傳遞給用戶reduce函數處理,最后輸出到所屬分區的輸出文件(故最終的MapReduce輸出為R個文件)

2、Master數據結構

* Master數據結構存儲信息:每個Map/Reduce任務狀態、中間文件存儲信息,Worker機器標識等

* Master類似一個數據管道:Map任務產生的R個中間文件存儲信息,通過這個管道傳遞給Reduce任務

3、Worker容災

* Master周期性Ping每個Worker維持心跳,約定時間內沒收到返回信息則將Worker標為失效

* 對于失效的Map Worker:Map任務輸出到本機,已不可訪問,故需重新執行

* 對于失效的Reduce Worker:Reduce任務輸出到全局文件系統,故不許重新執行

* MapReduce可以處理大規模Worker失效的情況,最多只需簡單重新執行失效Worker未完成的任務

4、Master容災

* 一個簡單的方法為Master周期性將上面的數據結構落地,即檢查點CheckPoint。如果Master掛了可用CheckPoint啟動另一Master

* 失效處理機制:保證用戶提供輸入確定函數時,在任何情況下(各種失效)的輸出都和沒有出現任何錯誤,且順序執行產生的輸出是一樣的。通過依賴對Map和Reduce任務的輸出是原子提交來完成這個特性

5、存儲位置

* 網絡帶寬為相對匱乏的資源,Master調度時,會盡量將Map任務調度在包含相關輸入的機器上執行(對于大型集群,大部分輸出能實現)

* 對于上述調度失敗的情況,Master會選擇相關輸入附近的機器執行

6、任務粒度

* 任務粒度M和R:我們把Map拆分為M個片段(輸入數據split決定),把Reduce拆分為R個片段(分區函數決定)

* 理想狀態下,M和R應該比Worker數要多得多,有利于Worker機器執行大量不同小任務而利于集群負載均衡,以及故障恢復

* 實際上,M和R取值收到一些限制,如Master內存容納的數據結構等

* M和R值為系統2個較關鍵調優參數,通常的比例為M=200000,R=5000,Worker=2000

7、備用任務

* 一個現象:影響MapReduce總執行時間常常是極少數“落伍任務”,出現“落伍”的原因很多難以避免。

* 備用任務為一種減少“落伍任務”出現幾率的機制:當MapReduce總執行接近結束時,多啟動一個備用任務來執行剩下的還未完成的任務。

* 添加備用任務機制的效果:多花費幾個百分點的計算資源,對于減少超大MapReduce總執行時間效果顯著(排序任務:優化44%時間)

四、技巧

1、分區函數

* 分區函數:使用分區函數對Map的輸出進行分區,即指定Map任務的輸出按key分為R個分區給Reduce任務處理

* 默認的分區函數為 hash(key) mod R,hash能產生非常平衡的分區

2、順序保證

* 保證中間<key, value>對的處理順序是按key值增量進行的(先排序),保證每個分區生成有序的輸出文件,對于后面一些應用很有幫助

3、Combiner函數

* 由于Map產生的中間<key, value>對中key值重復的比重很大(如詞數統計的例子,每個Map會產生很多的<word,1>),所以我們允許用戶定義一個可選的Combiner函數,來將Map產生的本地的中間<key, value>對先進行一次合并,再通過RPC傳遞給Reduce

* 一般情況下,Combiner和Reduce函數是一樣的,唯一的區別是MapReduce庫控制函數輸出不同,Combiner輸出為中間文件,Reduce輸出為最終結果文件

4、輸入輸出類型

* MapReduce庫支持幾種預定義好的輸入類型,如文本、Mysql等等。

* 以文本作為輸入數據為例,文件的偏移量作為key,每一行的內容作為value,就可將文本每一行視為<key,value>對傳給Map任務

* 用戶可使用Reader接口實現自己新定義的輸入類型

5、副作用

* 在某些情況下,如果Map和Reduce操作增加輔助的輸出文件會比較簡單,故我們依靠Writer接口把這種“副作用”變成原子和冪等的(冪等:總產生相同結果的數學運算)

* 實現:首先把輸出結果寫到一個臨時文件中,在完成輸出后,再調用系統級的原子操作rename臨時文件

6、跳過損壞的記錄

* 當用戶程序Bug導致在處理某些記錄時Crash掉,慣常的做法的修復Bug后再次執行。但有時候這些Bug修復是非常困難的,然而對于一個巨大的數據集來說,忽略小部分記錄是可以接受的

* 基于上述需求,我們提供一種執行模式:在這種模式下,為了保證整個處理順利進行,MapReduce庫會檢查導致Crash的記錄,并跳過這些記錄不處理

* 實現:MapReduce庫通過全局變量保存所有記錄序號,每個Worker進程設置信號處理函數來捕獲異常(內存段/總線等),當程序Bug導致異常錯誤被信號處理函數捕獲,信號處理函數會通過UDP包向Master發出最后一條記錄的序號

7、本地執行

* 由于Map和Reduce會分布到集群各個機器上執行,而且執行位置是Master動態調度的,所以調試Map和Reduce函數非常困難。

* 所以我們開發了一套MapReduce庫的本地版本,用于使用本地調試和測試工具,如gdb等

8、狀態信息

* Master使用嵌入式HTTP服務器顯示一組狀態信息Web界面,給用戶監控任務執行狀態

* 顯示的內容包括:計算執行進度、各狀態的任務數、輸入/中間/輸出的字節數、各Worker的狀態等等

9、計數器

* MapReduce庫使用計數器來統計各事件的發生次數,計數器當前的值也會顯示在Master狀態頁面給用戶監控

* 用戶創建:用戶可在程序中創建一個計數器對象,在map和reduce函數中增加計數器的值。

* 系統自動維護:MapReduce庫自己也會維護一些計數器對象,比如已處理的<key, value>對數量等

* 計數器機制對MapReduce操作的完整性檢查非常有用,比如監控中間key值集合必須等于輸出的key值集合等

五、性能

* 這里在同個集群環境做了2個性能測試,1個為特定模式匹配,1個為數據排序,這2個程序在MapReduce的應用也是非常典型的

1、集群配置

* 1800臺機器,每臺機器2個2G主頻Intel XeonCPU、4GB內存、2個160G硬盤、千兆以太網卡

* 集群部署在2層樹形交換網絡,Root節點100~200GBPS傳輸帶寬,所有機器均對等部署,任意兩點網絡來回時間小于1ms

2、GREP(測試1)

* 實驗設計:掃描10^10左右個100字節組成的記錄(1TB),查找出概論較小的3個字符模式。輸入數據拆分64MB的block(M=15000),結果輸出到一個文件中(R=1,因為結果集不大)

2.jpg

* 實驗結果如上圖所示,Y軸為處理速度,處理速度隨著參與MapReduce計算機器的增加而增加。整個過程用了150s,包括開始約1分鐘預啟動(程序上次到集群、GFS打開文件、Master獲取文件位置信息等),以及80s的實際計算時間

3、排序(測試2)

* 實驗設計:排序10^10左右個100字節組成的記錄(1TB),模仿YeraSort排序。輸入數據拆分64MB的block(M=15000),排序結果輸出到4000個文件中(R=4000)

3.jpg

* 圖的(a)欄顯示排序程序正常執行的情況,對于正常執行的程序而言:

    - 上圖Input:顯示Map輸入數據讀取速度,峰值達到13GB/s

    - 中圖Shuffle:顯示從Map任務的中間數據傳輸給Reduce任務的網絡IO情況

    - 下圖Output:顯示Reduce最終輸出文件速度

* 有一些值得注意的現象,如Map對輸入數據的讀取速度比Reduce對輸出數據的寫入速度高很多,這是因為輸入數據本地優化策略的作用,使得絕大部分數據是從本地硬盤讀取的原因

4、高效的備用任務

* 圖的(b)顯示了關閉備用任務的執行情況,與(a)相比,下圖Output輸出最終文件時間上被拖了一個長長的尾巴,而且這段時間幾乎沒有寫入,卻將任務執行時間在960s后由延時了300s(44%)。

5、失效的機器

* 圖的(c)顯示了在程序開始幾分鐘后kill掉200(1/9)個Worker的執行情況,集群立即調度重啟新的Worker處理進程。總的執行時間為933s,只比程序正常執行的情況多用了5%的時間

六、經驗

* MapReduce庫能廣泛應用于我們日常工作遇到的各類問題,在各個領域應用廣泛,如大規模機器學習、網頁信息提取、大規模圖形計算等

* MapReduce的成功取決于快速寫出一個簡單的程序,就能在上千臺機器的集群上做大規模并發處理,極大加快了設計和開發周期;而且完全可以讓沒有分布式/并行處理開發經驗的程序員利用大量資源,開發出分布式/并行處理的應用

* MapReduce最成功的應用就是重寫了Google搜索服務所使用的Index系統(大規模索引),使用MapReduce帶來了代碼簡單小巧、高性能、操作管理更簡單的優點

來自:http://blog.csdn.net/yyyiran/article/details/12918891

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!