MongoDB分片存儲的集群架構實現
如果需要存儲大量數據,或者系統的讀寫吞吐量很大的時候,單個server就很難滿足需求了。這個時候我們可以使用MongoDB的分片機制來解決這些問題。
分片的基本概念
分片(sharding)是一種水平擴展(horizontal scaling)的方式,把一個大的數據集分散到多個片服務器上,所有的片服務器將組成一個邏輯上的數據庫來存儲這個大的數據集。分片對用戶(應用層)是透明的,用戶不會知道數據很被存放到哪個片服務器上。
這種方式有兩個好處:
- 分片之后,每個片服務器處理的請求數量將遠遠小于不分片的情況,這樣就可以水平擴展整個分片集群的存儲量和吞吐量
- 分片之后每個片服務器需要存儲、維護的數據量將大大減少
分片以及分片集群
下面的圖來自MongoDB文檔,圖片大概展示了分片的概念。1TB的數據被分配到四個片服務器上,四個片服務器組成了邏輯Collection I。
在MongoDB中,我們可以搭建分片集群(如下圖)來實現數據分片的功能。分片集群中有三個基本部分組成:
- Mongos(路由服務器):是MongoDB中的路由進程,它將路由所有來自客戶端的請求,分發到每個分片server;然后它將聚合各個分片server的結果,返回個客戶端
- config server(配置服務器):該服務器存儲了集群的配置信息:數據和片的對應關系。路由器將根據數據和分片的mapping,把數據操作分配到特定的分片上
- shard server(片服務器):用來保存子集合的數據容器。片服務器可以是單個的mongod服務器,也可以是一個副本集
片鍵
當設置分片時,需要從集合里面選擇一個或幾個鍵,把選擇出來的鍵作為數據拆分的依據,這個鍵叫做片鍵或復合片鍵。
選好片鍵后,MongoDB將不允許插入沒有片鍵的文檔,但是允許不同文檔的片鍵類型不一樣。
塊(chunk)
在一個shard server內部,MongoDB還是會把數據分為chunks,每個chunk代表這個shard server內部一部分數據。chunk的產生,會有以下兩個用途:
-
Splitting:
- 當一個chunk的大小超過配置中的chunk size時,MongDB的后臺進程會把這個chunk切分成更小的chunk,從而避免chunk過大的情況
-
Balancing:
- 在MongoDB中,balancer是一個后臺進程,負責chunk的遷移,從而均衡各個shard server的負載
分片集群的搭建
搭建
下面我們開始單鍵一個分片集群,來通過一些實踐體驗一下分片集群。
-
啟動分片中需要的MongDB實例,在這里為了簡單,片服務器使用單個MongoDB實例,而不是副本集
啟動配置服務器
mongod.exe --dbpath="C:\mongodb\db\config" --port 11110
啟動路由服務器
mongos.exe --port=11111 --configdb=127.0.0.1:11110
啟動片服務器
mongod.exe --dbpath="c:\mongodb\db\shard0" --port=22220 mongod.exe --dbpath="c:\mongodb\db\shard1" --port=22221
-
添加片服務器
通過MongoDB shell連接到mongos實例,并且通過以下命令添加
mongo.exe 127.0.0.1:11111 use admin db.runCommand({"addshard":"127.0.0.1:22220",allowLocal:true}) db.runCommand({"addshard":"127.0.0.1:22221",allowLocal:true})
-
啟動分片,并且設置片鍵
mongos> use admin switched to db admin mongos> db.runCommand({"enablesharding":"test"}) { "ok" : 1 } mongos> db.runCommand({"shardcollection":"test.student","key":{"sid":1}}) { "collectionsharded" : "test.student", "ok" : 1 } mongos>
查看分片集群狀態
通過下面的命令可以查看分片集群的狀態,可以得到以下信息:
- 分片集群中所有的片服務器
- 分片集群中所有的數據庫,以及哪些數據庫已經啟動分片
- 已分片數據庫的主片和片鍵
mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0000", "host" : "127.0.0.1:22220" }, { "_id" : "shard0001", "host" : "127.0.0.1:22221" } ], "ok" : 1 } mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0000", "host" : "127.0.0.1:22220" } { "_id" : "shard0001", "host" : "127.0.0.1:22221" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0000" } test.student shard key: { "sid" : 1 } chunks: shard0000 1 { "name" : { "$minKey" : 1 } } -->> { " sid " : { "$maxKey" : 1 } } on : shard0000 Timestamp(1, 0) mongos>
刪除分片
在上面的輸出中,可以看到以下輸出語句。在所有的片服務器中,都會有一個主片,這個片的刪除需要特殊的命令。同時,刪除分片可能需要一定的時間,因為MongoDB會把要刪除分片上的數據移動到其他的分片上。
{ "_id" : "test", "partitioned" : true, "primary" : "shard0000" }
首先,從簡單的開始,先刪除非主片shard0001,從輸出可以看到,shard0001處于draining狀態,表示數據正在轉移,當需要遷移的數據量很大的時候,可能就需要等待一些時間
mongos> db.runCommand({"removeshard":"127.0.0.1:22221"}) { "msg" : "draining started successfully", "state" : "started", "shard" : "shard0001", "ok" : 1 } mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0000", "host" : "127.0.0.1:22220" }, { "_id" : "shard0001", "draining" : true, "host" : "127.0.0.1:22221" } ], "ok" : 1 } mongos>
再次使用刪除操作,可以看到shard0001已經被刪除
mongos> db.runCommand({"removeshard":"127.0.0.1:22221"}) { "msg" : "removeshard completed successfully", "state" : "completed", "shard" : "shard0001", "ok" : 1 } mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0000", "host" : "127.0.0.1:22220" } ], "ok" : 1 } mongos>
下面重新添加剛才刪除的分片,這次嘗試刪除主分片,會遇到以下錯誤
mongos> db.runCommand({"removeshard":"127.0.0.1:22220"}) { "msg" : "draining started successfully", "state" : "started", "shard" : "shard0000", "note" : "you need to drop or movePrimary these databases", "dbsToMove" : [ "test" ], "ok" : 1 } mongos>
這時,使用moveprimary命令把shard0001設置為主分片
mongos> db.runCommand({"moveprimary":"test","to":"127.0.0.1:22221"}) { "primary " : "shard0001:127.0.0.1:22221", "ok" : 1 } mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0000", "draining" : true, "host" : "127.0.0.1:22220" } { "_id" : "shard0001", "host" : "127.0.0.1:22221" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0001" } test.student shard key: { " sid " : 1 } chunks: shard0001 1 { "name" : { "$minKey" : 1 } } -->> { " sid " : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 0) mongos>
然后就可以直接刪除shard0000了
mongos> db.runCommand({"removeshard":"127.0.0.1:22220"}) { "msg" : "removeshard completed successfully", "state" : "completed", "shard" : "shard0000", "ok" : 1 } mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0001", "host" : "127.0.0.1:22221" } ], "ok" : 1 } mongos>
最后重新添加剛才刪除的分片,將遇到以下錯誤,提示test數據庫已經存在
mongos> db.runCommand({"addshard":"127.0.0.1:22220",allowLocal:true}) { "ok" : 0, "errmsg" : "can't add shard 127.0.0.1:22220 because a local database 'test' exists in another shard0001:127.0.0.1:22221" } mongos>
這時,就需要通過MongoDB shell連接到這個實例,刪除test數據庫。然后重新添加。
C:\mongodb\bin>mongo.exe --port=22220 MongoDB shell version: 2.4.6 connecting to: 127.0.0.1:22220/test > use test switched to db test > db.dropDatabase() { "dropped" : "test", "ok" : 1 } >
mongos> db.runCommand({"addshard":"127.0.0.1:22220",allowLocal:true}) { "shardAdded" : "shard0002", "ok" : 1 } mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0001", "host" : "127.0.0.1:22221" }, { "_id" : "shard0002", "host" : "127.0.0.1:22220" } ], "ok" : 1 } mongos>
在這個部分中可以看到,通過上面三個步驟,一個簡單的分片集群就建立起來了。下面我們開始使用這個分片集群。
分片集群的管理
首先切換到config數據庫,看看都有哪些表
mongos> use config switched to db config mongos> show collections changelog chunks collections databases lockpings locks mongos settings shards system.indexes tags version mongos>
然后看看下面幾張表中的數據
- chunks表:通過這張表可以看到分片集群中的數據塊
- databases表:表里顯示了所有在分片集群中存儲的數據庫,同時可以看到那些數據庫是啟動分片的,并且指示出主分片
- settings表:這張表包含分片集群的配置項,其中塊大小(chunk size)就是通過這個文件設置
- shards表:這張表包含了分片集群中的所有片
mongos> db.chunks.find() { "_id" : "test.student-sid_MinKey", "lastmod" : Timestamp(1, 0), "lastmodEpoch" : ObjectId("548c02d79355a9edbf5f0193"), "ns" : "test.studen t", "min" : { "sid" : { "$minKey" : 1 } }, "max" : { "sid" : { "$maxKey" : 1 } }, "shard" : "shard0002" } mongos> db.databases.find() { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } mongos> db.settings.find() { "_id" : "chunksize", "value" : 1 } mongos> db.shards.find() { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } mongos>
默認的chunk size是64MB,為了方便看到下面平衡(Balancing)的過程,我們重設chunk size為1MB
mongos> use config switched to db config mongos> db.settings.save({ _id:"chunksize", value: 1}) mongos> db.settings.find() { "_id" : "chunksize", "value" : 1 } mongos>
分片集群的使用
基本測試
基于前面創建的分片集群,我們進行一些測試,首先插入10W條文檔,然后查詢分片集群的狀態。從下面的輸出中可以看到,10W條文檔一共使用了10個chunk,2個chunk在shard0002上,8個chunk在shard0001上。
mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } test.student shard key: { "sid" : 1 } chunks: shard0002 2 shard0001 8 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 0 } on : shard0002 Timestamp(2, 1) { "sid" : 0 } -->> { "sid" : 6673 } on : shard0002 Timestamp(1, 3) { "sid" : 6673 } -->> { "sid" : 20765 } on : shard0001 Timestamp(2, 2) { "sid" : 20765 } -->> { "sid" : 34839 } on : shard0001 Timestamp(2, 4) { "sid" : 34839 } -->> { "sid" : 48913 } on : shard0001 Timestamp(2, 6) { "sid" : 48913 } -->> { "sid" : 63011 } on : shard0001 Timestamp(2, 8) { "sid" : 63011 } -->> { "sid" : 74942 } on : shard0001 Timestamp(2, 10) { "sid" : 74942 } -->> { "sid" : 86877 } on : shard0001 Timestamp(2, 12) { "sid" : 86877 } -->> { "sid" : 98836 } on : shard0001 Timestamp(2, 14) { "sid" : 98836 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 15) { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } mongos>
經過一段時間后,再次查詢分片集群的狀態,可以看到,經過Balancing過程,所有的chunk基本要均勻的分布到了兩個片服務器中。
MongoDB這種平衡的能力能夠對各個片服務器進行負載均衡,但是如果數據特別大話則可能會非常的慢,因為數據遷移了會很大。
mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } test.student shard key: { "sid" : 1 } chunks: shard0002 5 shard0001 5 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 0 } on : shard0002 Timestamp(2, 1) { "sid" : 0 } -->> { "sid" : 6673 } on : shard0002 Timestamp(1, 3) { "sid" : 6673 } -->> { "sid" : 20765 } on : shard0002 Timestamp(3, 0) { "sid" : 20765 } -->> { "sid" : 34839 } on : shard0002 Timestamp(4, 0) { "sid" : 34839 } -->> { "sid" : 48913 } on : shard0002 Timestamp(5, 0) { "sid" : 48913 } -->> { "sid" : 63011 } on : shard0001 Timestamp(5, 1) { "sid" : 63011 } -->> { "sid" : 74942 } on : shard0001 Timestamp(2, 10) { "sid" : 74942 } -->> { "sid" : 86877 } on : shard0001 Timestamp(2, 12) { "sid" : 86877 } -->> { "sid" : 98836 } on : shard0001 Timestamp(2, 14) { "sid" : 98836 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 15) { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } mongos>
當然,也可以通過下面命令查看某個collection的數據在分片中的分布情況
mongos> db.student.getShardDistribution() Shard shard0001 at 127.0.0.1:22221 data : 4.28MiB docs : 51087 chunks : 5 estimated data per chunk : 878KiB estimated docs per chunk : 10217 Shard shard0002 at 127.0.0.1:22220 data : 4.1MiB docs : 48913 chunks : 5 estimated data per chunk : 840KiB estimated docs per chunk : 9782 Totals data : 8.39MiB docs : 100000 chunks : 10 Shard shard0001 contains 51.08% data, 51.08% docs in cluster, avg obj size on shard : 88B Shard shard0002 contains 48.91% data, 48.91% docs in cluster, avg obj size on shard : 88B mongos>
對現有數據庫分片
現在假設我們有一個”127.0.0.1:22222″的MongDB實例,我們想對上面已有的數據進行分片。
首先插入測試數據
use school for(var i=0;i<100000;i++){ var randAge = parseInt(5*Math.random()) + 20; var gender = (randAge%2)?"Male":"Female"; db.student.insert({"sid":i, "name":"Will"+i, "gender": gender, "age": randAge}); }
然后,在現有的分片集群中加入”127.0.0.1:22222″作為一個片服務器,從下面的輸出可以看到:
- 分片集群已經加入片服務器”127.0.0.1:22222″
- 分片集群中原有的需要分片的數據庫(test)的數據被Balancing到新的片服務器上
- 新加入的片服務器上的數據庫(school)沒有開啟分片
mongos> use admin switched to db admin mongos> db.runCommand({"addshard":"127.0.0.1:22222",allowLocal:true}) { "shardAdded" : "shard0003", "ok" : 1 } mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } { "_id" : "shard0003", "host" : "127.0.0.1:22222" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } test.student shard key: { "sid" : 1 } chunks: shard0003 3 shard0002 4 shard0001 3 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 0 } on : shard0003 Timestamp(7, 0) { "sid" : 0 } -->> { "sid" : 6673 } on : shard0002 Timestamp(7, 1) { "sid" : 6673 } -->> { "sid" : 20765 } on : shard0002 Timestamp(3, 0) { "sid" : 20765 } -->> { "sid" : 34839 } on : shard0002 Timestamp(4, 0) { "sid" : 34839 } -->> { "sid" : 48913 } on : shard0002 Timestamp(5, 0) { "sid" : 48913 } -->> { "sid" : 63011 } on : shard0003 Timestamp(6, 0) { "sid" : 63011 } -->> { "sid" : 74942 } on : shard0003 Timestamp(8, 0) { "sid" : 74942 } -->> { "sid" : 86877 } on : shard0001 Timestamp(8, 1) { "sid" : 86877 } -->> { "sid" : 98836 } on : shard0001 Timestamp(2, 14) { "sid" : 98836 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 15) { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } { "_id" : "school", "partitioned" : false, "primary" : "shard0003" } mongos>
當我們使用以下命令開啟school的分片時,會得到一個錯誤,提示我們sid沒有索引。
根據MongDB文檔,所有分片的集合在片鍵上都必須建索引,這是MongoDB自動執行的。所以,對已有的數據庫加入分片集群中的時候,可能就需要手動建立索引了。
根據這一點我們也可以得到,片鍵最好是查詢中經常用到的字段,因為如果選擇某個字段作為片鍵但是基本不在這個字段做查詢那么等于浪費了一個索引,而增加一個索引總是會使得插入操作變慢。
mongos> db.runCommand({"enablesharding":"school"}) { "ok" : 1 } mongos> db.runCommand({"shardcollection":"school.student","key":{"sid":1}}) { "proposedKey" : { "sid" : 1 }, "curIndexes" : [ { "v" : 1, "key" : { "_id" : 1 }, "ns" : "school.student", "name" : "_id_" } ], "ok" : 0, "errmsg" : "please create an index that starts with the shard key before sharding." }
回到剛才的錯誤,我們現在手動建立一個sid的索引,然后重新設置片鍵,這次成功了,并且school中的數據已經被Balancing到各個片服務器上。
mongos> use school switched to db school mongos> db.student.getIndexes() [ { "v" : 1, "key" : { "_id" : 1 }, "ns" : "school.student", "name" : "_id_" } ] mongos> db.student.ensureIndex({"sid":1}) mongos> use admin switched to db admin mongos> db.runCommand({"shardcollection":"school.student","key":{"sid":1}}) { "collectionsharded" : "school.student", "ok" : 1 } mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } { "_id" : "shard0003", "host" : "127.0.0.1:22222" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } test.student shard key: { "sid" : 1 } chunks: shard0003 3 shard0002 4 shard0001 3 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 0 } on : shard0003 Timestamp(7, 0) { "sid" : 0 } -->> { "sid" : 6673 } on : shard0002 Timestamp(7, 1) { "sid" : 6673 } -->> { "sid" : 20765 } on : shard0002 Timestamp(3, 0) { "sid" : 20765 } -->> { "sid" : 34839 } on : shard0002 Timestamp(4, 0) { "sid" : 34839 } -->> { "sid" : 48913 } on : shard0002 Timestamp(5, 0) { "sid" : 48913 } -->> { "sid" : 63011 } on : shard0003 Timestamp(6, 0) { "sid" : 63011 } -->> { "sid" : 74942 } on : shard0003 Timestamp(8, 0) { "sid" : 74942 } -->> { "sid" : 86877 } on : shard0001 Timestamp(8, 1) { "sid" : 86877 } -->> { "sid" : 98836 } on : shard0001 Timestamp(2, 14) { "sid" : 98836 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 15) { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } { "_id" : "school", "partitioned" : true, "primary" : "shard0003" } school.student shard key: { "sid" : 1 } chunks: shard0001 6 shard0002 5 shard0003 6 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 5957 } on : shard0001 Timestamp(2, 0) { "sid" : 5957 } -->> { "sid" : 11915 } on : shard0002 Timestamp(3, 0) { "sid" : 11915 } -->> { "sid" : 17873 } on : shard0001 Timestamp(4, 0) { "sid" : 17873 } -->> { "sid" : 23831 } on : shard0002 Timestamp(5, 0) { "sid" : 23831 } -->> { "sid" : 29789 } on : shard0001 Timestamp(6, 0) { "sid" : 29789 } -->> { "sid" : 35747 } on : shard0002 Timestamp(7, 0) { "sid" : 35747 } -->> { "sid" : 41705 } on : shard0001 Timestamp(8, 0) { "sid" : 41705 } -->> { "sid" : 47663 } on : shard0002 Timestamp(9, 0) { "sid" : 47663 } -->> { "sid" : 53621 } on : shard0001 Timestamp(10, 0) { "sid" : 53621 } -->> { "sid" : 59579 } on : shard0002 Timestamp(11, 0) { "sid" : 59579 } -->> { "sid" : 65537 } on : shard0001 Timestamp(12, 0) { "sid" : 65537 } -->> { "sid" : 71495 } on : shard0003 Timestamp(12, 1) { "sid" : 71495 } -->> { "sid" : 77453 } on : shard0003 Timestamp(1, 12) { "sid" : 77453 } -->> { "sid" : 83411 } on : shard0003 Timestamp(1, 13) { "sid" : 83411 } -->> { "sid" : 89369 } on : shard0003 Timestamp(1, 14) { "sid" : 89369 } -->> { "sid" : 95327 } on : shard0003 Timestamp(1, 15) { "sid" : 95327 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0003 Timestamp(1, 16) mongos>
總結
通過本篇文章了解了分片的基本概念,同時搭建了一個簡單的分片集群,進行了一些基本測試。
對于什么時候要使用分片這個問題,可以主要有下面幾個參考點:
- 數據量很大單個存儲服務器無法滿需需求
- 系統中活動數據量的大小很快將要超過系統RAM的最大容量
- 系統有大量的讀寫請求,單個單個存儲服務器不足以支持
來源:田小計劃