如何將 MongoDB MapReduce 速度提升 20 倍
分析在MongoDB中正成為越來越重要的話題,因為它在越來越多的大型項目中使用。人們厭倦了使用不同的軟件來做分析(包括Hadoop),它們顯然需要傳輸大量開銷的數據。
MongoDB提供了兩種內置分析數據的方法:Map Reduce和Aggregation框架。MR非常靈活,很容易部署。它通過分區工作良好,并允許大量輸出。MR在MongoDB v2.4中,通過使用JavaScript引擎把Spider Monkey替換成V8,性能提升很多。老板抱怨它太慢了,尤其是和Agg框架(使用C++)相比。讓我們看看能否從中榨出點果汁。
練習
讓我們插入1千萬條文檔,每個文檔包含一個從0到1000000的整數。這意味著平均有10個文檔會具有相同的值。
> for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });} > db.uniques.findOne() { "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 } > db.uniques.ensureIndex({dim0: 1}) > db.uniques.stats() { "ns" : "test.uniques", "count" : 10000000, "size" : 360000052, "avgObjSize" : 36.0000052, "storageSize" : 582864896, "numExtents" : 18, "nindexes" : 2, "lastExtentSize" : 153874432, "paddingFactor" : 1, "systemFlags" : 1, "userFlags" : 0, "totalIndexSize" : 576040080, "indexSizes" : { "_id_" : 324456384, "dim0_1" : 251583696 }, "ok" : 1 }
從這其中,我們想要計算出現的不同值的個數。可以用下列MR任務輕松完成這個工作:
> db.runCommand( { mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout" }) { "result" : "mrout", "timeMillis" : 1161960, "counts" : { "input" : 10000000, "emit" : 10000000, "reduce" : 1059138, "output" : 999961 }, "ok" : 1 }
正如你在輸出內容中看到的,這耗費了大概1200秒(在EC2 M3實例上進行的測試)。有1千萬個map,1百萬個reduce,輸出了999961個文檔。結果就像下面這樣:
> db.mrout.find() { "_id" : 1, "value" : 10 } { "_id" : 2, "value" : 5 } { "_id" : 3, "value" : 6 } { "_id" : 4, "value" : 10 } { "_id" : 5, "value" : 9 } { "_id" : 6, "value" : 12 } { "_id" : 7, "value" : 5 } { "_id" : 8, "value" : 16 } { "_id" : 9, "value" : 10 } { "_id" : 10, "value" : 13 } ...
使用排序
我在上一篇博文中提到了在MR中使用排序多么有益。這個特性很少被理解。在這個例子中,處理未排序的輸入意味著MR引擎將得到隨機順序的值,在RAM中根本無法reduce。相反,它將不得不把所有文章寫入一個臨時收集的磁盤,然后按順序讀取并reduce。讓我們看看使用排序是否有助:
> db.runCommand( { mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout", sort: {dim0: 1} }) { "result" : "mrout", "timeMillis" : 192589, "counts" : { "input" : 10000000, "emit" : 10000000, "reduce" : 1000372, "output" : 999961 }, "ok" : 1 }
確實大有助益!我們下降到192秒,已經提升了6倍。reduce的數量基本相同,但現在它們在寫入磁盤前,可以在RAM內完成。
使用多線程
MongoDB對單獨的MR作業并不使用多線程——它僅僅對多作業使用多線程。但通過多核CPU,在單個服務器使用Hadoop風格來并行作業非常有優勢。我們需要做的是把輸入分成幾塊,通過各個塊來加速一個MR作業。也許數據集有簡單的方法來分割,但其他使用splitVector命令(不明確)可以使你很快的找到分割點:
> db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32000000}) { "timeMillis" : 6006, "splitKeys" : [ { "dim0" : 18171 }, { "dim0" : 36378 }, { "dim0" : 54528 }, { "dim0" : 72717 }, … { "dim0" : 963598 }, { "dim0" : 981805 } ], "ok" : 1 }這個命令在超過1千萬個文檔中找到分割點僅僅需要花費5秒,很快!那么現在我們僅僅需要一個方法來創建多個MR作業。從一個應用服務器,使用多線程和為MR命令使用$gt/$It查詢 相當簡單。通過shell,你可以使用ScopedThread,使用方法如下:
> var t = new ScopedThread(mapred, 963598, 981805) > t.start() > t.join()
現在我們把一些快速運行的js代碼放在一起,它們會產生4個線程(或者更多的線程),執行后呈現出下面的結果:
> var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 }) > var keys = res.splitKeys > keys.length 39 > var mapred = function(min, max) { return db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout" + min, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } } }) } > var numThreads = 4 > var inc = Math.floor(keys.length / numThreads) + 1 > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() } min:0 max:274736 min:274736 max:524997 min:524997 max:775025 min:775025 max:{ "$maxKey" : 1 } connecting to: test connecting to: test connecting to: test connecting to: test > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); } { "result" : "mrout0", "timeMillis" : 205790, "counts" : { "input" : 2750002, "emit" : 2750002, "reduce" : 274828, "output" : 274723 }, "ok" : 1 } { "result" : "mrout274736", "timeMillis" : 189868, "counts" : { "input" : 2500013, "emit" : 2500013, "reduce" : 250364, "output" : 250255 }, "ok" : 1 } { "result" : "mrout524997", "timeMillis" : 191449, "counts" : { "input" : 2500014, "emit" : 2500014, "reduce" : 250120, "output" : 250019 }, "ok" : 1 } { "result" : "mrout775025", "timeMillis" : 184945, "counts" : { "input" : 2249971, "emit" : 2249971, "reduce" : 225057, "output" : 224964 }, "ok" : 1 } "ok" : 1 } { "result" : "mrout775025", "timeMillis" : 184945, "counts" : { "input" : 2249971, "emit" : 2249971, "reduce" : 225057, "output" : 224964 }, "ok" : 1 }
第一個線程時間確實超過了其他的線程,但是平均每個線程仍然用了大約190s的時間.這意味著并沒有一個線程快!這有點奇怪,自從用了‘top’,在某種程度上,你可以看到所有的內核運行情況。
使用多數據庫
問題是在多線程之間會有很多鎖競爭。在上鎖時,MR并不是那么無私的(它每1000次讀操作就會產生一次鎖定),而且MR任務還會執行許多寫操作,導致線程最終都會在等待另一個線程。由于每個MongoDB數據庫都有私有鎖,讓我們嘗試為每一個線程使用一個不同的輸出數據庫:
> var mapred = function(min, max) { return db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: { replace: "mrout" + min, db: "mrdb" + min }, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } } }) } > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() } min:0 max:274736 min:274736 max:524997 min:524997 max:775025 min:775025 max:{ "$maxKey" : 1 } connecting to: test connecting to: test connecting to: test connecting to: test > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); } ... { "result" : { "db" : "mrdb274736", "collection" : "mrout274736" }, "timeMillis" : 105821, "counts" : { "input" : 2500013, "emit" : 2500013, "reduce" : 250364, "output" : 250255 }, "ok" : 1 } ...
這才像話!我們現在降到了100秒,這意味著相比一個線程而言已經提升了2倍。還算差強人意吧。現在我們只有4個核所以只快了2倍,要是在8核CPU上將會快4倍,以此類推。
使用純JavaScript模式
當把輸入數據拆分到不同線程上去的時候,發生了一些有趣的事情:每個線程現在有大約250000個不同的值來輸出,而不是1百萬。這意味著我們可以使用“純JS模式”,它可以通過使用jsMode:true來開啟。開啟后,MongoDB在處理時將不會把對象在JS和BSON之間來回翻譯,相反,它使用一個限額500000個key的內部JS字典來化簡所有對象。讓我們看看這是否有用:
> var mapred = function(min, max) { return db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: { replace: "mrout" + min, db: "mrdb" + min }, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } }, jsMode: true }) } > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() } min:0 max:274736 min:274736 max:524997 min:524997 max:775025 min:775025 max:{ "$maxKey" : 1 } connecting to: test connecting to: test connecting to: test connecting to: test > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); } ... { "result" : { "db" : "mrdb274736", "collection" : "mrout274736" }, "timeMillis" : 70507, "counts" : { "input" : 2500013, "emit" : 2500013, "reduce" : 250156, "output" : 250255 }, "ok" : 1 } ...
現在我們降到了70秒,就搞定了任務!jsMode真心有用,尤其是當對象有很多字段的時候。這里只有一個數字字段就已經下降了30%。
MongoDB在2.6版本上的改進
在很早的2.6版本中,在任何的js函數調用的時候,我們就通過一段代碼設置一個可選參數”args“。這種做法并不標準,不在使用。但是它確有留下來的原因(查看 SERVER-4654)。讓我們從Git資源庫中導入MongoDB,編譯并運行進行測試:
... { "result" : { "db" : "mrdb274736", "collection" : "mrout274736" }, "timeMillis" : 62785, "counts" : { "input" : 2500013, "emit" : 2500013, "reduce" : 250156, "output" : 250255 }, "ok" : 1 } ...
這是明顯的提高了3倍的運行速度,時間降低到了60s,大約10-15%。這種變化也提高了整體JS引擎的堆消耗。
結語
回顧一下,對于同一個MR作業,我們開始時花費1200秒,最后花費60秒,提升了20倍!這項提高應該對大部分應用都有效,即使有些trick不太理想(例如,使用多種輸出dbs/collections)。至少這能提供給人們思路,如何加速他們的MR作業,希望這些特征在將來會更加易于使用。接下來的票將使得‘splitVector’命令更加有用,這張票將在同一個數據庫中提升多MR作業。干杯!