redis rdb 的常用接口了
這是比較大的一塊。顧名思義,就是rdb的常用接口了。
接口如下:
// Commonly used RDB interfaces.
// Author's note: RDB makes no distinction between little endian and big endian (except for the trailing checksum), so an RDB file may not be portable across different systems.
// The type tag occupies a single byte.
int rdbSaveType(rio* rdb, unsigned char type);
int rdbLoadType(rio* rdb);
// Author's note: this one is only declared, never defined; it can be ignored.
int rdbSaveTime(rio* rdb, time_t t);
time_t rdbLoadTime(rio* rdb);
int rdbSaveLen(rio* rdb, uint32_t len);
uint32_t rdbLoadLen(rio* rdb, int* isencoded);
int rdbSaveObjectType(rio* rdb, robj* o);
int rdbLoadObjectType(rio* rdb);
int rdbLoad(char* filename);
int rdbSaveBackground(char* filename);
void rdbRemoveTempFile(pid_t childpid);
int rdbSave(char* filename);
int rdbSaveObject(rio* rdb, robj* o);
off_t rdbSavedObjectLen(robj* o);
off_t rdbSavedObjectPages(robj* o);
robj* rdbLoadObject(int type, rio* rdb);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio* rdb, robj* key, robj* val, long long expiretime, long long now);
robj* rdbLoadStringObject(rio* rdb);
redis做了很多減少內存使用的工作,其中最常使用的就是對長度編碼:長度域最高兩比特(MSB)為00表示長度值由后面的6比特表示,01表示14bit,10表示32bit,11表示后面的6bit為特殊編碼類型而不是長度。
目前看到rdbLoad,基本還是比較清晰的,除了一部分是在redis.h中的不太清楚之外。
所謂robj是這個樣子的:
/*
 * Generic value object ("robj").
 *
 * The four leading bit-fields take 4 + 2 + 4 + 22 = 32 bits in total,
 * followed by a reference counter and an untyped payload pointer.
 */
