五種基于 MapReduce 的并行計算框架介紹及性能測試
并行計算模型和框架
目前開源社區有許多并行計算模型和框架可供選擇,按照實現方式、運行機制、依附的產品生態圈等可以被劃分為幾個類型,每個類型各有優缺點,如果能夠對各類型的并行計算框架都進行深入研究及適當的缺點修復,就可以為不同硬件環境下的海量數據分析需求提供不同的軟件層面的解決方案。
- 并行計算框架
并行計算或稱平行計算是相對于串行計算來說的。它是一種一次可執行多個指令的算法,目的是提高計算速度,以及通過擴大問題求解規模,解決大型而復雜的計算問題。所謂并行計算可分為時間上的并行和空間上的并行。時間上的并行就是指流水線技術,而空間上的并行則是指用多個處理器并發的執行計算。并行計算(Parallel Computing)是指同時使用多種計算資源解決計算問題的過程,是提高計算機系統計算速度和處理能力的一種有效手段。它的基本思想是用多個處理器來協同求解同一問題,即將被求解的問題分解成若干個部分,各部分均由一個獨立的處理機來并行計算。并行計算系統既可以是專門設計的、含有多個處理器的超級計算機,也可以是以某種方式互連的若干臺的獨立計算機構成的集群。通過并行計算集群完成數據的處理,再將處理的結果返回給用戶。
- 國內外研究
歐美發達國家對于并行計算技術的研究要遠遠早于我國,從最初的并行計算逐漸過渡到網格計算,隨著 Internet 網絡資源的迅速膨脹,因特網容納了海量的各種類型的數據和信息。海量數據的處理對服務器 CPU、IO 的吞吐都是嚴峻的考驗,不論是處理速度、存儲空間、容錯性,還是在訪問速度等方面,傳統的技術架構和僅靠單臺計算機基于串行的方式越來越不適應當前海量數據處理的要求。國內外學者提出很多海量數據處理方法,以改善海量數據處理存在的諸多問題。
目前已有的海量數據處理方法在概念上較容易理解,然而由于數據量巨大,要在可接受的時間內完成相應的處理,只有將這些計算進行并行化處理,通過提取出處理過程中存在的可并行工作的分量,用分布式模型來實現這些并行分量的并行執行過程。隨著技術的發展,單機的性能有了突飛猛進的發展變化,尤其是內存和處理器等硬件技術,但是硬件技術的發展在理論上總是有限度的,如果說硬件的發展在縱向上提高了系統的性能,那么并行技術的發展就是從橫向上拓展了處理的方式。
2003 年美國 Google 公司對外發布了 MapReduce、GFS、BigData 三篇論文,至此正式將并行計算框架落地為 MapReduce 框架。
我國的并行和分布式計算技術研究起源于 60 年代末,按照國防科技大學周興銘院士提出的觀點,到目前為止已經三個階段了。第一階段,自 60 年代末至 70 年代末,主要從事大型機內的并行處理技術研究;第二階段,自 70 年代末至 90 年代初,主要從事向量機和并行多處理器系統研究;第三階段,自 80 年代末至今,主要從事 MPP(Massively Parallel Processor) 系統研究。
盡管我國在并行計算方面開展的研究和應用較早,目前也擁有很多的并行計算資源,但研究和應用的成效相對美國還存在較大的差距,有待進一步的提高和發展。
回頁首
MapReduce
MapReduce 是由谷歌推出的一個編程模型,是一個能處理和生成超大數據集的算法模型,該架構能夠在大量普通配置的計算機上實現并行化處理。MapReduce 編程模型結合用戶實現的 Map 和 Reduce 函數。用戶自定義的 Map 函數處理一個輸入的基于 key/value pair 的集合,輸出中間基于 key/value pair 的集合,MapReduce 庫把中間所有具有相同 key 值的 value 值集合在一起后傳遞給 Reduce 函數,用戶自定義的 Reduce 函數合并所有具有相同 key 值的 value 值,形成一個較小 value 值的集合。一般地,一個典型的 MapReduce 程序的執行流程如圖 1 所示。
圖 1 .MapReduce 程序執行流程圖

