redis數據庫之主從復制
redis除了基本功能外,還提供了主從復制功能。一個redis服務可以有多個slave服務,而這個slave服務又可以有slave服務。master服務把屬于自己的slave服務用鏈表管理起來,也就是struct redisServer中的slaves成員,slave服務會通過redisServer中的masterhost和masterport來標識它的master服務的ip和port。
redis有兩種方式來標識從屬于哪個master服務:
1、在 配置文件中配置slaveof masterhost masterport
2、發送slaveof命令。
同樣redis也提供了兩種方式來同步主從的數據庫的。
1、通過定時器來完成同步
2、master服務每次執行的命令都會根據情況發送一份給slave服務。
首先來講述下定時器完成同步的實現:
redis服務的定時功能都是通過serverCron完成,而在serverCron中會調用replicationCron,這個函數就完成了不同的功能。
void replicationCron(void) { ....... if (server.repl_state == REDIS_REPL_CONNECT) { redisLog(REDIS_NOTICE,"Connecting to MASTER..."); if (connectWithMaster() == REDIS_OK) { redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started"); } } ....... }replicationCron檢查一些超時情況做一些超時的處理,然后會調用connectWithMaster去連接master服務。
int connectWithMaster(void) { fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport); ...... if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) ...... }connectWithMaster首先會向master服務發起連接,然后創建一個讀寫事件并把設置server.repl_state = REDIS_REPL_CONNECTING;
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { ...... if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s", strerror(errno)); goto error; } ...... if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) == AE_ERR) ...... server.repl_state = REDIS_REPL_TRANSFER; ...... }syncWithMaster會發送SYNC命令給master服務,然后設置可讀事件的handler,并把slave的狀態設置為傳輸狀態。下面來看下master服務接收到sync命令的處理:
void syncCommand(redisClient *c) { ...... // 檢查是否已經有 BGSAVE 在執行,否則就創建一個新的 BGSAVE 任務 if (server.rdb_child_pid != -1) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save */ // 已有 BGSAVE 在執行,檢查它能否用于當前客戶端的 SYNC 操作 redisClient *slave; listNode *ln; listIter li; // 檢查是否有其他客戶端在等待 SYNC 進行 listRewind(server.slaves,&li); while((ln = listNext(&li))) { slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break; } if (ln) { /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. */ // 找到一個同樣在等到 SYNC 的客戶端 // 設置當前客戶端的狀態,并復制 buffer 。 copyClientOutputBuffer(c,slave); c->replstate = REDIS_REPL_WAIT_BGSAVE_END; redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to * register differences */ // 沒有客戶端在等待 SYNC ,當前客戶端只能等待下次 BGSAVE 進行 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } } else { // 沒有 BGSAVE 在進行,自己啟動一個。 /* Ok we don't have a BGSAVE in progress, let's start one */ redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); addReplyError(c,"Unable to perform background save"); return; } // 等待 BGSAVE 結束 c->replstate = REDIS_REPL_WAIT_BGSAVE_END; } c->repldbfd = -1; c->flags |= REDIS_SLAVE; c->slaveseldb = 0; listAddNodeTail(server.slaves,c); ...... }這里并不是真正處理同步的,而是把slave插入到master中slaves鏈表中等待真正同步的操作。那什么時候才是真正同步的操作呢?請看updateSlavesWaitingBgsave
void updateSlavesWaitingBgsave(int bgsaveerr) { ...... listRewind(server.slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { // 告訴那些這次不能同步的客戶端,可以等待下次 BGSAVE 了。 startbgsave = 1; slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { // 這些是本次可以同步的客戶端 struct redis_stat buf; // 如果 BGSAVE 失敗,釋放 slave 節點 if (bgsaveerr != REDIS_OK) { freeClient(slave); redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); continue; } // 打開 .rdb 文件 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || // 如果打開失敗,釋放并清除 redis_fstat(slave->repldbfd,&buf) == -1) { freeClient(slave); redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); continue; } // 偏移量 slave->repldboff = 0; // 數據庫大小(.rdb 文件的大小) slave->repldbsize = buf.st_size; // 狀態 slave->replstate = REDIS_REPL_SEND_BULK; // 清除 slave->fd 的寫事件 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); // 創建一個將 .rdb 文件內容發送到附屬節點的寫事件 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { freeClient(slave); continue; } } } ...... }這個函數會每個slave創建一個可寫的事件,并從rdb文件中把數據讀出來,通過sendBulkToSlave發送給slave。master發送完后,slave接受數據并進行處理,上面已經看到slave給讀事件設置了handler(readSyncBulkPayload)
以上就是定時器實現主從同步,第二種實現主從同步的情況比較簡單。
每次master接收到客戶端指令都會調用call這個函數:
void call(redisClient *c, int flags) { ...... if (flags & REDIS_CALL_PROPAGATE) { int flags = REDIS_PROPAGATE_NONE; if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) flags |= REDIS_PROPAGATE_REPL; if (dirty) flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF); if (flags != REDIS_PROPAGATE_NONE) propagate(c->cmd,c->db->id,c->argv,c->argc,flags); } ...... }propagate就是實現第二種主從同步。
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves)) replicationFeedSlaves(server.slaves,dbid,argv,argc); }從函數代碼中可以看出,reiplicationFeedSlaves就是真正實現主從同步第二種方式的地方,這個函數也比較簡單,這里就列舉出來啦。
這里要提出一個問題:master服務的命令會同步給slave,但是如果slave服務發生變化,master并不會得到同步,這種情況怎么辦?還是slave只允許讀操作,而不進行寫操作,但是slave服務也可能是別的redis服務的master服務,這樣就感覺不合理了。為什么要讓slave服務又稱為master服務呢?
本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!