推ter開源庫Twemcache分析
Twemcache(發音: “two-em-cache”),是推ter公司使用的內存緩存技術,在2012.7.17號向github提交了推ter定制過的memcached,命名為twemcache;并在上周其官網首次出現了對twemcache的介紹。
與memcache相比,twemcache更輕便,定制性更強,無論從代碼結構還是模塊設計,都是很優雅的。
源代下載網址 git clone https://github.com/推ter/twemcache.git
官方特性:
支持完整memcached的ASCII協議
支持TCP,UDP,UNIX域套接字
可觀測性(stats / klogger)
可替換淘汰策略
簡單調試性(assertion / logging)
Twemcache代碼只有150 000行,其模塊組織結構十分清晰,如下表所示:
主程序 mc.c
線程模型 mc_thread.h / mc_thread.c
內存管理 mc_items.h / mc_items.c / mc_slabs.h / mc_slabs.c
存儲模型 mc_assoc.h / mc_assoc.c
網絡接口 mc_connection.h / mc_connection.c
命令解析 mc_ascii.h / mc_ascii.c
log相關 mc_log.h / mc_log.c / mc_klog.h / mc_klog.c
其它 mc_time.h / mc_queue.h / mc_hash.h / mc_cache.h / mc_util.h / mc_signal.h ……
Twemcache模塊分析
twemcache以服務器形式存在,可以接收來自TCP/UDP/UNIX域套接字的請求,默認的TCP/UDP端口都是11211。關于服務器的模型上,twemcache都使用了libevent庫來處理各種網絡IO事件,同時又使用了多線程來提升性能,異步與多線程結合都是使用的經典的網絡模型,因此網絡通信這一塊很通用,可以作為很好的服務器通信模塊。
線程模型
Twemcache中大量使用了多線程,將任務的分配明晰化,每樣任務由一個線程去完成。Aggregator線程負責采集twemcache的運行狀態,客戶端可以使用stats命令進行查詢,線程在初始化時啟動,包含獨立的事件域ag->base,并注冊了定時器事件,默認間隔100ms,每次定時器觸發,線程去采集運行數據;
Klogger線程負責打印log信息,這些log信息由log_XXX簇函數打印,線程在初始化時啟動,包含獨立事件域k->base,并注冊了定時器事件,默認間隔1000ms,每次定時器觸發,線程收集所有工作線程的log信息,并打印到指定描述符。這里的log打印使用了緩沖策略,每1000ms的信息緩存在線程的buffer中,并不進行打印,最終由klogger進行統一收集并打印。
主進程負責所有的初始化工作,擁有獨立的事件域main_base,注冊了定時器事件和網絡IO事件,其中定時器事件用于提供時間服務,程序中會大量用到當前時間,會導致大量的time()系統調用開銷,定時器事件提供了秒級的精確,避免了time()調用;網絡IO事件主要用于TCP/UNIX域的監聽listen,所有連接的處理都交由工作線程完成。
Worker線程負責連接的處理工作,線程在初始化時啟動,包含獨立事件域t->base,并注冊了管道IO事件和網絡IO事件。管道IO事件用于主進程通知其有連接到來,網絡IO事件用于處理到來的連接。
這里的多線程使用了線程池的概念,但工作的方式不再是用cond_signal,而是用管道IO事件代替,這是為了保證線程內也是非阻塞的,可以并發執行多個任務。這里沒有提及的是hashtable維護線程,它負責hashtable擴容時數據的遷移工作。

網絡模型
網絡通信twemcache將TCP/UDP/UNIX域進行統一,其中TCP和UNIX域套接字流程是一樣的,UDP因為不用建立連接則少了listen+accept的過程,下面以TCP和UDP進行說明。
TCP:與主進程相關聯的事件域是main_base,首先會向main_base中注冊一個listen事件,用于監聽連接的到達。當連接到來后,listen事件觸發并調用accept()接收,并為這個新的cli_fd分配一個新的連接標識conn,此時conn的事件域是main_base,然后選擇一個線程t去處理這個連接,這里主進程與線程間通信是通過管道來實現的,向選定的t->thread_send_fd發送一個字符。而工作線程都會監聽管道IO事件,事件觸發后會注冊連接conn的讀事件并變更conn的事件域為t->base,從而將一個連接交由線程去處理。
這里的模型與傳統模型一樣,主進程監聽,連接到來后交由線程去處理。每到來一個連接,就觸發一個線程的管道IO事件去處理連接,并且這里的線程選擇是用的RoundRobin算法,所有每個線程是均勻分配任務的。