MapReduce 執行過程主要包括:
- 將輸入的海量數據切片分給不同的機器處理;
- 執行 Map 任務的 Worker 將輸入數據解析成 key/value pair,用戶定義的 Map 函數把輸入的 key/value pair 轉成中間形式的 key/value pair;
- 按照 key 值對中間形式的 key/value 進行排序、聚合;
- 把不同的 key 值和相應的 value 集分配給不同的機器,完成 Reduce 運算;
- 輸出 Reduce 結果。
任務成功完成后,MapReduce 的輸出存放在 R 個輸出文件中,一般情況下,這 R 個輸出文件不需要合并成一個文件,而是作為另外一個 MapReduce 的輸入,或者在另一個可處理多個分割文件的分布式應用中使用。
受 Google MapReduce 啟發,許多研究者在不同的實驗平臺上實現了 MapReduce 框架,本文將對 Apache Hadoop MapReduce、Apache、Spark、斯坦福大學的 Phoenix,Nokia 研發的 Disco,以及香港科技大學的 Mars 等 5 個 MapReduce 實現框架進行逐一介紹和各方面對比。
- Hadoop MapReduce
Hadoop 的設計思路來源于 Google 的 GFS 和 MapReduce。它是一個開源軟件框架,通過在集群計算機中使用簡單的編程模型,可編寫和運行分布式應用程序處理大規模數據。Hadoop 包含三個子項目:Hadoop Common、Hadoop Distributed File System(HDFS) 和 Hadoop MapReduce。
第一代 Hadoop MapReduce 是一個在計算機集群上分布式處理海量數據集的軟件框架,包括一個 JobTracker 和一定數量的 TaskTracker。運行流程圖如圖 2 所示。
圖 2 .Hadoop MapReduce 系統架構圖

在最上層有 4 個獨立的實體,即客戶端、JobTracker、TaskTracker 和分布式文件系統。客戶端提交 MapReduce 作業;JobTracker 協調作業的運行;JobTracker 是一個 Java 應用程序,它的主類是 JobTracker;TaskTracker 運行作業劃分后的任務,TaskTracker 也是一個 Java 應用程序,它的主類是 TaskTracker。Hadoop 運行 MapReduce 作業的步驟主要包括提交作業、初始化作業、分配任務、執行任務、更新進度和狀態、完成作業等 6 個步驟。
- Spark MapReduce
Spark 是一個基于內存計算的開源的集群計算系統,目的是讓數據分析更加快速。Spark 非常小巧玲瓏,由加州伯克利大學 AMP 實驗室的 Matei 為主的小團隊所開發。使用的語言是 Scala,項目的核心部分的代碼只有 63 個 Scala 文件,非常短小精悍。Spark 啟用了內存分布數據集,除了能夠提供交互式查詢外,它還可以優化迭代工作負載。Spark 提供了基于內存的計算集群,在分析數據時將數據導入內存以實現快速查詢,“速度比”基于磁盤的系統,如比 Hadoop 快很多。Spark 最初是為了處理迭代算法,如機器學習、圖挖掘算法等,以及交互式數據挖掘算法而開發的。在這兩種場景下,Spark 的運行速度可以達到 Hadoop 的幾百倍。
- Disco
Disco 是由 Nokia 研究中心開發的,基于 MapReduce 的分布式數據處理框架,核心部分由 Erlang 語言開發,外部編程接口為 Python 語言。Disco 是一個開放源代碼的大規模數據分析平臺,支持大數據集的并行計算,能運行在不可靠的集群計算機上。Disco 可部署在集群和多核計算機上,還可部署在 Amazon EC2 上。Disco 基于主/從架構 (Master/Slave),圖 3 總體設計架構圖展示了通過一臺主節點 (Master) 服務器控制多臺從節點 (Slave) 服務器的總體設計架構。
圖 3 .Disco 總體架構圖

