Hadoop中MapReduce框架入門
MapReduce是一種分布式計算模型,由Google提出,主要用于搜索領域,解決海量數據的計算問題.對于業 界的大數據存儲及分布式處理系統來說Hadoop2提出的新MapReudce就是YARN: A framework for job scheduling and cluster resource management.
1.MapReduce的簡單概念
百度百科:MapReduce是一種編程模型,用于大規模數據集(大于1TB)的并行運算。概念"Map(映射)"和"Reduce(歸約)",和他們的主要思想,都是從函數式編程語言里借來的,還有從矢量編程語言里借來的特性。它極大地方便了編程人員在不會分布式并行編程的情況下,將自己的程序運行在分布式系統上。 當前的軟件實現是指定一個Map(映射)函數,用來把一組鍵值對映射成一組新的鍵值對,指定并發的Reduce(歸約)函數,用來保證所有映射的鍵值對中的每一個共享相同的鍵組。至于什么是函數式編程語言和矢量編程語言,自己也搞得不太清楚,見解釋鏈接:
http://www.cnblogs.com/kym/archive/2011/03/07/1976519.html.
自己的理解:MapReduce是一種分布式計算模型,由Google提出,主要用于搜索領域,解決海量數據的計算問題.當你向MapReduce 框架提交一個計算作業時,它會首先把計算作業拆分成若干個Map 任務,然后分配到不同的節點上去執行,每一個Map 任務處理輸入數據中的一部分,當Map 任務完成后,它會生成一些中間文件,這些中間文件將會作為Reduce 任務的輸入數據。Reduce 任務的主要目標就是把前面若干個Map 的輸出匯總到一起并輸出.就是說HDFS已經為我們提供了高性能、高并發的服務,但是并行編程可不是所有程序員都玩得轉的活兒,如果我們的應用本身不能并發,那Hadoop的HDFS也都是沒有意義的。MapReduce的偉大之處就在于讓不熟悉并行編程的程序員(比如像我這的)也能充分發揮分布式系統的威力。這里說明以下:Hadoop本身這個框架就是洋人基于洋人公司谷歌的三大論文GFS,BigTable,MapReduce(編程模型),用Java語言實現的框架.谷歌它就用的C++實現,而MapReduce編程模型(是高度抽象的)大體離不開下面這張圖.Spark并行運算框架(和Hadoop的MapReduce)的不同點:在于它將中間結果即map函數結果直接放入內存中,而不是放入本地磁盤的HDFS中.這些都不是重點,重點是下面圖的流程:
上圖是論文里給出的流程圖。一切都是從最上方的user program開始的,user program鏈接了MapReduce庫,實現了最基本的Map函數和Reduce函數。圖中執行的順序都用數字標記了。 1.MapReduce庫先把user program的輸入文件劃分為M份(M為用戶定義),每一份通常有16MB到64MB,如圖左方所示分成了split0~4;然后使用fork將用戶進程拷貝到集群內其它機器上。 2.user program的副本中有一個稱為master,其余稱為worker,master是負責調度的,為空閑worker分配作業(Map作業3或者Reduce作業),worker的數量也是可以由用戶指定的。 3.被分配了Map作業的worker,開始讀取對應分片的輸入數據,Map作業數量是由M決定的,和split一一對應;Map作業從輸入數據中抽取出鍵值對,每一個鍵值對都作為參數傳遞給map函數,map函數產生的中間鍵值對被緩存在內存中。 4.緩存的中間鍵值對會被定期寫入本地磁盤,而且被分為R個區,R的大小是由用戶定義的,將來每個區會對應一個Reduce作業;這些中間鍵值對的位置會被通報給master,master負責將信息轉發給Reduce worker。 5.master通知分配了Reduce作業的worker它負責的分區在什么位置(肯定不止一個地方,每個Map作業產生的中間鍵值對都可能映射到所有R個不同分區),當Reduce worker把所有它負責的中間鍵值對都讀過來后,先對它們進行排序,使得相同鍵的鍵值對聚集在一起。因為不同的鍵可能會映射到同一個分區也就是同一個Reduce作業(誰讓分區少呢),所以排序是必須的。 6.reduce worker遍歷排序后的中間鍵值對,對于每個唯一的鍵,都將鍵與關聯的值傳遞給reduce函數,reduce函數產生的輸出會添加到這個分區的輸出文件中。 7.當所有的Map和Reduce作業都完成了,master喚醒正版的user program,MapReduce函數調用返回user program的代碼 所有執行完畢后,MapReduce輸出放在了R個分區的輸出文件中(分別對應一個Reduce作業)。用戶通常并不需要合并這R個文件,而是將其作為輸入交給另一個MapReduce程序處理。整個過程中,輸入數據是來自底層分布式文件系統(GFS)的,中間數據是放在本地文件系統的,最終輸出數據是寫入底層分布式文件系統(GFS)的。而且我們要注意Map/Reduce作業和map/reduce函數的區別:Map作業處理一個輸入數據的分片,可能需要調用多次map函數來處理每個輸入鍵值對;Reduce作業處理一個分區的中間鍵值對,期間要對每個不同的鍵調用一次reduce函數,Reduce作業最終也對應一個輸出文件。
至于下面一張圖Hadoop MapReduce(彩色的)的模型實現則如下圖(當然這也不是我畫的,只是大自然的搬運工):
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
參考鏈接:怎樣向妻子解釋MapReduce.
2.Hadoop1.x中的MapReduce
在Hadoop里面的MapReduce的是有存在兩個不同的時期.剛開始的Hadoop中的MapReduce實現是做到很多的事情,而該框架的核心Job Tracker(作業跟蹤者)則是既當爹又當媽的意思.看下圖:
原 MapReduce 程序的流程及設計思路: 1.首先用戶程序 (JobClient) 提交了一個 job,job 的信息會發送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要與集群中的機器定時通信 (heartbeat), 需要管理哪些程序應該跑在哪些機器上,需要管理所有 job 失敗、重啟等操作。 2.TaskTracker 是 Map-reduce 集群中每臺機器都有的一個部分,他做的事情主要是監視自己所在機器的資源情況。 3.TaskTracker 同時監視當前機器的 tasks 運行狀況。TaskTracker 需要把這些信息通過 heartbeat發送給JobTracker,JobTracker 會搜集這些信息以給新提交的 job 分配運行在哪些機器上。
既然出現Hadoop2改進它,那它就有一些問題咯。主要的問題如下:
1.JobTracker 是 Map-reduce 的集中處理點,存在單點故障。 2.JobTracker 完成了太多的任務,造成了過多的資源消耗,當 map-reduce job 非常多的時候,會造成很大的內存開銷,潛在來說,也增加了 JobTracker fail 的風險,這也是業界普遍總結出老 Hadoop 的 Map-Reduce 只能支持 4000 節點主機的上限。 3.在 TaskTracker 端,以 map/reduce task 的數目作為資源的表示過于簡單,沒有考慮到 cpu/ 內存的占用情況,如果兩個大內存消耗的 task 被調度到了一塊,很容易出現 OOM。 4.在 TaskTracker 端,把資源強制劃分為 map task slot 和 reduce task slot, 如果當系統中只有 map task 或者只有 reduce task 的時候,會造成資源的浪費,也就是前面提過的集群資源利用的問題。 源代碼層面分析的時候,會發現代碼非常的難讀,常常因為一個 class 做了太多的事情,代碼量達 3000 多行造成 class 的任務不清晰,增加 bug 修復和版本維護的難度。 5.從操作的角度來看,現在的 Hadoop MapReduce 框架在有任何重要的或者不重要的變化 ( 例如 bug 修復,性能提升和特性化 ) 時,都會強制進行系統級別的升級更新。更糟的是,它不管用戶的喜好,強制讓分布式集群系統的每一個用戶端同時更新。這些更新會讓用戶為了驗證他們之前的應用程序是不是適用新的 Hadoop 版本而浪費大量時間。
3.Hadoop2.x中新方案YARN+MapReduce
首先的不要被YARN給迷惑住了,它只是負責資源調度管理,而MapReduce才是負責運算的家伙,所以YARN != MapReduce2.這是大師說的:
YARN并不是下一代MapReduce(MRv2),下一代MapReduce與第一代MapReduce(MRv1)在編程接口、數據處理引擎(MapTask和ReduceTask)是完全一樣的, 可認為MRv2重用了MRv1的這些模塊,不同的是資源管理和作業管理系統,MRv1中資源管理和作業管理均是由JobTracker實現的,集兩個功能于一身,而在MRv2中,將這兩部分分開了, 其中,作業管理由ApplicationMaster實現,而資源管理由新增系統YARN完成,由于YARN具有通用性,因此YARN也可以作為其他計算框架的資源管理系統,不僅限于MapReduce,也是其他計算框架(Spark).
看上圖我們可以知道Hadoop1中mapreduce可以說是啥事都干,而Hadoop2中的MapReduce的話則是專門處理數據分析.而YARN的話則做為資源管理器存在.
有了YARN之后,官網上這么說Apache Hadoop NextGen MapReduce (YARN).它的架構圖如下:
在Hadoop2中將JobTracker兩個主要的功能分離成單獨的組件,這兩個功能是資源管理和任務調度/監控。新的資源管理器全局管理所有應用程序計算資源的分配,每一個應用的 ApplicationMaster 負責相應的調度和協調。一個應用程序無非是一個單獨的傳統的 MapReduce 任務或者是一個 DAG( 有向無環圖 ) 任務。ResourceManager 和每一臺機器的節點管理服務器能夠管理用戶在那臺機器上的進程并能對計算進行組織。 1.事實上,每一個應用的ApplicationMaster是一個詳細的框架庫,它結合從ResourceManager獲得的資源和 NodeManagr 協同工作來運行和監控任務。 2.在上圖中ResourceManager支持分層級的應用隊列,這些隊列享有集群一定比例的資源。從某種意義上講它就是一個純粹的調度器,它在執行過程中不對應用進行監控和狀態跟蹤。同樣,它也不能重啟因應用失敗或者硬件錯誤而運行失敗的任務。 ResourceManager 是基于應用程序對資源的需求進行調度的 ; 每一個應用程序需要不同類型的資源因此就需要不同的容器。資源包括:內存,CPU,磁盤,網絡等等。可以看出,這同現 Mapreduce 固定類型的資源使用模型有顯著區別,它給集群的使用帶來負面的影響。資源管理器提供一個調度策略的插件,它負責將集群資源分配給多個隊列和應用程序。調度插件可以基于現有的能力調度和公平調度模型。 3.在上圖中 NodeManager 是每一臺機器框架的代理,是執行應用程序的容器,監控應用程序的資源使用情況 (CPU,內存,硬盤,網絡 ) 并且向調度器匯報。 4.在上圖中,每一個應用的 ApplicationMaster的職責有:向調度器索要適當的資源容器,運行任務,跟蹤應用程序的狀態和監控它們的進程,處理任務的失敗原因。
再次總結,在Hadoop2集群里,一個客戶端提交任務的一整套的流程圖:
1.客戶端的mapreduce程序通過hadoop shell提交到hadoop的集群中. 2.程序會通過RPC通信將打成jar包的程序的有關信息傳遞給Hadoop集群中RM(ResourceManager),可稱為領取JOBID的過程 3.RM更加提交上來的信息給任務分配一個唯一的ID,同時會將run.jar的在HDFS上的存儲路徑發送給客戶端. 4.客戶端得到那個存儲路徑之后,會相應的拼接出最終的存放路徑目錄,然后將run.jar分多份存儲在HDFS目錄中,默認情況下備份數量為10份.可配置. 5.客戶端提交一些配置信息,例如:最終存儲路徑,JOB ID等. 6.RM會將這些配置信息放入一個隊列當中,所謂的調度器.至于調度的算法,則不必深究. 7.NM(NodeManager)和RM是通過心跳機制保持著通信的,NM會定期的向RM去領取任務. 8.RM會在任意的一臺或多臺的NM中,啟動任務監控的進程Application Master.用來監控其他NM中YARN CHild的執行的情況 9.NM在領取到任務之后,得到信息,會去HDFS的下載run.jar.然后在本地的機器上啟動YARN Child進程來執行map或者reduce函數.map函數的處理之后的中間結果數據會放在本地文件系統中的. 10.在結束程序之后,將結果數據寫會HDFS中.整個流程大概就是這樣子的.
4.YARN出現的意義----引用
隨著 YARN 的出現,您不再受到更簡單的 MapReduce 開發模式約束,而是可以創建更復雜的分布式應用程序。實際上,您可以將 MapReduce 模型視為 YARN 架構可運行的一些應用程序中的其中一個,只是為自定義開發公開了基礎框架的更多功能。這種能力非常強大,因為 YARN 的使用模型幾乎沒有限制,不再需要與一個集群上可能存在的其他更復雜的分布式應用程序框架相隔離,就像 MRv1 一樣。甚至可以說,隨著 YARN 變得更加健全,它有能力取代其他一些分布式處理框架,從而完全消除了專用于其他框架的資源開銷,同時還簡化了整個系統。
為了演示 YARN 相對于 MRv1 的效率提升,可考慮蠻力測試舊版本的 LAN Manager Hash 的并行問題,這是舊版 Windows? 用于密碼散列運算的典型方法。在此場景中,MapReduce 方法沒有多大意義,因為 Mapping/Reducing 階段涉及到太多開銷。相反,更合理的方法是抽象化作業分配,以便每個容器擁有密碼搜索空間的一部分,在其之上進行枚舉,并通知您是否找到了正確的密碼。這里的重點是,密碼將通過一個函數 來動態確定(這確實有點棘手),而不需要將所有可能性映射到一個數據結構中,這就使得 MapReduce 風格顯得不必要且不實用。
歸結而言,MRv1 框架下的問題僅是需要一個關聯數組,而且這些問題有專門朝大數據操作方向演變的傾向。但是,問題一定不會永遠僅局限于此范式中,因為您現在可以更為簡單地將它們抽象化,編寫自定義客戶端、應用程序主程序,以及符合任何您想要的設計的應用程序。
5.編寫簡單MapReduce Yarn的應用程序
我們直接拿Apache Hadoop官網中的wordcount的例子來說明MapReduce程序的編寫.
Source Code
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { //編寫自己的Mapper,需要繼承org.apache.hadoop.mapreduce.Mapper public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ //輸入的<Key,Value>的類型,輸出的<Key,Value> //作為類中成員變量 private final static IntWritable one = new IntWritable(1); private Text word = new Text(); //key : offset 偏移量,幾乎可以忽略 //value : one line string 一行的數據 //context : the context of computer 計算的上下文 public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } //編寫自己的Reducer,需要繼承org.apache.hadoop.mapreduce.Reducer public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } //主函數開始運行JOB public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); //提交JOB成功,退出JVM虛擬機 } }
6.Hadoop2.0中提交Job的源碼分析
----------------------------至此,與服務器RM的通信已建立.---------
-----------------------接下來的話,就是提交job任務了.
------------------------到此為止,JOB提交任務結束--------------------------------------------------
7.備注(PS)
開發使用MapReduce程序,代碼本身是沒有難度的。因為都被Hadoop本身的框架封裝好了.我們要做的只是使用它的相關API來完成我們的實際的業務需求.但是MapReduce本身的程序是有很多的擴展的,包括(Partitioner編程,自定義排序編程,Combiner編程,常見的MapReduce算法)。在實際開發中,由于是分布式的環境。也會造成我們開發調試的難度也會增加。有很多的細節和知識點需要了解.很雜很亂的感覺。如何在window下eclipse遠程Debug程序.都是開發中常用到的。所以MapReduce知識的才剛開始.
1.入門包括學習最好的是去Hadoop官網上(英文是只攔路虎):http://hadoop.apache.org/
2.推薦的書籍:http://pan.baidu.com/s/1bn4IjCj
end..............................
來自:http://my.oschina.net/codeWatching/blog/345374