UDP:UDP中只有一個接收fd,初始化時主進程會對每個工作線程tX的管道tX->thread_send_fd寫字符,從而觸發所有工作線程,注冊conn的讀事件[此時conn不代表一個完整的連接,只含有服務端信息],并變更conn的事件域為tX->base,這樣所有線程都監聽fd的報文。
這里看出線程策略上與TCP的不同,TCP是均勻的分配任務給線程,UDP則是啟動所有線程去監聽fd并競爭接收報文,任務的分配并不保證均勻。

存儲模型
為了解決內存malloc/free帶來的性能開銷,twemcache使用了slab來管理內存,具體如下圖所示:

slabclass是一個數組,每個槽對應不同大小的item大小,size表示該槽的item大小;需要slabclass數組的原因是item是變長的,預分配全部大小相同的item會造成大量碎片。從上至下,item的大小依次增加,每個slab是一組item的集合,slab的大小是固定的(slab_size),可以通過參數--slab_size配置;當一個slab用完后,新的分配請求要來,則分配一個新的slab,比較重要的屬性是free_item和nfree_itemq,free_item指向當前槽中第一個空閑的item,nfree_itemq是一個鏈表,所有使用過被刪除了的item會放入其中重復利用。
item是實際存儲的數據單元,因此這里slab管理以item為單位,當需要分配一個item時,并不直接malloc,而是從slabclass中取一個已經分配好的,因為每個槽代表了一種item大小且是有序的(遞增),可以用二分查找到最接近要分配item大小的槽,并從中直接獲取。
同時,slab是由heapinfo來進行管理的,nslab表示當前slab的數目,每分配一個新的slab(malloc了一塊內存),就會順序的插入到slab_table中,即slab_table[nslab++]=slab。在淘汰slab時使用的是LRU算法,heapinfo->slab_lruq維護了這樣一個slab的LRU鏈表,每次被修改過的slab會移到鏈表尾,淘汰時選擇鏈表頭。

item則是由item_lruq[]管理的,item_lruq是一個數組,與slabclass數組一一對應,即item_lruq[id]鏈接了slabclass[id]中被使用的item,當item需要進行淘汰時,采用的同樣是LRU算法。

上面都是內存的管理,item真正存儲的數據結構是hashtable,因為twemcache要實現的是{key, value}的映射。primary_hashtable就是存儲的數據結構,只有插入hashtable的item才算作被使用,需要被鏈到item_lruq中。而同時存在的old_hashtable則是當primary_hashtable需要進行擴容時使用的:當primary_hashtable中item數據超過1.5倍hashtable大小時,進行擴容,此時old_hashtable指向primary_hashtable,在擴容和遷移數據期間所有的item操作轉移到old_hashtable中,同時喚醒assoc_maintenance_thread進行數據遷移工作,將所有old_hashtable中的item遷移到擴容后的primary_hashtable中。