Disco 運行 MapReduce 步驟如下:
- Disco 用戶使用 Python 腳本開始 Disco 作業;
- 作業請求通過 HTTP 發送到主機;
- 主機是一個 Erlang 進程,通過 HTTP 接收作業請求;
- 主機通過 SSH 啟動每個節點處的從機;
- 從機在 Worker 進程中運行 Disco 任務。
- Phoenix
Phoenix 作為斯坦福大學 EE382a 課程的一類項目,由斯坦福大學計算機系統實驗室開發。Phoenix 對 MapReduce 的實現原則和最初由 Google 實現的 MapReduce 基本相同。不同的是,它在集群中以實現共享內存系統為目的,共享內存能最小化由任務派生和數據間的通信所造成的間接成本。Phoenix 可編程多核芯片或共享內存多核處理器 (SMPs 和 ccNUMAs),用于數據密集型任務處理。
- Mars
Mars 是香港科技大學與微軟、新浪合作開發的基于 GPU 的 MapReduce 框架。目前已經包含字符串匹配、矩陣乘法、倒排索引、字詞統計、網頁訪問排名、網頁訪問計數、相似性評估和 K 均值等 8 項應用,能夠在 32 位與 64 位的 Linux 平臺上運行。Mars 框架實現方式和基于 CPU 的 MapReduce 框架非常類似,也由 Map 和 Reduce 兩個階段組成,它的基本工作流程圖如圖 4 所示。
圖 4 .Mars 基本工作流程圖

在開始每個階段之前,Mars 初始化線程配置,包括 GPU 上線程組的數量和每個線程組中線程的數量。Mars 在 GPU 內使用大量的線程,在運行時階段會均勻分配任務給線程,每個線程負責一個 Map 或 Reduce 任務,以小數量的 key/value 對作為輸入,并通過一種無鎖的方案來管理 MapReduce 框架中的并發寫入。
Mars 的工作流程主要有 7 個操作步驟:
- 在主存儲器中輸入 key/value 對,并將它們存儲到數組;
- 初始化運行時的配置參數;
- 復制主存儲器中的輸入數組到 GPU 設備內存;
- 啟動 GPU 上的 Map 階段,并將中間的 key/value 對存儲到數組;
- 如果 mSort 選擇 F,即需要排序階段,則對中間結果進行排序;
- 如果 noReduce 是 F,即需要 Reduce 階段,則啟動 GPU 上的 Reduce 階段,并輸出最終結果,否則中間結果就是最終結果;
- 復制 GPU 設備存儲器中的結果到主存儲器。
上述步驟的 1,2,3,7 這四個步驟的操作由調度器來完成,調度器負責準備數據輸入,在 GPU 上調用 Map 和 Reduce 階段,并將結果返回給用戶。
五種框架的優缺點比較
表 1. 五種框架優缺點比較
Hadoop MapReduce | Spark | Phoenix | Disco | Mars | |
---|---|---|---|---|---|
編程語言 | Java 為主 | Scala | C | Erlang | C++ |
構建平臺 | 需要首先架構基于 Hadoop 的集群系統,通過 HDFS 完成運算的數據存儲工作 | 可以的單獨運行,也可以與 Hadoop 框架完整結合 | 獨立運行,不需要提前部署集群,運行時系統的實現是建立在 PThread 之上的,也可方便地移植到其他共享內存線程庫上 | 整個 Disco 平臺由分布式存儲系統 DDFS 和 MapReduce 框架組成,DDFS 與計算框架高度耦合,通過監控各個節點上的磁盤使用情況進行負載均衡 | 運行時為 Map 或 Reduce 任務初始化大量的 GPU 線程,并為每個線程自動分配少量的 key/value 對來運行任務 |
功能特點 | 計算能力非常強,適合超大數據集的應用程序,但是由于系統開銷等原因,處理小規模數據的速度不一定比串行程序快,并且本身集群的穩定性不高 | 在保證容錯的前提下,用內存來承載工作集,內存的存取速度快于磁盤多個數量級,從而可以極大提升性能 | 利用共享內存緩沖區實現通信,從而避免了因數據復制產生的開銷,但 Phoenix 也存在不能自動執行迭代計算、沒有高效的錯誤發現機制等不足 | 由一個 Master 服務器和一系列 Worker 節點組成,Master 和 Worker 之間采用基于輪詢的通信機制,通過 HTTP 的方式傳輸數據。輪詢的時間間隔不好確定,若時間間隔設置不當,會顯著降低程序的執行性能 | 由于 GPU 線程不支持運行時動態調度,所以給每個 GPU 線程分配的任務是固定的,若輸入數據劃分布均勻,將導致 Map 或 Reduce 階段的負載不均衡,使得整個系統性能急劇降低。同時由于 GPU 不支持運行時在設備內存中分配空間,需要預先在設備內存中分配好輸入數據和輸出數據的存放空間,但是 Map 和 Reduce 階段輸出數據大小是未知的,并且當多個 GPU 線程同時向共享輸出區域中寫數據時,易造成寫沖突 |
回頁首
WordCount 實驗
- 基本原理
單詞計數 (WordCount) 是最簡單也是最能體現 MapReduce 思想的程序之一,可以稱為 MapReduce 版"Hello World"。單詞計數主要完成功能是:統計一系列文本文件中每個單詞出現的次數。
- 本次實驗步驟
本次實驗的硬件資源基于 x86 服務器 1 臺,配置為內存 32GB DDR3、E5 CPU/12 核、GPU,實驗數據樣本為 10M/50M/100M/500M/1000M 的文本文件五個,我們使用 Hadoop MapReduce、Spark、Phoenix、Disco、Mars 等 MapReduce 框架分別運行文本分析程序,基于結果一致的前提下統計出運行時間、運行時 CPU 占有率、運行時內存占有率等數據,并采用這些數據繪制成柱狀圖。
Hadoop MapReduce
首先需要將文件拆分成 splits,由于測試用的文件較小,所以每個文件為一個 split,并將文件按行分割形成<key,value>對,圖 12 分割過程圖所示。這一步由 MapReduce 框架自動完成,其中偏移量(即 key 值)包括了回車所占的字符數(Windows 和 Linux 環境會不同)。
圖 5 . 分割過程圖

