Redis 中的事件循環
在目前的很多服務中,由于需要持續接受客戶端或者用戶的輸入,所以需要一個事件循環來等待并處理外部事件,這篇文章主要會介紹 Redis 中的事件循環是如何處理事件的。
在文章中,我們會先從 Redis 的實現中分析事件是如何被處理的,然后用更具象化的方式了解服務中的不同模塊是如何交流的。
aeEventLoop
在分析具體代碼之前,先了解一下在事件處理中處于核心部分的 aeEventLoop 到底是什么:
reids-eventloop
aeEventLoop 在 Redis 就是負責保存待處理文件事件和時間事件的結構體,其中保存大量事件執行的上下文信息,同時持有三個事件數組:
- aeFileEvent
- aeTimeEvent
- aeFiredEvent
aeFileEvent 和 aeTimeEvent 中會存儲監聽的文件事件和時間事件,而最后的 aeFiredEvent 用于存儲待處理的文件事件,我們會在后面的章節中介紹它們是如何工作的。
Redis 服務中的 EventLoop
在 redis-server 啟動時,首先會初始化一些 redis 服務的配置,最后會調用 aeMain 函數陷入 aeEventLoop 循環中,等待外部事件的發生:
int main(int argc, char **argv) {
...
aeMain(server.el);
}
aeMain 函數其實就是一個封裝的 while 循環,循環中的代碼會一直運行直到 eventLoop 的 stop 被設置為 true :
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
它會不停嘗試調用 aeProcessEvents 對可能存在的多種事件進行處理,而 aeProcessEvents 就是實際用于處理事件的函數:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int processed = 0, numevents;
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
struct timeval *tvp;
#1:計算 I/O 多路復用的等待時間 tvp
numevents = aeApiPoll(eventLoop, tvp);
for (int j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop);
return processed;
}
上面的代碼省略了 I/O 多路復用函數的等待時間,不過不會影響我們對代碼的理解,整個方法大體由兩部分代碼組成,一部分處理文件事件,另一部分處理時間事件。
Redis 中會處理兩種事件:時間事件和文件事件。
文件事件
在一般情況下, aeProcessEvents 都會先 計算最近的時間事件發生所需要等待的時間 ,然后調用 aeApiPoll 方法在這段時間中等待事件的發生,在這段時間中如果發生了文件事件,就會優先處理文件事件,否則就會一直等待,直到最近的時間事件需要觸發:
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
文件事件如果綁定了對應的讀/寫事件,就會執行對應的對應的代碼,并傳入事件循環、文件描述符、數據以及掩碼:
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
其中 rfileProc 和 wfileProc 就是在文件事件被創建時傳入的函數指針:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
需要注意的是,傳入的 proc 函數會在對應的 mask 位事件發生時執行。
時間事件
在 Redis 中會發生兩種時間事件:
- 一種是定時事件,每隔一段時間會執行一次;
- 另一種是非定時事件,只會在某個時間點執行一次;
時間事件的處理在 processTimeEvents 中進行,我們會分三部分分析這個方法的實現:
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te, *prev;
long long maxId;
time_t now = time(NULL);
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
由于對系統時間的調整會影響當前時間的獲取,進而影響時間事件的執行;如果系統時間先被設置到了未來的時間,又設置成正確的值,這就會導致 時間事件會隨機延遲一段時間執行 ,也就是說,時間事件不會按照預期的安排盡早執行,而 eventLoop 中的 lastTime 就是用于檢測上述情況的變量:
typedef struct aeEventLoop {
...
time_t lastTime; /* Used to detect system clock skew */
...
} aeEventLoop;
如果發現了系統時間被改變(小于上次 processTimeEvents 函數執行的開始時間),就會強制所有時間事件盡早執行。
prev = NULL;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (prev == NULL)
eventLoop->timeEventHead = te->next;
else
prev->next = te->next;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
Redis 處理時間事件時,不會在當前循環中直接移除不再需要執行的事件,而是會在當前循環中將時間事件的 id 設置為 AE_DELETED_EVENT_ID ,然后再下一個循環中刪除,并執行綁定的 finalizerProc 。
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
prev = te;
te = te->next;
}
return processed;
}
在移除不需要執行的時間事件之后,我們就開始通過比較時間來判斷是否需要調用 timeProc 函數, timeProc 函數的返回值 retval 為時間事件執行的時間間隔:
- retval == AE_NOMORE :將時間事件的 id 設置為 AE_DELETED_EVENT_ID ,等待下次 aeProcessEvents 執行時將事件清除;
- retval != AE_NOMORE :修改當前時間事件的執行時間并重復利用當前的時間事件;
以使用 aeCreateTimeEvent 一個創建的簡單時間事件為例:
aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL)
時間事件對應的函數 showThroughput 在每次執行時會返回一個數字,也就是該事件發生的時間間隔:
int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
float dt = (float)(mstime()-config.start)/1000.0;
float rps = (float)config.requests_finished/dt;
printf("%s: %.2f\r", config.title, rps);
fflush(stdout);
return 250; /* every 250ms */
}
這樣就不需要重新 malloc 一塊相同大小的內存,提高了時間事件處理的性能,并減少了內存的使用量。
我們對 Redis 中對時間事件的處理以流程圖的形式簡單總結一下:
process-time-event
創建時間事件的方法實現其實非常簡單,在這里不想過多分析這個方法,唯一需要注意的就是時間事件的 id 跟數據庫中的大多數主鍵都是遞增的:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc) {
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->next = eventLoop->timeEventHead;
eventLoop->timeEventHead = te;
return id;
}
事件的處理
上一章節我們已經從代碼的角度對 Redis 中事件的處理有一定的了解,在這里,我想從更高的角度來觀察 Redis 對于事件的處理是怎么進行的。
整個 Redis 服務在啟動之后會陷入一個巨大的 while 循環,不停地執行 processEvents 方法處理文件事件 fe 和時間事件 te 。
有關 Redis 中的 I/O 多路復用模塊可以看這篇文章 Redis 和 I/O 多路復用 。
當文件事件觸發時會被標記為 “紅色” 交由 processEvents 方法處理,而時間事件的處理都會交給 processTimeEvents 這一子方法:
redis-eventloop-proces-event
在每個事件循環中 Redis 都會先處理文件事件,然后再處理時間事件直到整個循環停止, processEvents 和 processTimeEvents 作為 Redis 中發生事件的消費者,每次都會從“事件池”中拉去待處理的事件進行消費。
文件事件的處理
由于文件事件觸發條件較多,并且 OS 底層實現差異性較大,底層的 I/O 多路復用模塊使用了 eventLoop->aeFiredEvent 保存對應的文件描述符以及事件,將信息傳遞給上層進行處理,并抹平了底層實現的差異。
整個 I/O 多路復用模塊在事件循環看來就是一個輸入事件、輸出 aeFiredEvent 數組的一個黑箱:
eventloop-file-event-in-redis
在這個黑箱中,我們使用 aeCreateFileEvent 、 aeDeleteFileEvent 來添加刪除需要監聽的文件描述符以及事件。
在對應事件發生時,當前單元格會“變色”表示發生了可讀(黃色)或可寫(綠色)事件,調用 aeApiPoll 時會把對應的文件描述符和事件放入 aeFiredEvent 數組,并在 processEvents 方法中執行事件對應的回調。
時間事件的處理
時間事件的處理相比文件事件就容易多了,每次 processTimeEvents 方法調用時都會對整個 timeEventHead 數組進行遍歷:
process-time-events-in-redis
遍歷的過程中會將時間的觸發時間與當前時間比較,然后執行時間對應的 timeProc ,并根據 timeProc 的返回值修改當前事件的參數,并在下一個循環的遍歷中移除不再執行的時間事件。
總結
筆者對于文章中兩個模塊的展示順序考慮了比較久的時間,最后還是覺得,目前這樣的順序更易于理解。
Redis 對于事件的處理方式十分精巧,通過傳入函數指針以及返回值的方式,將時間事件移除的控制權交給了需要執行的處理器 timeProc ,在 processTimeEvents 設置 aeApiPoll 超時時間也十分巧妙,充分地利用了每一次事件循環,防止過多的無用的空轉,并且保證了該方法不會阻塞太長時間。
事件循環的機制并不能時間事件準確地在某一個時間點一定執行,往往會比實際約定處理的時間稍微晚一些。
Reference
來自:http://www.jianshu.com/p/85e540e54525