Slab分配策略
slab的分配策略是寫覆蓋,當有新的slab分配請求,不斷分配slab,直到達到上限max_nslab,此時slab不再重新分配,而是從已在使用的slab中找出一個淘汰掉,并作為此次的要用的slab,分配的策略是由slab_get()完成的,決策過程如下:
1. 分配新的slab,若失敗則2
2. 如設置LRU淘汰策略,則淘汰最近未使用的,若失敗則3
3. 如設置RANDOM淘汰策略,則隨機淘汰一個
slab_get()
大體上來說,slab_get_new()是決策1,slab_evict_lru()是決策2,slab_evict_rand()是決策3,經過決策后,如果分配到了新的slab,則重新初始化它,并添加到slab_table和slab_lruq中,這是由slab_add_one()完成的。下面分析下slab_evict_lru()淘汰規則。
slab = slab_get_new();
if (slab == NULL && (settings.evict_opt & EVICT_LS)) {
slab = slab_evict_lru(id);
}
if (slab == NULL && (settings.evict_opt & EVICT_RS)) {
slab = slab_evict_rand();
}
if (slab != NULL) {
stats_slab_settime(id, slab_new_ts, time_now());
slab_add_one(slab, id);
status = MC_OK;
}
slab_evict_lru()
所有使用的slab都會添加到slab_lurq中(即slab_lruq_head()),找到最近未使用的即是查找LRU鏈表中最靠前的slab且其refcount==0,為了避免遍歷鏈表的時間消耗,tries限制了至多遍歷slab_lruq的前SLAB_LRU_MAX_TRIES個元素,如果找到了這樣的slab,則掉用slab_evict_one()將它淘汰,淘汰包括將它從slab_lruq上刪除,其中所有item從item_lruq上刪除,從slabclass相應槽中刪除。
for (tries = SLAB_LRU_MAX_TRIES, slab = slab_lruq_head();
tries > 0 && slab != NULL;
tries--, slab = TAILQ_NEXT(slab, s_tqe)) {
if (slab->refcount == 0) {
break;
}
}
……
slab_evict_one(slab);
Item分配策略
當需要新的item時,會經過一組決策來決定新分配的item取自哪里,這組決策都是在_item_alloc()中完成的,決策過程如下:
1. 查找一個過時的item,如無則2
2. 查找一個slab上空閑的item,如無則3
a. 當前slab上有空閑item
b. 當前slab上沒有空閑item,分配新的slab
3. 淘汰一個item
_item_alloc()
大體來說,item_get_from_lruq()是決策1,slab_get_item()是決策2,item_reuse()是決策3。這里的uit是LRU上最近未使用的一個item,如果設置了EVICT_LRU即LRU淘汰策略的話,則在決策1和2未成功時執行3。除了決策2,其它兩個都是對不會再使用的item的復用,過程是先在item_lruq[id]中查找是否有已超時的,有則返回給it;沒有則試圖從slab上分配一個item,有則返回給it;沒有則試圖復用最近未使用的uit。下面分析item_get_from_lruq()和slab_get_item()。
it = item_get_from_lruq(id); / expired / unexpired lru item /if (it != NULL && item_expired(it)) { stats_slab_incr(id, item_expire); stats_slab_settime(id, item_expire_ts, it->exptime); item_reuse(it); goto done; }
uit = (settings.evict_opt & EVICT_LRU)? it : NULL; / keep if can be used / it = slab_get_item(id); if (it != NULL) { goto done; }
if (uit != NULL) { it = uit; stats_slab_incr(id, item_evict); stats_slab_settime(id, item_evict_ts, time_now());
item_reuse(it); goto done;}</pre>
item_get_from_lruq()
函數從item_lruq中查找已超時的item記錄為it,最近未使用的item記錄為uit。id是根據item的大小所對應的槽id,槽中所有使用的item都會鏈在item_lruq[id]上,遍歷item_lruq[id]上的item,如果it->refcount!=0則表示還在被使用,這樣的item不能復用,直接跳過;對于refcount==0的item,如果找到超時的,則直接返回它,在查找過程中,記錄第一個refcount==0并且未超時的item(即最近未使用),作為決策3淘汰的對象。
tries限制了遍歷的長度不能超過ITEM_LRUQ_MAX_TRIES,這樣節省了大量鏈表遍歷的時間,并且按LRU的性質,越靠近鏈表頭的元素越有可能作為淘汰對象,所有遍歷前ITEM_LRUQ_MAX_TRIES已經覆蓋了大部分情況。for (tries = ITEM_LRUQ_MAX_TRIES, it = TAILQ_FIRST(&item_lruq[id]), uit = NULL; it != NULL && tries > 0; tries--, it = TAILQ_NEXT(it, i_tqe)) {if (it->refcount != 0) { …… continue; } if (item_expired(it)) { return it; } else if (uit == NULL) { uit = it; }}</pre>
slab_get_item() -> _slab_get_item()
slab_get_item_from_freeq()從slabclass[id]中查找是否有空閑的item可用(即p->free_itemq),有則返回it,p->free_itemq上記錄的是使用過后被刪除的item;如果沒有這樣的item,則從p->free_item上取,它記錄還未使用過的item的首地址;如果沒有這樣的item,則表示當前slab已經用滿了,需要分配新的slab,slab_get()使用slab分配策略分配一個新的slab,此時slab中的item都未使用,都記錄到p->free_item中。最后從p->free_item中最得一個item返回就可以了。p = &slabclass[id]; it = slab_get_item_from_freeq(id); if (it != NULL) return it; if (p->free_item == NULL && (slab_get(id) != MC_OK)) { return NULL; } it = p->free_item; if (--p->nfree_item != 0) { p->free_item = (struct item *)(((uint8_t *)p->free_item) + p->size); } else { p->free_item = NULL; }
Hashtable策略
作為核心的存儲結構,twemcache使用的是鏈式哈希表,其主體由mc_assoc.c實現,hashtable初始大小為64K,在需要時進行擴容,在操作上與平時使用的hashtable并無差別,下面僅分析插入時assoc_insert()及擴容時assoc_expand()。
assoc_insert()
assoc_get_bucket()獲取當前需要插入的桶,里面封裝了對hashtable的選擇,在存儲模型里已經說明了primary_hashtable和old_hashtable的不同作用,當hashtable正在擴容時,expanding==1(并且expand_bucket小于hashtable大小),返回old_hashtable;否則返回primary_hashtable。SLIST_INSERT_HEAD將新的item插入到桶中,nhash_item表示hashtable中item的數目,當其達到hashtable大小的1.5倍時,調用assoc_expand()進行擴容。注意這里的插入操作不用去查找是否已有item存在,這里使用的策略是先刪除已存在的item,再插入新的item,所有查找已存在操作會存在于刪除操作中,不會存在于插入操作中。bucket = assoc_get_bucket(item_key(it), it->nkey); SLIST_INSERT_HEAD(bucket, it, h_sle); nhash_item++; if ((expanding == 0) && (nhash_item > (HASHSIZE(hash_power) * 3) / 2)) { assoc_expand(); }
assoc_expand()
函數進行hashtable的擴容,hash_power表示表大小的2次冪,當需要擴容時,hash_power + 1表示擴容一倍,old_hashtable指向primary_hashtable,primary_hashtable則指向新創建的hashtable,最近發送信號量給maintenance線程,這個線程一直等待在maintenance_cond信號量上,它負責將old_hashtable中的”所有”item插入到新的primary_hashtable。
這里要注意的是,在擴容期間,新的item會插入到old_hashtable,這樣不斷有item到來,擴容線程可能永遠也無法將item完全從old_hashtable遷移到primary_hashtable。這里使用了expland_bucket,它標識擴容了多少個桶,當expland_bucket > HASHSIZE(hash_power - 1)時(即超過了擴容前hashtable大小)時,這時新的item不再會插入到old_hashtable,而是插入到primary_hashtable,從而保證數據遷移一定可以在有限時間內完成。uint32_t hashtable_sz = HASHSIZE(hash_power + 1); old_hashtable = primary_hashtable; primary_hashtable = assoc_create_table(hashtable_sz); …… hash_power++; expanding = 1; expand_bucket = 0; pthread_cond_signal(&maintenance_cond);
狀態機
非阻塞自然會與狀態機相關聯,twemcache也使用了狀態機來結合epoll調用,狀態機的核心處理函數是core_drive_machine(),下面的所有狀態遷移入口都是以該函數為入口的,它的大致結構如下,每次循環結束代表一次事件處理完成,在一次事件中可能發生多個狀態遷移。while (!stop) { switch (c->state) { case CONN_LISTEN: …… case CONN_WAIT: …… case CONN_READ: …… case CONN_PARSE: …… case CONN_NEW_CMD: …… case CONN_NREAD: …… case CONN_SWALLOW: …… case CONN_WRITE: …… case CONN_MWRITE: …… case CONN_CLOSE: …… default: …… } }TCP/UNIX域和UDP的流程稍有不同,前者多了客戶端建立連接的過程,它們的流程圖如下所示,圖中用藍色虛線圈住的是一次連接的狀態轉移,在一個循環中,它們擁有相同的連接標識conn。
TCP狀態機
UDP狀態機
Twemcache實例分析
系統初始化core_init() // 初始化 core_loop() // 系統啟動
core_init()
下面是提取的core_init的核心代碼段,main_base是創建的主進程的事件域,assoc_init()初始化核心存儲結構hashtable,item_init()初始化了管理item的item_lruq,slab_init()決定了每個slabclass槽的item大小,并預分配的內存,time_init()則向main_base中注冊了定時器事件clockevent,為系統提供秒級的當前時間,thread_init()分配并啟動了線程模型中描述的各類線程。下面就重要的slab_init()和thread_init()詳細分析下。status = log_init(settings.verbose, settings.log_filename); status = signal_init(); pthread_mutex_init(&accept_lock, NULL); STAILQ_INIT(&listen_connq); main_base = event_base_new(); status = assoc_init(); conn_init(); item_init(); status = slab_init(); stats_init(); status = klog_init(); time_init(); status = thread_init(main_base);
slab_init()
執行兩步操作:slab_slabclass_init() / slab_heapinfo_init()。slab_slabclass_init()
它的作用是設置slabclass每個槽中item的大小,這里的nitem是slabclass中item的個數,item_sz是item的大小,free_itemq鏈接被刪除的item,nfree_item記錄空閑的item個數,free_item指向第一個空閑的item。這里決定item大小很重要的因素是profile => settings.profile,它記錄了每個槽item的大小,在mc_generate_profile()中設置。for (id = SLABCLASS_MIN_ID; id <= slabclass_max_id; id++) { struct slabclass p; / slabclass / uint32_t nitem; / # item per slabclass / size_t item_sz; / item size */nitem = slab_size() / profile[id]; item_sz = profile[id]; p = &slabclass[id]; p->nitem = nitem; p->size = item_sz; p->nfree_itemq = 0; TAILQ_INIT(&p->free_itemq); p->nfree_item = 0; p->free_item = NULL;}</pre>
slab_heapinfo_init()
nslab表示當前分配的slab,max_nslab表示最多能分配的slab,base表示slab內存的基址,如果是預分配策略的話,則一次性全部分配,否則則在每次需要時分配slab;curr表示當前指向的slab,slab_table記錄所有分配使用的slab,slab_lruq鏈接所有分配使用的slab,并在需要時用LRU算法進行淘汰。heapinfo.nslab = 0; heapinfo.max_nslab = settings.maxbytes / settings.slab_size;heapinfo.base = NULL; if (settings.prealloc) { heapinfo.base = mc_alloc(heapinfo.max_nslab settings.slab_size); ...... } heapinfo.curr = heapinfo.base; heapinfo.slab_table = mc_alloc(sizeof(heapinfo.slab_table) heapinfo.max_nslab); .... TAILQ_INIT(&heapinfo.slab_lruq);</pre>
sizeof(*threads)即為每id個線程。</span></p>
thread_init()
分配線程,nworkers代表工作線程的數目,1是主進程,即這里的dispatcher。以后在使用線程時,threads + idthreads = mc_zalloc(sizeof(*threads) * (1 + nworkers)); if (threads == NULL) { return MC_ENOMEM; } dispatcher = &threads[nworkers];對于每個工作線程,建立一個管道,fds[0]用于工作線程接收來自主進程的數據,fds[1]用于主進程向工作線程發送數據(這里的數據只做信號作用),thread_setup()則為每個事件創建一個獨立的事件域t->base,并在t->base中注冊了管道IO事件,監聽fds[0]的讀事件,讀事件觸發則執行thread_libevent_process(),它負責完成由主進程轉來的客戶端連接conn。
for (i = 0; i < nworkers; i++) { int fds[2]; status = pipe(fds); if (status < 0) { log_error("pipe failed: %s", strerror(errno)); return status; } threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; status = thread_setup(&threads[i]); if (status != MC_OK) { return status; } }然后啟動所有工作線程,thread_worker_main執行的操作很簡單 – 開始事件的監聽(event_base_loop)。
for (i = 0; i < nworkers; i++) { status = thread_create(thread_worker_main, &threads[i]); if (status != MC_OK) { return status; } }最后,還會設置和啟動aggregator線程和klogger線程,在線程模型中已有描述,兩個線程都有獨立的事件域,并在其上注冊了定時器事件,前者用于采集狀態數據,后者用于輸出log信息,啟動線程后執行的操作與工作線程一樣 – 開始事件的監聽event_base_dispatch()。
status = thread_setup_aggregator(); status = thread_create(thread_aggregator_main, NULL); status = thread_setup_klogger(); status = thread_create(thread_klogger_main, NULL);
core_loop()
core_create_socket創建服務器的套接字,然后event_base_loop()開始監聽事件,下面詳細分析core_create_socket()。status = core_create_socket(); event_base_loop(main_base, 0);
core_create_socket() -> core_create_inet_socket()
如果是udp端口,則沒有listen()和accept()的過程,thread_dispatch()向每個工作線程的管道寫入字符,觸發工作線程執行連接sd的監聽事件,競爭地讀取客戶端發往sd的請求報文。
如果是tcp端口,conn_set_event()向主進程main_base中注冊sd的監聽事件,當sd有連接到來由主進程經過accept()后再交由指定的線程去處理。
這里的conn代表了一個連接的標識,用完的conn會放入free_connq中,當下次需要conn時就不用重新分配內存了,而會直接從free_connq中復用。if (udp) { int c; for (c = 0; c < settings.num_workers; c++) { status = thread_dispatch(sd, CONN_READ, EV_READ | EV_PERSIST, 1); ...... } } else { conn = conn_get(sd, CONN_LISTEN, EV_READ | EV_PERSIST, 1, 0); ...... STAILQ_INSERT_HEAD(&listen_connq, conn, c_tqe);status = conn_set_event(conn, main_base); ......}</pre>
實例 [TCP連接,客戶端請求”set foo bar”]
CONN_LISTEN -> CONN_NEW_CMD
當客戶端連接到達后,c->sd監聽事件觸發,調用core_event_handler() -> core_accept()接收客戶端連接。
core_accept()
它的核心代碼如下,accept()完成與客戶端的三次握手建立連接,返回socket sd,然后主進程將這個連接sd交由一個工作線程去處理,這是由thread_dispatch()完成的。sd = accept(c->sd, NULL, NULL); ...... status = thread_dispatch(sd, CONN_NEW_CMD, EV_READ | EV_PERSIST, 0);
thread_dispatch()
conn_get()獲取一個連接conn并將它初始化為本次連接的標識,接下來tid是選擇要處理這個連接的線程,選擇的算法是Round Robin,即每次循環遞增一個id號,然后將連接標識c壓入選定的線程t->new_cq中,它存儲了線程要處理的所有連接,最后向t->notify_send_fd寫一個字符,觸發工作線程t的管道IO事件,讓其處理新的連接。此時,連接由主進程由給了工作線程t,表現在事件域發生了變更,接下來c的處理都在工作線程中,直到連接關閉。c = conn_get(sd, state, ev_flags, rsize, udp); ...... tid = (last_thread + 1) % settings.num_workers; t = threads + tid; last_thread = tid;conn_cq_push(&t->new_cq, c); n = write(t->notify_send_fd, "", 1);</pre>
CONN_NEW_CMD -> CONN_WAIT
nreqs是一次事件中,能處理的最大請求數目,避免工作線程被某個連接完全占用,core_reset_cmd_handler()會重新初始化連接相關的數據如req_type, item等,最后設置狀態為CONN_WAIT。--nreqs; if (nreqs >= 0) { core_reset_cmd_handler(c); }
CONN_WAIT -> CONN_READ
更新事件為監聽可讀事件,并設置狀態為CONN_READ,stop是個標志,所有的狀態遷移在一個while(!stop)循環中,只要stop未設為true,則這次狀態遷移還要繼續,只有當stop為true時才代表一次處理完成,重新回到epoll進入監聽狀態。status = core_update(c, EV_READ | EV_PERSIST); if (status != MC_OK) { log_error("update on c %d failed: %s", c->sd, strerror(errno)); conn_set_state(c, CONN_CLOSE); break; } conn_set_state(c, CONN_READ); stop = true;
CONN_READ -> CONN_PARSE
狀態CONN_READ作用是完成客戶端命令讀取。假設是TCP連接,core_read()調用core_read_tcp()完成命令讀取,并根據讀取結果設置連接狀態,讀取完整會設為CONN_PARSE狀態。case CONN_READ: core_read(c); break;
core_read_tcp()
c代表了客戶端連接,讀取數據到c->rbuf中,根據返回值會有三種情況:
1. n<0&&(errno==EGAIN||errno==EWOULDBLOCK) 連接不可讀,返回等待下次讀取
2. 03. n==size 數據占滿了c->rbuf,但仍未讀完,重新分配rbuf大小,并再次讀取數據到c->rbuf中,直到讀取完成。 for (;;) { ...... size = c->rsize - c->rbytes; n = read(c->sd, c->rbuf + c->rbytes, size); if (n > 0) { stats_thread_incr_by(data_read, n); gotdata = READ_DATA_RECEIVED; c->rbytes += n; if (n == size) { continue; } else { break; } } …… if (errno == EAGAIN || errno == EWOULDBLOCK) { log_debug(LOG_VERB, "recv on c %d not ready - eagain", c->sd); break; } }
CONN_PARSE -> CONN_NREAD
狀態CONN_PARSE作用是完成客戶端命令的分析(命令的分析并不包括附帶的數據),調用core_parse()完成[假設客戶端命令是”set foo bar”]。
core_parse() -> asc_parse() -> asc_dispatch() -> asc_process_update()
asc_process_update()
之前的函數對命令進行了解析,假設客戶端命令是”set foo bar”,則收到數據與解析后的結果如圖所示(其中0x20是空格,0x0D 0x0A是回車換行符):
item_alloc()按前面的item分配策略為本次命令分配了一個item – it,并設置了c->ritem指向item的數據(即value),rlbytes表示仍未讀取的命令長部,即數據部分(“bar”),最后設置狀態為CONN_NREAD。
it = item_alloc(key, nkey, flags, time_reltime(exptime), vlen); ...... c->item = it; c->ritem = item_data(it); c->rlbytes = it->nbyte; conn_set_state(c, CONN_NREAD);
CONN_NREAD -> CONN_WRITE
狀態CONN_NREAD完成命令數據部分的分析,這個狀態至少要循環兩次,前面幾次將c->rlbytes(數據部分長度)讀入到c->ritem中,這部分數據可能在CONN_READ時已讀入到c->rbuf中,那么此時c->rbytes > 0,直接從緩沖區取這部分數據就可以了,即第二個if語句段;這部分數據可能還沒有讀取,那么調用read()從c->sd中讀取。讀取的數據放到c->ritem即數據區,并更新c->rlbytes,它表示數據部分還有多少字節未讀取,當讀取完后最后一次進入循環,c->rlbytes == 0,此時調用core_complete_nread()完成數據部分的存儲,下面分析這個函數。if (c->rlbytes == 0) { core_complete_nread(c); break; }if (c->rbytes > 0) { int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes; if (c->ritem != c->rcurr) { memmove(c->ritem, c->rcurr, tocopy); } c->ritem += tocopy; c->rlbytes -= tocopy; c->rcurr += tocopy; c->rbytes -= tocopy; if (c->rlbytes == 0) { break; } }
n = read(c->sd, c->ritem, c->rlbytes); if (n > 0) { stats_thread_incr_by(data_read, n); if (c->rcurr == c->ritem) { c->rcurr += n; } c->ritem += n; c->rlbytes -= n; break; }</pre>
core_complete_nread() -> asc_complete_nread()
item_store()將讀取的數據部分(即value)存入相應的item,根據返回值,成功存入則返回STORED,執行asc_write_stored()將回送信息”STORED”寫入連接c的發送緩沖區c->wbuf,并設置狀態為CONN_WRITE,c->write_and_go = CONN_NEW_CMD,這個變量指示了CONN_WRITE狀態后要遷移到哪個狀態。至此,這次請求對item的使用已經完成了,調用item_remove()減小計數,因為item還鏈在item_lruq上,所以不并實際刪除,然后c->item = NULL,表示此次請求不再使用item。下面詳細分析item的value存儲函數item_store()。ret = item_store(it, c->req_type, c); switch (ret) { case STORED: asc_write_stored(c); break; ...... } ...... item_remove(c->item); c->item = NULL;
item_store() -> _item_store()
在存儲模型中已經描述,item最終存儲的數據結構是hashtable,_item_get()從hashtable中以鍵值key(即foo)查找相應的item。
如果沒有找到item,則是首次插入,調用_item_link()將它插入到hashtable中;
如果找到item,則調用_item_replace()替代之前的item。key = item_key(it); nit = NULL; oit = _item_get(key, it->nkey); …… if (result == NOT_STORED && store_it) { if (oit != NULL) { _item_replace(oit, it); } else { _item_link(it); } result = STORED; }此例中是第一次set foo,_item_get()會返回NULL,最終調用_item_link(),這個函數很簡單,更改它的flags |= ITEM_LINKED表示被鏈接,assoc_insert()將這個item插入到hashtable中,item_link_q()將item鏈到item_lruq上。
it->flags |= ITEM_LINKED; item_set_cas(it, item_next_cas()); assoc_insert(it); item_link_q(it);
CONN_WRITE -> CONN_NEW_CMD
CONN_WRITE狀態完成客戶端的應答,應答內容在CONN_NREAD狀態下已經寫入到c->wbuf中了,首先調用conn_add_iov()將c->wbuf中的內容組裝成msgbuf的形式。if (c->iov_used == 0 || (c->udp && c->iov_used == 1)) { status = conn_add_iov(c, c->wcurr, c->wbytes); ...... }然后由core_transmit()完成內容的發送,發送成功會返回TRANSMIT_COMPLETE(至少需要兩次循環,同CONN_NREAD),因為此時c->state為CONN_WRITE,變遷狀態至c->write_and_go(即CONN_NEW_CMD),從而完成了這一次請求。當然,core_transmit()也會失敗,最大可能是因為socket當時并不可寫,寫socket的時機并不是由epoll的寫事件觸發的,這種情況下會返回TRANSMIT_SOFT_ERR,它置stop=true,表示此次事件處理完成,等待socket的可寫事件到達。下面分析core_transmit()函數。
switch (core_transmit(c)) { case TRANSMIT_COMPLETE: if (c->state == CONN_MWRITE) { ...... conn_set_state(c, CONN_NEW_CMD); } else if (c->state == CONN_WRITE) { if (c->write_and_free) { mc_free(c->write_and_free); c->write_and_free = 0; } conn_set_state(c, c->write_and_go); } else { log_debug(LOG_INFO, "unexpected state %d", c->state); conn_set_state(c, CONN_CLOSE); } break; case TRANSMIT_INCOMPLETE: case TRANSMIT_HARD_ERROR: break; case TRANSMIT_SOFT_ERROR: stop = true; break; }
core_transmit()
msg_curr和msg_used對比表示是否還有數據需要發送,沒有時返回TRANSMIT_COMPLETE;仍有數據則調用sendmsg()進行發送,res > 0表示發送成功,此時返回TRANSMIT_INCOMPLETE,這和CONN_NREAD狀態下讀取數據的做法是一樣的,至少需要兩次core_transmit,在發送完后最后一次進入會返回TRANSMIT_COMPLETE;res == -1及errno判斷表示c->sd此時并不可寫,我們是在讀事件觸發后直接寫socket,不可寫則core_update()更新c->sd上的監聽事件為寫事件,并返回TRANSMIT_SOFT_ERROR,它會導致此次事件處理結束,結果就是等待寫事件的到來。if (c->msg_curr < c->msg_used) { …… res = sendmsg(c->sd, m, 0); if (res > 0) { ...... return TRANSMIT_INCOMPLETE; } if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { status = core_update(c, EV_WRITE | EV_PERSIST); if (status != MC_OK) { log_error("update on c %d failed: %s", c->sd, strerror(errno)); conn_set_state(c, CONN_CLOSE); return TRANSMIT_HARD_ERROR; } return TRANSMIT_SOFT_ERROR; } } else { return TRANSMIT_COMPLETE; }
CONN_NEW_CMD
這個狀態代表了該連接上可以接受下一個請求了,即一次客戶端請求結束。由上面的分析可見,所有的狀態轉移都是在core_dirve_machine()函數中完成的,并且并不是每個狀態對應一個事件,twemcache對狀態的劃分是按功能來的,比如在讀事件中就會完成讀數據、分析數據兩個功能,下面的圖表示了各狀態執行時所處的事件:
轉自:http://blog.csdn.net/qy532846454/article/details/7899780