將分割好的<key,value>對交給用戶定義的 map 方法進行處理,生成新的<key,value>對,圖 6 執行 map 方法所示。
圖 6 . 執行 Map 方法過程圖

得到 map 方法輸出的<key,value>對后,Mapper 會將它們按照 key 值進行排序,并執行 Combine 過程,將 key 相同的 value 值累加,得到 Mapper 的最終輸出結果。圖 7Map 端排序及 Combine 過程所示。
圖 7 . Map 端排序及 Combine 過程

Reducer 先對從 Mapper 接收的數據進行排序,再交由用戶自定義的 reduce 方法進行處理,得到新的<key,value>對,并作為 WordCount 的輸出結果,圖 15Reduce 端排序及輸出結果所示。
圖 8 . Reduce 端排序及輸出結果流程圖

清單 1 . 第一代 Hadoop MapReduce WordCount 示例代碼
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); // 開始 Map 過程 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); //遍歷 Map 里面的字符串 while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); //開始 Reduce 過程 public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
Spark WordCount 實驗
Spark 與 Hadoop MapReduce 的最大區別是它把所有數據保存在內存中,Hadoop MapReduce 需要從外部存儲介質里把數據讀入到內存,Spark 不需要這一步驟。它的實現原理與 Hadoop MapReduce 沒有太大區別,這里不再重復原理,完整的運行代碼如清單 2 所示。
清單 2 . Spark WordCount 示例代碼
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaRDD<String> lines = ctx.textFile(args[0], Integer.parseInt(args[1])); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) { return Arrays.asList(SPACE.split(s)); } }); //定義 RDD ones JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); //ones.reduceByKey(func, numPartitions) JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } },10); //輸出 List List<Tuple2<String, Integer>> output = counts.collect(); Collections.sort(output, new Comparator<Tuple2<String, Integer>>() { @Override public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { if(t1._2 > t2._2) { return -1; } else if(t1._2 < t2._2) { return 1; } return 0; } });
Disco WordCount 實驗
MapReduce 框架由于 Disco 有分布式文件系統存在,所以一般情況下都不會單獨使用,都是從分布式文件系統內取數據后讀入內存,然后再切分數據、進入 MapReduce 階段。首先需要調用 ddfs 的 chunk 命令把文件上傳到 DDFS,然后開始編寫 MapReduce 程序,Disco 外層應用程序采用 Python 編寫。Map 程序實例如清單 3 所示,Reduce 程序示例如清單 4 所示。
清單 3 . Map 程序段
def fun_map(line, params): for word in line.split(): yield word, 1
清單 4 . Reduce 程序段
def fun_reduce(iter, params): from disco.util import kvgroup for word, counts in kvgroup(sorted(iter)): yield word, sum(counts)
清單 5 . Map/Reduce 任務
from disco.core import Job, result_iterator def map(line, params): for word in line.split(): yield word, 1 def reduce(iter, params): from disco.util import kvgroup for word, counts in kvgroup(sorted(iter)): yield word, sum(counts) if __name__ == '__main__': job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"], map=map, reduce=reduce) for word, count in result_iterator(job.wait(show=True)): print(word, count) Note
Phoenix WordCount 實驗
Phoenix 是基于 CPU 的 MapReduce 框架,所以它也是采用將數據分割后讀入內存,然后開始 MapReduce 處理階段這樣的傳統方式。Phoenix 并不由用戶決定切分每個 Map 分配到的數據塊的大小,它是根據集群系統的實際 Cache 大小來切分的,這樣可以避免出現分配到 Map 的數據塊過大或者過小的情況出現。過大的數據快會導致 Map 執行較慢,過小的數據快會導致 Map 資源浪費,因為每次啟動 Map 線程都需要消耗一定的系統資源。Map 階段切分好的文本被多個 Map 并行執行,Phoenix 支持 100 個左右的 Map 并行執行,一個工作節點下可以有若干個 Map 并行執行。只有當一個工作節點上所有的 Map 任務都結束后才開始 Reduce 階段。Reduce 階段繼續沿用了動態任務調度機制,同時允許用戶自定義數據分區規則。
清單 6 . Phoenix 的 wordCount 程序段
#include <stdio.h> #include <strings.h> #include <string.h> #include <stddef.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <sys/mman.h> #include <sys/stat.h> #include <sys/time.h> #include <fcntl.h> #include <ctype.h> #include <inttypes.h> #include "map_reduce.h" #include "stddefines.h" #include "sort.h" #define DEFAULT_DISP_NUM 10 typedef struct { int fpos; off_t flen; char *fdata; int unit_size; } wc_data_t; enum { IN_WORD, NOT_IN_WORD }; struct timeval begin, end; #ifdef TIMING unsigned int library_time = 0; #endif /** mystrcmp() * Comparison function to compare 2 words */ int mystrcmp(const void *s1, const void *s2) { return strcmp((const char *)s1, (const char *) s2); } /** mykeyvalcmp() * Comparison function to compare 2 ints */ int mykeyvalcmp(const void *v1, const void *v2) { keyval_t* kv1 = (keyval_t*)v1; keyval_t* kv2 = (keyval_t*)v2; intptr_t *i1 = kv1->val; intptr_t *i2 = kv2->val; if (i1 < i2) return 1; else if (i1 > i2) return -1; else { return strcmp((char *)kv1->key, (char *)kv2->key); //return 0; } } /** wordcount_分割器 () * 內存里面進行 Map 計算 */ int wordcount_splitter(void *data_in, int req_units, map_args_t *out) { wc_data_t * data = (wc_data_t *)data_in; assert(data_in); assert(out); assert(data->flen >= 0); assert(data->fdata); assert(req_units); assert(data->fpos >= 0); // End of file reached, return FALSE for no more data if (data->fpos >= data->flen) return 0; // Set the start of the next data out->data = (void *)&data->fdata[data->fpos]; // Determine the nominal length out->length = req_units * data->unit_size; if (data->fpos + out->length > data->flen) out->length = data->flen - data->fpos; // Set the length to end at a space for (data->fpos += (long)out->length; data->fpos < data->flen && data->fdata[data->fpos] != ' ' && data->fdata[data->fpos] != '\t' && data->fdata[data->fpos] != '\r' && data->fdata[data->fpos] != '\n'; data->fpos++, out->length++); return 1; } /** wordcount_locator() * Return the memory address where this map task would heavily access. */ void *wordcount_locator (map_args_t *task) { assert (task); return task->data; } /** wordcount_map() * 對文本進行計數 */ void wordcount_map(map_args_t *args) { char *curr_start, curr_ltr; int state = NOT_IN_WORD; int i; assert(args); char *data = (char *)args->data; assert(data); curr_start = data; for (i = 0; i < args->length; i++) { curr_ltr = toupper(data[i]); switch (state) { case IN_WORD: data[i] = curr_ltr; if ((curr_ltr < 'A' || curr_ltr > 'Z') && curr_ltr != '\'') { data[i] = 0; emit_intermediate(curr_start, (void *)1, &data[i] - curr_start + 1); state = NOT_IN_WORD; } break; default: case NOT_IN_WORD: if (curr_ltr >= 'A' && curr_ltr <= 'Z') { curr_start = &data[i]; data[i] = curr_ltr; state = IN_WORD; } break; } } // Add the last word if (state == IN_WORD) { data[args->length] = 0; emit_intermediate(curr_start, (void *)1, &data[i] - curr_start + 1); } } /** wordcount_reduce() * 計算字符 */ void wordcount_reduce(void *key_in, iterator_t *itr) { char *key = (char *)key_in; void *val; intptr_t sum = 0; assert(key); assert(itr); while (iter_next (itr, &val)) { sum += (intptr_t)val; } emit(key, (void *)sum); } void *wordcount_combiner (iterator_t *itr) { void *val; intptr_t sum = 0; assert(itr); while (iter_next (itr, &val)) { sum += (intptr_t)val; } return (void *)sum; } int main(int argc, char *argv[]) { final_data_t wc_vals; int i; int fd; char * fdata; int disp_num; struct stat finfo; char * fname, * disp_num_str; struct timeval starttime,endtime; get_time (&begin); // 確保文件名 if (argv[1] == NULL) { printf("USAGE: %s <filename> [Top # of results to display]\n", argv[0]); exit(1); } fname = argv[1]; disp_num_str = argv[2]; printf("Wordcount: Running...\n"); // 讀取文件 CHECK_ERROR((fd = open(fname, O_RDONLY)) < 0); // Get the file info (for file length) CHECK_ERROR(fstat(fd, &finfo) < 0); #ifndef NO_MMAP // 內存里面開始調用 map CHECK_ERROR((fdata = mmap(0, finfo.st_size + 1, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0)) == NULL); #else int ret; fdata = (char *)malloc (finfo.st_size); CHECK_ERROR (fdata == NULL); ret = read (fd, fdata, finfo.st_size); CHECK_ERROR (ret != finfo.st_size); #endif CHECK_ERROR((disp_num = (disp_num_str == NULL) ? DEFAULT_DISP_NUM : atoi(disp_num_str)) <= 0); wc_data_t wc_data; wc_data.unit_size = 5; // approx 5 bytes per word wc_data.fpos = 0; wc_data.flen = finfo.st_size; wc_data.fdata = fdata; CHECK_ERROR (map_reduce_init ()); map_reduce_args_t map_reduce_args; memset(?_reduce_args, 0, sizeof(map_reduce_args_t)); map_reduce_args.task_data = &wc_data; map_reduce_args.map = wordcount_map; map_reduce_args.reduce = wordcount_reduce; map_reduce_args.combiner = wordcount_combiner; map_reduce_args.splitter = wordcount_splitter; map_reduce_args.locator = wordcount_locator; map_reduce_args.key_cmp = mystrcmp; map_reduce_args.unit_size = wc_data.unit_size; map_reduce_args.partition = NULL; // use default map_reduce_args.result = &wc_vals; map_reduce_args.data_size = finfo.st_size; map_reduce_args.L1_cache_size = atoi(GETENV("MR_L1CACHESIZE"));//1024 * 1024 * 2; map_reduce_args.num_map_threads = atoi(GETENV("MR_NUMTHREADS"));//8; map_reduce_args.num_reduce_threads = atoi(GETENV("MR_NUMTHREADS"));//16; map_reduce_args.num_merge_threads = atoi(GETENV("MR_NUMTHREADS"));//8; map_reduce_args.num_procs = atoi(GETENV("MR_NUMPROCS"));//16; map_reduce_args.key_match_factor = (float)atof(GETENV("MR_KEYMATCHFACTOR"));//2; printf("Wordcount: Calling MapReduce Scheduler Wordcount\n"); gettimeofday(&starttime,0); get_time (&end); #ifdef TIMING fprintf (stderr, "initialize: %u\n", time_diff (&end, &begin)); #endif get_time (&begin); CHECK_ERROR(map_reduce (?_reduce_args) < 0); get_time (&end); #ifdef TIMING library_time += time_diff (&end, &begin); #endif get_time (&begin); gettimeofday(&endtime,0); printf("Wordcount: Completed %ld\n",(endtime.tv_sec - starttime.tv_sec)); printf("Wordcount: MapReduce Completed\n"); printf("Wordcount: Calling MapReduce Scheduler Sort\n"); mapreduce_sort(wc_vals.data, wc_vals.length, sizeof(keyval_t), mykeyvalcmp); CHECK_ERROR (map_reduce_finalize ()); printf("Wordcount: MapReduce Completed\n"); dprintf("\nWordcount: Results (TOP %d):\n", disp_num); for (i = 0; i < disp_num && i < wc_vals.length; i++) { keyval_t * curr = &((keyval_t *)wc_vals.data)[i]; dprintf("%15s - %" PRIdPTR "\n", (char *)curr->key, (intptr_t)curr->val); } free(wc_vals.data); #ifndef NO_MMAP CHECK_ERROR(munmap(fdata, finfo.st_size + 1) < 0); #else free (fdata); #endif CHECK_ERROR(close(fd) < 0); get_time (&end); #ifdef TIMING fprintf (stderr, "finalize: %u\n", time_diff (&end, &begin)); #endif return 0; }
Mars MapReduce
Mars 框架中,Map 和 Reduce 的處理階段都在 GPU 內進行,Map 和 Reduce 的分割數據階段都在 CPU 內進行,這是與其他基于 CPU 的 MapReduce 框架的最大不同。Mars 更多的是利用 CPU、GPU 緩存來替代內存,執行數據分割、處理過程。
具體的 Word count 的流程如下所示:
- 準備 key/value 鍵值對,將這些鍵值對存儲在數組里面;
- 初始化 MapReduce 上下文,設置參數 (根據不同的 GPU 需要根據 CUDA 核心數目設置并發線程數);
- 數據預處理,首先打開文件,將文件所有內容讀入內存,然后申請一塊同文件大小的顯存,將文件內容中小寫字符轉為大寫 (這樣的影響 word,Word 算通一個單詞)。
- 開始 MapReduce 階段。根據并發線程數和文件大寫切換內存中的文件,每塊切分后的任務記錄下該任務在內存中的偏移位置和長度視為 value, 顯存的指針地址視為 key,將任務添加的任務池。將處理后的內存內容復制到剛剛申請的顯存中。接著開始 Map 流程,將內存中的任務池復制到顯存,申請顯存塊用于存放 Map 產生的數據,開啟多線程并發執行用戶定義的 map 流程 MAP_COUNT_FUNC,這個是 Mars 由于 GPU 程序的特殊性而設計的,用于記錄 map 產生的 key 和 value 的長度 (sizeof)。調用 MAP_FUNC 方法,輸入任務記錄,輸出單詞以及單詞所在的位置;
- 如果 noSort 是 F,對結果排序;
- 如果 noReduce 是 F,GPU 開始 reduce 階段,生成最終的結果集。否則,立即輸出最后的結果集;
- 結果輸出,從 GPU 設備拷貝最終的結果集到內存,然后輸出到屏幕。
通過上述的 7 個步驟,WordCount 的計算過程全部完成并且輸出結果集。
清單 7 . Mars 的 Map 程序段
#ifndef __MAP_CU__ #define __MAP_CU__ #include "MarsInc.h" #include "global.h" __device__ int hash_func(char* str, int len) { int hash, i; for (i = 0, hash=len; i < len; i++) hash = (hash<<4)^(hash>>28)^str[i]; return hash; } __device__ void MAP_COUNT_FUNC//(void *key, void *val, size_t keySize, size_t valSize) { WC_KEY_T* pKey = (WC_KEY_T*)key; WC_VAL_T* pVal = (WC_VAL_T*)val; char* ptrBuf = pKey->file + pVal->line_offset; int line_size = pVal->line_size; char* p = ptrBuf; int lsize = 0; int wsize = 0; char* start = ptrBuf; while(1) { for (; *p >= 'A' && *p <= 'Z'; p++, lsize++); *p = '\0'; ++p; ++lsize; wsize = (int)(p - start); if (wsize > 6) { //printf("%s, wsize:%d\n", start, wsize); EMIT_INTER_COUNT_FUNC(wsize, sizeof(int)); } for (; (lsize < line_size) && (*p < 'A' || *p > 'Z'); p++, lsize++); if (lsize >= line_size) break; start = p; } } __device__ void MAP_FUNC//(void *key, void val, size_t keySize, size_t valSize) { WC_KEY_T* pKey = (WC_KEY_T*)key; WC_VAL_T* pVal = (WC_VAL_T*)val; char* filebuf = pKey->file; char* ptrBuf = filebuf + pVal->line_offset; int line_size = pVal->line_size; char* p = ptrBuf; char* start = ptrBuf; int lsize = 0; int wsize = 0; while(1) { for (; *p >= 'A' && *p <= 'Z'; p++, lsize++); *p = '\0'; ++p; ++lsize; wsize = (int)(p - start); int* o_val = (int*)GET_OUTPUT_BUF(0); *o_val = wsize; if (wsize > 6) { //printf("%s, %d\n", start, wsize); EMIT_INTERMEDIATE_FUNC(start, o_val, wsize, sizeof(int)); } for (; (lsize < line_size) && (*p < 'A' || *p > 'Z'); p++, lsize++); if (lsize >= line_size) break; start = p; } } #endif //__MAP_CU__
清單 8 . Mars 的 Reduce 程序段
#ifndef __REDUCE_CU__ #define __REDUCE_CU__ #include "MarsInc.h" __device__ void REDUCE_COUNT_FUNC//(void* key, void* vals, size_t keySize, size_t valCount) { } __device__ void REDUCE_FUNC//(void* key, void* vals, size_t keySize, size_t valCount) { } #endif //__REDUCE_CU__
回頁首
五種框架 WordCount 實驗性能對比
圖 9 . 實驗運行時間比較圖

