MapReduce論文學習筆記
一、簡介
* 大數據計算問題的提出:如何處理并行計算、如何分發數據、如何處理錯誤等等。這些問題合起來使得大數據處理變得復雜
* 為了解決這些問題,需要設計新的計算抽象模型:只要表述想要執行的運算,而屏蔽了并行計算、容錯、數據分發、負載均衡等復雜細節,這些細節被封裝在一個庫里
* 抽象模型設計的靈感來自于函數式語言的Map和Reduce原語
- Map:對輸入數據應用Map操作得出一個中間<key, value>對集合
- Reduce:對具有相同key的value集合上應用Reduce操作合并中間結果
* MapReduce框架模型:通過簡單接口(用戶只要實現Map和Reduce函數)實現大規模數據的分布式計算,實現在大量普通機器上的高性能計算
二、編程模型
1、例子
* 計算一個大文檔集合中每個單詞的出現次數(MapReduce編程的HelloWorld了),偽代碼:
map(string key, string value)
for each word w in value
emitIntermediate(w, 1)
reduce(string key, Iterator values)
int result = 0
for each v in values
result += ParseInt(v);
Emit(AsString(result))
* map函數輸出文檔每個詞(key),及這個詞的出現次數(value,這里為1)
* reduce函數將每個詞的出現次數累加起來輸出
2、類型
* 在概念上,Map和Reduce函數有相關聯的輸入輸出類型
map(k1, v1) => list(k2, v2)
reduce(k2, list(v2)) => list(v2)
3、更多的例子
* 分布式grep,分布式排序,計算URL訪問頻率,轉置網絡鏈接圖等等
三、實現
* MapReduce模型有多種不同實現方式,取決于具體環境,如小型的共享內存機器、大型NUMA架構多處理器主機、大型網絡連接集群等
1、執行概括
* Map階段概括:輸入數據自動分割為M個片段集合,Map調用因此分到多臺機器上并行處理。
* Reduce階段概括:使用分區函數將Map輸出的key值分成R個分區(如hash(key) mod R),使得Reduce調用也被分到多臺機器并行處理。這里分區數R和分區函數作為一個重要指標,由用戶來指定
a、用戶調用MapReduce庫將輸入文件分為M個數據片段(即split,一般為16~64MB)
b、用戶程序有一個Master(即JobTracker ),其他都為Worker(即TaskTracker),由Master負責任務分配。
c、被分配了Map任務的Map Worker讀取數據片段,將其解析出<key, value>對傳遞給用戶map函數處理,最后輸出中間<key, value>對到內存
d、緩存的<key, value>對通過分區函數分成R個分區,周期性寫入本地磁盤,并將位置信息上傳給Master。Master再將這些存儲位置傳給Reduce Worker
e、Reduce Worker接收到信息后,使用RPC將落地的<key, value>對讀取到本地,對Key排序后聚合相同Key值的數據,然后將<key, list<value>>對傳遞給用戶reduce函數處理,最后輸出到所屬分區的輸出文件(故最終的MapReduce輸出為R個文件)
2、Master數據結構
* Master數據結構存儲信息:每個Map/Reduce任務狀態、中間文件存儲信息,Worker機器標識等
* Master類似一個數據管道:Map任務產生的R個中間文件存儲信息,通過這個管道傳遞給Reduce任務
3、Worker容災
* Master周期性Ping每個Worker維持心跳,約定時間內沒收到返回信息則將Worker標為失效
* 對于失效的Map Worker:Map任務輸出到本機,已不可訪問,故需重新執行
* 對于失效的Reduce Worker:Reduce任務輸出到全局文件系統,故不許重新執行
* MapReduce可以處理大規模Worker失效的情況,最多只需簡單重新執行失效Worker未完成的任務
4、Master容災
* 一個簡單的方法為Master周期性將上面的數據結構落地,即檢查點CheckPoint。如果Master掛了可用CheckPoint啟動另一Master
* 失效處理機制:保證用戶提供輸入確定函數時,在任何情況下(各種失效)的輸出都和沒有出現任何錯誤,且順序執行產生的輸出是一樣的。通過依賴對Map和Reduce任務的輸出是原子提交來完成這個特性
5、存儲位置
* 網絡帶寬為相對匱乏的資源,Master調度時,會盡量將Map任務調度在包含相關輸入的機器上執行(對于大型集群,大部分輸出能實現)
* 對于上述調度失敗的情況,Master會選擇相關輸入附近的機器執行
6、任務粒度
* 任務粒度M和R:我們把Map拆分為M個片段(輸入數據split決定),把Reduce拆分為R個片段(分區函數決定)
* 理想狀態下,M和R應該比Worker數要多得多,有利于Worker機器執行大量不同小任務而利于集群負載均衡,以及故障恢復
* 實際上,M和R取值收到一些限制,如Master內存容納的數據結構等
* M和R值為系統2個較關鍵調優參數,通常的比例為M=200000,R=5000,Worker=2000
7、備用任務
* 一個現象:影響MapReduce總執行時間常常是極少數“落伍任務”,出現“落伍”的原因很多難以避免。
* 備用任務為一種減少“落伍任務”出現幾率的機制:當MapReduce總執行接近結束時,多啟動一個備用任務來執行剩下的還未完成的任務。
* 添加備用任務機制的效果:多花費幾個百分點的計算資源,對于減少超大MapReduce總執行時間效果顯著(排序任務:優化44%時間)
四、技巧
1、分區函數
* 分區函數:使用分區函數對Map的輸出進行分區,即指定Map任務的輸出按key分為R個分區給Reduce任務處理
* 默認的分區函數為 hash(key) mod R,hash能產生非常平衡的分區
2、順序保證
* 保證中間<key, value>對的處理順序是按key值增量進行的(先排序),保證每個分區生成有序的輸出文件,對于后面一些應用很有幫助
3、Combiner函數
* 由于Map產生的中間<key, value>對中key值重復的比重很大(如詞數統計的例子,每個Map會產生很多的<word,1>),所以我們允許用戶定義一個可選的Combiner函數,來將Map產生的本地的中間<key, value>對先進行一次合并,再通過RPC傳遞給Reduce
* 一般情況下,Combiner和Reduce函數是一樣的,唯一的區別是MapReduce庫控制函數輸出不同,Combiner輸出為中間文件,Reduce輸出為最終結果文件
4、輸入輸出類型
* MapReduce庫支持幾種預定義好的輸入類型,如文本、Mysql等等。
* 以文本作為輸入數據為例,文件的偏移量作為key,每一行的內容作為value,就可將文本每一行視為<key,value>對傳給Map任務
* 用戶可使用Reader接口實現自己新定義的輸入類型
5、副作用
* 在某些情況下,如果Map和Reduce操作增加輔助的輸出文件會比較簡單,故我們依靠Writer接口把這種“副作用”變成原子和冪等的(冪等:總產生相同結果的數學運算)
* 實現:首先把輸出結果寫到一個臨時文件中,在完成輸出后,再調用系統級的原子操作rename臨時文件
6、跳過損壞的記錄
* 當用戶程序Bug導致在處理某些記錄時Crash掉,慣常的做法的修復Bug后再次執行。但有時候這些Bug修復是非常困難的,然而對于一個巨大的數據集來說,忽略小部分記錄是可以接受的
* 基于上述需求,我們提供一種執行模式:在這種模式下,為了保證整個處理順利進行,MapReduce庫會檢查導致Crash的記錄,并跳過這些記錄不處理
* 實現:MapReduce庫通過全局變量保存所有記錄序號,每個Worker進程設置信號處理函數來捕獲異常(內存段/總線等),當程序Bug導致異常錯誤被信號處理函數捕獲,信號處理函數會通過UDP包向Master發出最后一條記錄的序號
7、本地執行
* 由于Map和Reduce會分布到集群各個機器上執行,而且執行位置是Master動態調度的,所以調試Map和Reduce函數非常困難。
* 所以我們開發了一套MapReduce庫的本地版本,用于使用本地調試和測試工具,如gdb等
8、狀態信息
* Master使用嵌入式HTTP服務器顯示一組狀態信息Web界面,給用戶監控任務執行狀態
* 顯示的內容包括:計算執行進度、各狀態的任務數、輸入/中間/輸出的字節數、各Worker的狀態等等
9、計數器
* MapReduce庫使用計數器來統計各事件的發生次數,計數器當前的值也會顯示在Master狀態頁面給用戶監控
* 用戶創建:用戶可在程序中創建一個計數器對象,在map和reduce函數中增加計數器的值。
* 系統自動維護:MapReduce庫自己也會維護一些計數器對象,比如已處理的<key, value>對數量等
* 計數器機制對MapReduce操作的完整性檢查非常有用,比如監控中間key值集合必須等于輸出的key值集合等
五、性能
* 這里在同個集群環境做了2個性能測試,1個為特定模式匹配,1個為數據排序,這2個程序在MapReduce的應用也是非常典型的
1、集群配置
* 1800臺機器,每臺機器2個2G主頻Intel XeonCPU、4GB內存、2個160G硬盤、千兆以太網卡
* 集群部署在2層樹形交換網絡,Root節點100~200GBPS傳輸帶寬,所有機器均對等部署,任意兩點網絡來回時間小于1ms
2、GREP(測試1)
* 實驗設計:掃描10^10左右個100字節組成的記錄(1TB),查找出概論較小的3個字符模式。輸入數據拆分64MB的block(M=15000),結果輸出到一個文件中(R=1,因為結果集不大)
* 實驗結果如上圖所示,Y軸為處理速度,處理速度隨著參與MapReduce計算機器的增加而增加。整個過程用了150s,包括開始約1分鐘預啟動(程序上次到集群、GFS打開文件、Master獲取文件位置信息等),以及80s的實際計算時間
3、排序(測試2)
* 實驗設計:排序10^10左右個100字節組成的記錄(1TB),模仿YeraSort排序。輸入數據拆分64MB的block(M=15000),排序結果輸出到4000個文件中(R=4000)
* 圖的(a)欄顯示排序程序正常執行的情況,對于正常執行的程序而言:
- 上圖Input:顯示Map輸入數據讀取速度,峰值達到13GB/s
- 中圖Shuffle:顯示從Map任務的中間數據傳輸給Reduce任務的網絡IO情況
- 下圖Output:顯示Reduce最終輸出文件速度
* 有一些值得注意的現象,如Map對輸入數據的讀取速度比Reduce對輸出數據的寫入速度高很多,這是因為輸入數據本地優化策略的作用,使得絕大部分數據是從本地硬盤讀取的原因
4、高效的備用任務
* 圖的(b)欄顯示了關閉備用任務的執行情況,與(a)相比,下圖Output輸出最終文件時間上被拖了一個長長的尾巴,而且這段時間幾乎沒有寫入,卻將任務執行時間在960s后由延時了300s(44%)。
5、失效的機器
* 圖的(c)欄顯示了在程序開始幾分鐘后kill掉200(1/9)個Worker的執行情況,集群立即調度重啟新的Worker處理進程。總的執行時間為933s,只比程序正常執行的情況多用了5%的時間
六、經驗
* MapReduce庫能廣泛應用于我們日常工作遇到的各類問題,在各個領域應用廣泛,如大規模機器學習、網頁信息提取、大規模圖形計算等
* MapReduce的成功取決于快速寫出一個簡單的程序,就能在上千臺機器的集群上做大規模并發處理,極大加快了設計和開發周期;而且完全可以讓沒有分布式/并行處理開發經驗的程序員利用大量資源,開發出分布式/并行處理的應用
* MapReduce最成功的應用就是重寫了Google搜索服務所使用的Index系統(大規模索引),使用MapReduce帶來了代碼簡單小巧、高性能、操作管理更簡單的優點
來自:http://blog.csdn.net/yyyiran/article/details/12918891