typedef struct redisObject {
    unsigned int type : 4;     /* object type tag */
    unsigned int notused : 2;  /* spare bits (unused, per the field name) */
    unsigned int encoding : 4; /* encoding tag; one type may have several encodings */
    unsigned int lru : 22;     /* original note: "server.lrulock" (presumably server.lruclock -- confirm) */
    int refcount;              /* reference count */
    void *ptr;                 /* untyped pointer to the payload */
} robj;
一個type可以有多種encoding,如string可以用整數編碼(如string代表的是整數)或raw。refcount是引用計數,共享對象(0-10000的整數字符串引用計數可能大于1),變為0時這個對象就要回收了。直接貼rdbLoad代碼,幾乎沒什么特別的:
int rdbLoad(char* filename) {
uint32_t dbid;
int type, rdbver;
redisDb* db = server.db + 0;
char buf[1024];
long long expiretime, now = mstime();
long loops = 0;
FILE* fp;
rio rdb;
if ((fp = fopen(filename, "r")) == NULL) return REDIS_ERR;
rioInitWithFile(&rdb, fp);
// 如果打開了checksum選項,rio就需要記錄crc64.
if (server.rdb_checksum) {
rdb.update_cksum = rioGenericChecksum;
}
if (rioRead(&rdb, buf, 9) == 0) goto eoferr;
buf[9] = '\0';
if (memcmp(buf, "REDIS", 5) != 0) {
fclose(fp);
redisLog(REDIS_WARNING, "Wrong signature trying to load DB from file");
errno = EINVAL;
return REDIS_ERR;
}
rdbver = atoi(buf + 5);
if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
fclose(fp);
redisLog(REDIS_WARNING, "Can't handle RDB format version %d", rdbver);
errno = EINVAL;
return REDIS_ERR;
}
startLoading(fp);
while (1) {
robj* key, *val;
expiretime = -1;
// serve the clients from time to time.
if (!(loops++ % 1000)) {
loadingProgress(rioTell(&rdb));
aeProcessEvents(server.el, AE_FILE_EVENTS | AE_DONT_WAIT);
}
// read type.
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
if (type == REDIS_RDB_OPCODE_EXIPRETIME) {
if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr; // 32位
// we read the time so we need to read the object type again.
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
expiretime *= 1000;
} else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
if ((exiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr; // 64位
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
}
if (type == REDIS_RDB_OPCODE_EOF) {
break;
}
if (type == REDIS_RDB_OPCODE_SELECTDB) {
if ((dbid = rdbLoadLen(&rdb, NULL)) == REDIS_RDB_LENERR) {
goto eoferr;
}
if (dbid >= (unsigned)server.dbnum) {
redisLog(REDIS_WARNING, "FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
exit(1);
}
db = server.db + dbid;
continue;
}
// Read key
// 雖說是加載字符串,但也可能以整數編碼
if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
if ((val = rdbLoadObject(type, &rdb)) == NULL) goto eoferr;
// Check if the key already exipired. This function is used when loading an RDB file from disk, either at startup, or when an RDB was received from the master. In the latter case, the master is responsible for key expiry. If we could expire keys here, the snapshot taken by the master may not be reflected on the slave.
if (server.masterhost == NULL && expiretime != -1 && expirtetime < now) {
decrRefCount(key);
decrRefCount(val);
continue;
}
dbAdd(db, key, val);
if (expiretime != -1) setExpire(db, key, expiretime);
decrRefCount(key);
}
// Verify the checksum if RDB version is >= 5
if (rdbver >= 5 && server.rdb_checksum) {
uint64_t cksum, expected = rdb.cksum;
if (rioRead(&rdb, &cksum, 8) == 0) goto eoferr;
memrevifbe(&cksum);
if (cksum == 0) {
redisLog(REDIS_WARNING, "RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
redisLog(REDIS_WARNING, "Wrong RDB checkusm. Aborting now.");
exit(1);
}
}
fclose(fp);
stopLoading();
return REDIS_OK;
eoferr:
redisLog(REDIS_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now.");
exit(1);
return REDIS_ERR;
}
其中大頭是rdbLoadObject,對各種對象進行加載,可以看到加載時一般優先使用ziplist,intset這樣省內存的結構,只有元素數量足夠大或者其中一個元素較大時才使用常規的list,dict結構。set也是dict結構,只不過其中的value指向NULL。還有一些結構如zipmap等好像也是list,dict等的衍生(zipmap好像是以ziplist形式保存key,value),也有的結構涉及到跳躍表,要等看redis.h和redis.c時才可以完全弄清楚。
可以看到,這里面字符串有兩種加載方式:
// rdb沒有little endian和big endian的區別(除了最后的checksum),所以rdb文件在不同系統中可能不能移植 // type只占一個字節 int rdbSaveType(rio* rdb, unsigned char type); int rdbLoadType(rio* rdb); // 這個只有聲明,沒有定義,不用去管 int rdbSaveTime(rio* rdb, time_t t); time_t rdbLoadTime(rio* rdb); int rdbSaveLen(rio* rdb, uint32_t len); uint32_t rdbLoadLen(rio* rdb, int* isencoded); int rdbSaveObjectType(rio* rdb, robj* o); int rdbLoadObjectType(rio* rdb); int rdbLoad(char* filename); int rdbSaveBackground(char* filename); void rdbRemoveTempFile(pid_t childpid); int rdbSave(char* filename); int rdbSaveObject(rio* rdb, robj* o); off_t rdbSavedObjectLen(robj* o); off_t rdbSavedObjectPages(robj* o); robj* rdbLoadObject(int type, rio* rdb); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio* rdb, robj* key, robj* val, long long expiretime, long long now); robj* rdbLoadStringObject(rio* rdb); redis做了很多減少內存使用的工作,其中最常使用的就是對長度編碼,長度域最高兩比特(MSB)00表示長度值為后面的6比特表示,01表示14bit,10表示32bit,11表示后面的6bit為而不是長度。 目前看到rdbLoad,基本還是比較清晰的,除了一部分是在redis.h中的不太清楚之外。 所謂robj是這個樣子的: typedef struct redisObject { unsigned type:4; unsigned notused:2; unsigned encoding:4; unsigned lru:22; // server.lrulock int refcount; void* ptr; } robj; 一個type可以有多種encoding,如string可以用整數編碼(如string代表的是整數)或raw。refcount是引用計數,共 享對象(0-10000的整數字符串引用計數可能大于1),變為0時這個對象就要回收了。直接貼rdbLoad代碼,幾乎沒什么特別的: int rdbLoad(char* filename) { uint32_t dbid; int type, rdbver; redisDb* db = server.db + 0; char buf[1024]; long long expiretime, now = mstime(); long loops = 0; FILE* fp; rio rdb; if ((fp = fopen(filename, "r")) == NULL) return REDIS_ERR; rioInitWithFile(&rdb, fp); // 如果打開了checksum選項,rio就需要記錄crc64. 
if (server.rdb_checksum) { rdb.update_cksum = rioGenericChecksum; } if (rioRead(&rdb, buf, 9) == 0) goto eoferr; buf[9] = '\0'; if (memcmp(buf, "REDIS", 5) != 0) { fclose(fp); redisLog(REDIS_WARNING, "Wrong signature trying to load DB from file"); errno = EINVAL; return REDIS_ERR; } rdbver = atoi(buf + 5); if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) { fclose(fp); redisLog(REDIS_WARNING, "Can't handle RDB format version %d", rdbver); errno = EINVAL; return REDIS_ERR; } startLoading(fp); while (1) { robj* key, *val; expiretime = -1; // serve the clients from time to time. if (!(loops++ % 1000)) { loadingProgress(rioTell(&rdb)); aeProcessEvents(server.el, AE_FILE_EVENTS | AE_DONT_WAIT); } // read type. if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; if (type == REDIS_RDB_OPCODE_EXIPRETIME) { if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr; // 32位 // we read the time so we need to read the object type again. if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; expiretime *= 1000; } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) { if ((exiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr; // 64位 if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; } if (type == REDIS_RDB_OPCODE_EOF) { break; } if (type == REDIS_RDB_OPCODE_SELECTDB) { if ((dbid = rdbLoadLen(&rdb, NULL)) == REDIS_RDB_LENERR) { goto eoferr; } if (dbid >= (unsigned)server.dbnum) { redisLog(REDIS_WARNING, "FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum); exit(1); } db = server.db + dbid; continue; } // Read key // 雖說是加載字符串,但也可能以整數編碼 if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr; if ((val = rdbLoadObject(type, &rdb)) == NULL) goto eoferr; // Check if the key already exipired. This function is used when loading an RDB file from disk, either at startup, or when an RDB was received from the master. In the latter case, the master is responsible for key expiry. 
If we could expire keys here, the snapshot taken by the master may not be reflected on the slave. if (server.masterhost == NULL && expiretime != -1 && expirtetime < now) { decrRefCount(key); decrRefCount(val); continue; } dbAdd(db, key, val); if (expiretime != -1) setExpire(db, key, expiretime); decrRefCount(key); } // Verify the checksum if RDB version is >= 5 if (rdbver >= 5 && server.rdb_checksum) { uint64_t cksum, expected = rdb.cksum; if (rioRead(&rdb, &cksum, 8) == 0) goto eoferr; memrevifbe(&cksum); if (cksum == 0) { redisLog(REDIS_WARNING, "RDB file was saved with checksum disabled: no check performed."); } else if (cksum != expected) { redisLog(REDIS_WARNING, "Wrong RDB checkusm. Aborting now."); exit(1); } } fclose(fp); stopLoading(); return REDIS_OK; eoferr: redisLog(REDIS_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now."); exit(1); return REDIS_ERR; }
rdbLoadEncodedStringObject:如果字符串是數字,就使用數字編碼,并允許使用共享integer編碼對象。
rdbLoadStringObject:即使字符串是整數,也仍然要使用string編碼。對于db中的key是這樣的(其實如果沒有maxmemory和lru的設定,也大可使用rdbLoadEncodedStringObject,這應該是作者沒優化的地方)。
介于中間的是,rdbLoadStringObject之后調用tryObjectEncoding,這個方法會盡量使用integer編碼,還檢查是否有maxmemory和lru設定,沒有的話可以使用共享integer編碼對象。
rdbSave保存rdb文件,可以看出它是先寫到一個臨時文件,寫成功之后再rename到指定的文件名。
rdbSaveBackground(即bgsave)非阻塞地做保存工作,它是fork出一個子進程做這個工作(按照一般的思路,可能就要用線程之類的),這種做法簡潔,避免了多線程內存共享的很多問題。
void backgroundSaveDoneHandler(int exitcode, int bysignal)是bgsave完成之后調用的函數,它會調用replication.c中的updateSlavesWaitingBgsave,需要到時留意其功能。
代碼看到現在,發現現在幾個文件是糾結在一起的,似乎共用一個redis.h。