圖 9 實驗運行時間比較圖是分析不同大小的文本文件所消耗的時間對比圖。從上圖可以看出,Hadoop MapReduce 的運行時間最長,原因是 Hadoop 生態環境包含內容過多,所以每次任務啟動時首先需要加載所需資源包,然后緩慢地發起任務,并且由于本身是用性能較差的 Java 語言編寫的,所以導致整體計算時間長、性能差。Phoenix 由于采用匯編和 C 語言編寫,內核很小,運行時所用資源很少,所以整個測試過程耗時也較少。Spark 框架在 WordCount 實驗中消耗的時長較 Disco 稍少,但是比 Phoenix、Mars 耗時太多。耗時最短的兩個框架是 Mars 和 Phoenix。需要時長從高到低分別是 Hadoop MapReduce、Disco、Spark、Phoenix、Mars。
圖 10 .CPU 使用率對比圖

圖 10-CPU 使用率比較圖是分析任務執行過程當中 CPU 使用率情況圖。從上圖可以看出,Hadoop MapReduce、Disco 這兩個框架需要占用的 CPU 資源在 1000M 文本處理時基本到達最大飽和度 (大于 90%),Apache Spark 的 CPU 使用率沒有完全伴隨著文本文件增大而大幅上漲,Phoenix 和 Mars 基本控制在對 CPU 使用率較低的范圍內。
圖 11 . 內存使用率對比圖

圖 11 內存使用率比較圖是分析任務執行過程中內存使用情況對比。從圖中可以看出,Mars 和 Phoenix 這兩款框架所使用的內存在文本數據較小時是最少的,隨著文本數據的增大,Apache Spark 隨著數據量增大而內存大幅增加,Mars 和 Phoenix 有一定幅度的內存使用增加趨勢。當數據量達到本次測試最大的 1000M 文本時,Spark 框架對內存的消耗是最小的,Hadoop MapReduce 和 Disco 需要占用較多的內存。
從上面的測試結果我們得出,如果用戶只需要處理海量的文本文件,不需要考慮存儲、二次數據挖掘等,采用 Phoenix 或者 Mars 是最大性價比的選擇,但是由于 Mars 必須在 GPU 上運行,本身 GPU 由于價格因素,導致不太可能在實際應用場景里推廣,所以綜合來看 Phoenix 是性價比最高的框架。如果應用程序需要處理的數據量非常大,并且客戶希望計算出的數據可以被存儲和二次計算或數據挖掘,那 Hadoop MapReduce 較好,因為整個 Hadoop 生態圈龐大,支持性很好。Apache Spark 由于架構層面設計不同,所以對于 CPU、內存的使用率一直保持較低狀態,它未來可以用于海量數據分析用途。
回頁首
結束語
現實世界很多實例都可用 MapReduce 編程模型來表示,MapReduce 作為一個通用可擴展的、高容錯性的并行處理模型,可有效地處理海量數據,不斷地從中分析挖掘出有價值的信息。MapReduce 封裝了并行處理、負載均衡、容錯、數據本地化等技術難點細節。通過本文測試用例可以證明 MapReduce 適用于海量文本分析的應用場景,可以為處理大數據提供技術支撐。
原文 http://www.ibm.com/developerworks/cn/analytics/library/ba-1507-mapred