推ter開源庫Twemcache分析

openkk 13年前發布 | 23K 次閱讀 緩存服務器 twemcache

      Twemcache(發音: “two-em-cache”),是推ter公司使用的內存緩存技術,在2012.7.17號向github提交了推ter定制過的memcached,命名為twemcache;并在上周其官網首次出現了對twemcache的介紹。
      與memcache相比,twemcache更輕便,定制性更強,無論從代碼結構還是模塊設計,都是很優雅的。
      源代下載網址 git clone
https://github.com/推ter/twemcache.git
      官方特性:
            支持完整memcached的ASCII協議
            支持TCP,UDP,UNIX域套接字
            可觀測性(stats / klogger)
            可替換淘汰策略
            簡單調試性(assertion / logging)

      Twemcache代碼只有150 000行,其模塊組織結構十分清晰,如下表所示:
            主程序    mc.c
            線程模型 mc_thread.h / mc_thread.c
            內存管理 mc_items.h / mc_items.c / mc_slabs.h / mc_slabs.c
            存儲模型 mc_assoc.h / mc_assoc.c
            網絡接口 mc_connection.h / mc_connection.c
            命令解析 mc_ascii.h / mc_ascii.c
            log相關   mc_log.h / mc_log.c / mc_klog.h / mc_klog.c
            其它        mc_time.h / mc_queue.h / mc_hash.h / mc_cache.h / mc_util.h / mc_signal.h ……
 
Twemcache模塊分析
      twemcache以服務器形式存在,可以接收來自TCP/UDP/UNIX域套接字的請求,默認的TCP/UDP端口都是11211。關于服務器的模型上,twemcache都使用了libevent庫來處理各種網絡IO事件,同時又使用了多線程來提升性能,異步與多線程結合都是使用的經典的網絡模型,因此網絡通信這一塊很通用,可以作為很好的服務器通信模塊。

線程模型
      Twemcache中大量使用了多線程,將任務的分配明晰化,每樣任務由一個線程去完成。Aggregator線程負責采集twemcache的運行狀態,客戶端可以使用stats命令進行查詢,線程在初始化時啟動,包含獨立的事件域ag->base,并注冊了定時器事件,默認間隔100ms,每次定時器觸發,線程去采集運行數據;
      Klogger線程負責打印log信息,這些log信息由log_XXX簇函數打印,線程在初始化時啟動,包含獨立事件域k->base,并注冊了定時器事件,默認間隔1000ms,每次定時器觸發,線程收集所有工作線程的log信息,并打印到指定描述符。這里的log打印使用了緩沖策略,每1000ms的信息緩存在線程的buffer中,并不進行打印,最終由klogger進行統一收集并打印。
      主進程負責所有的初始化工作,擁有獨立的事件域main_base,注冊了定時器事件和網絡IO事件,其中定時器事件用于提供時間服務,程序中會大量用到當前時間,會導致大量的time()系統調用開銷,定時器事件提供了秒級的精確,避免了time()調用;網絡IO事件主要用于TCP/UNIX域的監聽listen,所有連接的處理都交由工作線程完成。
      Worker線程負責連接的處理工作,線程在初始化時啟動,包含獨立事件域t->base,并注冊了管道IO事件和網絡IO事件。管道IO事件用于主進程通知其有連接到來,網絡IO事件用于處理到來的連接。
      這里的多線程使用了線程池的概念,但工作的方式不再是用cond_signal,而是用管道IO事件代替,這是為了保證線程內也是非阻塞的,可以并發執行多個任務。這里沒有提及的是hashtable維護線程,它負責hashtable擴容時數據的遷移工作。

推ter開源庫Twemcache分析

網絡模型
      網絡通信twemcache將TCP/UDP/UNIX域進行統一,其中TCP和UNIX域套接字流程是一樣的,UDP因為不用建立連接則少了listen+accept的過程,下面以TCP和UDP進行說明。
      TCP:與主進程相關聯的事件域是main_base,首先會向main_base中注冊一個listen事件,用于監聽連接的到達。當連接到來后,listen事件觸發并調用accept()接收,并為這個新的cli_fd分配一個新的連接標識conn,此時conn的事件域是main_base,然后選擇一個線程t去處理這個連接,這里主進程與線程間通信是通過管道來實現的,向選定的t->thread_send_fd發送一個字符。而工作線程都會監聽管道IO事件,事件觸發后會注冊連接conn的讀事件并變更conn的事件域為t->base,從而將一個連接交由線程去處理。
      這里的模型與傳統模型一樣,主進程監聽,連接到來后交由線程去處理。每到來一個連接,就觸發一個線程的管道IO事件去處理連接,并且這里的線程選擇是用的RoundRobin算法,所有每個線程是均勻分配任務的。

推ter開源庫Twemcache分析

      UDP:UDP中只有一個接收fd,初始化時主進程會對每個工作線程tX的管道tX->thread_send_fd寫字符,從而觸發所有工作線程,注冊conn的讀事件[此時conn不代表一個完整的連接,只含有服務端信息],并變更conn的事件域為tX->base,這樣所有線程都監聽fd的報文。
      這里看出線程策略上與TCP的不同,TCP是均勻的分配任務給線程,UDP則是啟動所有線程去監聽fd并競爭接收報文,任務的分配并不保證均勻。

推ter開源庫Twemcache分析

存儲模型
      為了解決內存malloc/free帶來的性能開銷,twemcache使用了slab來管理內存,具體如下圖所示:

推ter開源庫Twemcache分析

       slabclass是一個數組,每個槽對應不同大小的item大小,size表示該槽的item大小;需要slabclass數組的原因是item是變長的,預分配全部大小相同的item會造成大量碎片。從上至下,item的大小依次增加,每個slab是一組item的集合,slab的大小是固定的(slab_size),可以通過參數--slab_size配置;當一個slab用完后,新的分配請求要來,則分配一個新的slab,比較重要的屬性是free_item和nfree_itemq,free_item指向當前槽中第一個空閑的item,nfree_itemq是一個鏈表,所有使用過被刪除了的item會放入其中重復利用。
      item是實際存儲的數據單元,因此這里slab管理以item為單位,當需要分配一個item時,并不直接malloc,而是從slabclass中取一個已經分配好的,因為每個槽代表了一種item大小且是有序的(遞增),可以用二分查找到最接近要分配item大小的槽,并從中直接獲取。
      同時,slab是由heapinfo來進行管理的,nslab表示當前slab的數目,每分配一個新的slab(malloc了一塊內存),就會順序的插入到slab_table中,即slab_table[nslab++]=slab。在淘汰slab時使用的是LRU算法,heapinfo->slab_lruq維護了這樣一個slab的LRU鏈表,每次被修改過的slab會移到鏈表尾,淘汰時選擇鏈表頭。

推ter開源庫Twemcache分析

      item則是由item_lruq[]管理的,item_lruq是一個數組,與slabclass數組一一對應,即item_lruq[id]鏈接了slabclass[id]中被使用的item,當item需要進行淘汰時,采用的同樣是LRU算法。

推ter開源庫Twemcache分析

      上面都是內存的管理,item真正存儲的數據結構是hashtable,因為twemcache要實現的是{key, value}的映射。primary_hashtable就是存儲的數據結構,只有插入hashtable的item才算作被使用,需要被鏈到item_lruq中。而同時存在的old_hashtable則是當primary_hashtable需要進行擴容時使用的:當primary_hashtable中item數據超過1.5倍hashtable大小時,進行擴容,此時old_hashtable指向primary_hashtable,在擴容和遷移數據期間所有的item操作轉移到old_hashtable中,同時喚醒assoc_maintenance_thread進行數據遷移工作,將所有old_hashtable中的item遷移到擴容后的primary_hashtable中。

推ter開源庫Twemcache分析

Slab分配策略
      slab的分配策略是寫覆蓋,當有新的slab分配請求,不斷分配slab,直到達到上限max_nslab,此時slab不再重新分配,而是從已在使用的slab中找出一個淘汰掉,并作為此次的要用的slab,分配的策略是由slab_get()完成的,決策過程如下:
      1. 分配新的slab,若失敗則2
      2. 如設置LRU淘汰策略,則淘汰最近未使用的,若失敗則3
      3. 如設置RANDOM淘汰策略,則隨機淘汰一個

slab_get()
      大體上來說,slab_get_new()是決策1,slab_evict_lru()是決策2,slab_evict_rand()是決策3,經過決策后,如果分配到了新的slab,則重新初始化它,并添加到slab_table和slab_lruq中,這是由slab_add_one()完成的。下面分析下slab_evict_lru()淘汰規則。

slab = slab_get_new();
if (slab == NULL && (settings.evict_opt & EVICT_LS)) {
    slab = slab_evict_lru(id);
}
if (slab == NULL && (settings.evict_opt & EVICT_RS)) {
    slab = slab_evict_rand();
}
if (slab != NULL) {
    stats_slab_settime(id, slab_new_ts, time_now());
    slab_add_one(slab, id);
    status = MC_OK;
} 


slab_evict_lru()
      所有使用的slab都會添加到slab_lurq中(即slab_lruq_head()),找到最近未使用的即是查找LRU鏈表中最靠前的slab且其refcount==0,為了避免遍歷鏈表的時間消耗,tries限制了至多遍歷slab_lruq的前SLAB_LRU_MAX_TRIES個元素,如果找到了這樣的slab,則掉用slab_evict_one()將它淘汰,淘汰包括將它從slab_lruq上刪除,其中所有item從item_lruq上刪除,從slabclass相應槽中刪除。

for (tries = SLAB_LRU_MAX_TRIES, slab = slab_lruq_head();
    tries > 0 && slab != NULL;
    tries--, slab = TAILQ_NEXT(slab, s_tqe)) {
    if (slab->refcount == 0) {
        break;
    }
}
……
slab_evict_one(slab);


Item分配策略
      當需要新的item時,會經過一組決策來決定新分配的item取自哪里,這組決策都是在_item_alloc()中完成的,決策過程如下:
       1. 查找一個過時的item,如無則2
       2. 查找一個slab上空閑的item,如無則3
         a. 當前slab上有空閑item
         b. 當前slab上沒有空閑item,分配新的slab
      3. 淘汰一個item

_item_alloc()
      大體來說,item_get_from_lruq()是決策1,slab_get_item()是決策2,item_reuse()是決策3。這里的uit是LRU上最近未使用的一個item,如果設置了EVICT_LRU即LRU淘汰策略的話,則在決策1和2未成功時執行3。除了決策2,其它兩個都是對不會再使用的item的復用,過程是先在item_lruq[id]中查找是否有已超時的,有則返回給it;沒有則試圖從slab上分配一個item,有則返回給it;沒有則試圖復用最近未使用的uit。下面分析item_get_from_lruq()和slab_get_item()。

it = item_get_from_lruq(id); / expired / unexpired lru item /

if (it != NULL && item_expired(it)) { stats_slab_incr(id, item_expire); stats_slab_settime(id, item_expire_ts, it->exptime); item_reuse(it); goto done; }

uit = (settings.evict_opt & EVICT_LRU)? it : NULL; / keep if can be used / it = slab_get_item(id); if (it != NULL) { goto done; }

if (uit != NULL) { it = uit; stats_slab_incr(id, item_evict); stats_slab_settime(id, item_evict_ts, time_now());

item_reuse(it);
goto done;

}</pre>


item_get_from_lruq()
      函數從item_lruq中查找已超時的item記錄為it,最近未使用的item記錄為uit。id是根據item的大小所對應的槽id,槽中所有使用的item都會鏈在item_lruq[id]上,遍歷item_lruq[id]上的item,如果it->refcount!=0則表示還在被使用,這樣的item不能復用,直接跳過;對于refcount==0的item,如果找到超時的,則直接返回它,在查找過程中,記錄第一個refcount==0并且未超時的item(即最近未使用),作為決策3淘汰的對象。
      tries限制了遍歷的長度不能超過ITEM_LRUQ_MAX_TRIES,這樣節省了大量鏈表遍歷的時間,并且按LRU的性質,越靠近鏈表頭的元素越有可能作為淘汰對象,所有遍歷前ITEM_LRUQ_MAX_TRIES已經覆蓋了大部分情況。

for (tries = ITEM_LRUQ_MAX_TRIES, it = TAILQ_FIRST(&item_lruq[id]),
     uit = NULL; it != NULL && tries > 0;
     tries--, it = TAILQ_NEXT(it, i_tqe)) {

if (it->refcount != 0) {
 ……
    continue;
}

if (item_expired(it)) {
    return it;
} else if (uit == NULL) {
    uit = it;
}

}</pre>


slab_get_item() -> _slab_get_item()
       slab_get_item_from_freeq()從slabclass[id]中查找是否有空閑的item可用(即p->free_itemq),有則返回it,p->free_itemq上記錄的是使用過后被刪除的item;如果沒有這樣的item,則從p->free_item上取,它記錄還未使用過的item的首地址;如果沒有這樣的item,則表示當前slab已經用滿了,需要分配新的slab,slab_get()使用slab分配策略分配一個新的slab,此時slab中的item都未使用,都記錄到p->free_item中。最后從p->free_item中最得一個item返回就可以了。

p = &slabclass[id];
it = slab_get_item_from_freeq(id);
if (it != NULL)
return it;
if (p->free_item == NULL && (slab_get(id) != MC_OK)) {
    return NULL;
}
it = p->free_item;
if (--p->nfree_item != 0) {
    p->free_item = (struct item *)(((uint8_t *)p->free_item) + p->size);
} else {
    p->free_item = NULL;
}


Hashtable策略
      作為核心的存儲結構,twemcache使用的是鏈式哈希表,其主體由mc_assoc.c實現,hashtable初始大小為64K,在需要時進行擴容,在操作上與平時使用的hashtable并無差別,下面僅分析插入時assoc_insert()及擴容時assoc_expand()。
assoc_insert()
      assoc_get_bucket()獲取當前需要插入的桶,里面封裝了對hashtable的選擇,在存儲模型里已經說明了primary_hashtable和old_hashtable的不同作用,當hashtable正在擴容時,expanding==1(并且expand_bucket小于hashtable大小),返回old_hashtable;否則返回primary_hashtable。SLIST_INSERT_HEAD將新的item插入到桶中,nhash_item表示hashtable中item的數目,當其達到hashtable大小的1.5倍時,調用assoc_expand()進行擴容。注意這里的插入操作不用去查找是否已有item存在,這里使用的策略是先刪除已存在的item,再插入新的item,所有查找已存在操作會存在于刪除操作中,不會存在于插入操作中。

bucket = assoc_get_bucket(item_key(it), it->nkey);
SLIST_INSERT_HEAD(bucket, it, h_sle);
nhash_item++;
if ((expanding == 0) && (nhash_item > (HASHSIZE(hash_power) * 3) / 2)) {
    assoc_expand();
}


assoc_expand()
      函數進行hashtable的擴容,hash_power表示表大小的2次冪,當需要擴容時,hash_power + 1表示擴容一倍,old_hashtable指向primary_hashtable,primary_hashtable則指向新創建的hashtable,最近發送信號量給maintenance線程,這個線程一直等待在maintenance_cond信號量上,它負責將old_hashtable中的”所有”item插入到新的primary_hashtable。
      這里要注意的是,在擴容期間,新的item會插入到old_hashtable,這樣不斷有item到來,擴容線程可能永遠也無法將item完全從old_hashtable遷移到primary_hashtable。這里使用了expland_bucket,它標識擴容了多少個桶,當expland_bucket > HASHSIZE(hash_power - 1)時(即超過了擴容前hashtable大小)時,這時新的item不再會插入到old_hashtable,而是插入到primary_hashtable,從而保證數據遷移一定可以在有限時間內完成。

uint32_t hashtable_sz = HASHSIZE(hash_power + 1);
old_hashtable = primary_hashtable;
primary_hashtable = assoc_create_table(hashtable_sz);
……
hash_power++;
expanding = 1;
expand_bucket = 0;
pthread_cond_signal(&maintenance_cond);


狀態機
      非阻塞自然會與狀態機相關聯,twemcache也使用了狀態機來結合epoll調用,狀態機的核心處理函數是core_drive_machine(),下面的所有狀態遷移入口都是以該函數為入口的,它的大致結構如下,每次循環結束代表一次事件處理完成,在一次事件中可能發生多個狀態遷移。

while (!stop) {
    switch (c->state) {
    case CONN_LISTEN:  ……
    case CONN_WAIT:  ……
    case CONN_READ:  ……
    case CONN_PARSE:  ……
    case CONN_NEW_CMD:  ……
    case CONN_NREAD:  ……
    case CONN_SWALLOW:  ……
    case CONN_WRITE:  ……
    case CONN_MWRITE:  ……
    case CONN_CLOSE:  ……
    default:  ……
    }
}

       TCP/UNIX域和UDP的流程稍有不同,前者多了客戶端建立連接的過程,它們的流程圖如下所示,圖中用藍色虛線圈住的是一次連接的狀態轉移,在一個循環中,它們擁有相同的連接標識conn。

推ter開源庫Twemcache分析

TCP狀態機

推ter開源庫Twemcache分析

UDP狀態機


Twemcache實例分析
 系統初始化

core_init() // 初始化
core_loop() // 系統啟動


core_init()
       下面是提取的core_init的核心代碼段,main_base是創建的主進程的事件域,assoc_init()初始化核心存儲結構hashtable,item_init()初始化了管理item的item_lruq,slab_init()決定了每個slabclass槽的item大小,并預分配的內存,time_init()則向main_base中注冊了定時器事件clockevent,為系統提供秒級的當前時間,thread_init()分配并啟動了線程模型中描述的各類線程。下面就重要的slab_init()和thread_init()詳細分析下。

status = log_init(settings.verbose, settings.log_filename);
status = signal_init();
pthread_mutex_init(&accept_lock, NULL);
STAILQ_INIT(&listen_connq);
main_base = event_base_new();
status = assoc_init();
conn_init();
item_init();
status = slab_init();
stats_init();
status = klog_init();
time_init();
status = thread_init(main_base);


slab_init()
      執行兩步操作:slab_slabclass_init() / slab_heapinfo_init()。

slab_slabclass_init()
      它的作用是設置slabclass每個槽中item的大小,這里的nitem是slabclass中item的個數,item_sz是item的大小,free_itemq鏈接被刪除的item,nfree_item記錄空閑的item個數,free_item指向第一個空閑的item。這里決定item大小很重要的因素是profile => settings.profile,它記錄了每個槽item的大小,在mc_generate_profile()中設置。

for (id = SLABCLASS_MIN_ID; id <= slabclass_max_id; id++) {
    struct slabclass p; / slabclass /
    uint32_t nitem;      / # item per slabclass /
    size_t item_sz;      / item size */

nitem = slab_size() / profile[id];
item_sz = profile[id];
p = &slabclass[id];

p->nitem = nitem;
p->size = item_sz;

p->nfree_itemq = 0;
TAILQ_INIT(&p->free_itemq);

p->nfree_item = 0;
p->free_item = NULL;

}</pre>


slab_heapinfo_init()
       nslab表示當前分配的slab,max_nslab表示最多能分配的slab,base表示slab內存的基址,如果是預分配策略的話,則一次性全部分配,否則則在每次需要時分配slab;curr表示當前指向的slab,slab_table記錄所有分配使用的slab,slab_lruq鏈接所有分配使用的slab,并在需要時用LRU算法進行淘汰。

heapinfo.nslab = 0;
heapinfo.max_nslab = settings.maxbytes / settings.slab_size;

heapinfo.base = NULL; if (settings.prealloc) { heapinfo.base = mc_alloc(heapinfo.max_nslab settings.slab_size); ...... } heapinfo.curr = heapinfo.base; heapinfo.slab_table = mc_alloc(sizeof(heapinfo.slab_table) heapinfo.max_nslab); .... TAILQ_INIT(&heapinfo.slab_lruq);</pre>


thread_init()
       分配線程,nworkers代表工作線程的數目,1是主進程,即這里的dispatcher。以后在使用線程時,threads + id
sizeof(*threads)即為每id個線程。</span></p>

threads = mc_zalloc(sizeof(*threads) * (1 + nworkers));
if (threads == NULL) {
    return MC_ENOMEM;
}
dispatcher = &threads[nworkers];

       對于每個工作線程,建立一個管道,fds[0]用于工作線程接收來自主進程的數據,fds[1]用于主進程向工作線程發送數據(這里的數據只做信號作用),thread_setup()則為每個事件創建一個獨立的事件域t->base,并在t->base中注冊了管道IO事件,監聽fds[0]的讀事件,讀事件觸發則執行thread_libevent_process(),它負責完成由主進程轉來的客戶端連接conn。

for (i = 0; i < nworkers; i++) {
    int fds[2];
    status = pipe(fds);
    if (status < 0) {
        log_error("pipe failed: %s", strerror(errno));
        return status;
    }
    threads[i].notify_receive_fd = fds[0];
    threads[i].notify_send_fd = fds[1];
    status = thread_setup(&threads[i]);
    if (status != MC_OK) {
        return status;
    }
}

       然后啟動所有工作線程,thread_worker_main執行的操作很簡單 – 開始事件的監聽(event_base_loop)。

for (i = 0; i < nworkers; i++) {
    status = thread_create(thread_worker_main, &threads[i]);
    if (status != MC_OK) {
        return status;
    }
}

      最后,還會設置和啟動aggregator線程和klogger線程,在線程模型中已有描述,兩個線程都有獨立的事件域,并在其上注冊了定時器事件,前者用于采集狀態數據,后者用于輸出log信息,啟動線程后執行的操作與工作線程一樣 – 開始事件的監聽event_base_dispatch()。

status = thread_setup_aggregator();
status = thread_create(thread_aggregator_main, NULL);
status = thread_setup_klogger();
status = thread_create(thread_klogger_main, NULL);


core_loop()
       core_create_socket創建服務器的套接字,然后event_base_loop()開始監聽事件,下面詳細分析core_create_socket()。

status = core_create_socket();
event_base_loop(main_base, 0);


core_create_socket()    -> core_create_inet_socket()
       如果是udp端口,則沒有listen()和accept()的過程,thread_dispatch()向每個工作線程的管道寫入字符,觸發工作線程執行連接sd的監聽事件,競爭地讀取客戶端發往sd的請求報文。
如果是tcp端口,conn_set_event()向主進程main_base中注冊sd的監聽事件,當sd有連接到來由主進程經過accept()后再交由指定的線程去處理。
       這里的conn代表了一個連接的標識,用完的conn會放入free_connq中,當下次需要conn時就不用重新分配內存了,而會直接從free_connq中復用。

if (udp) {
    int c;
    for (c = 0; c < settings.num_workers; c++) {
        status = thread_dispatch(sd, CONN_READ, EV_READ | EV_PERSIST, 1);
        ......
    }
} else {
    conn = conn_get(sd, CONN_LISTEN, EV_READ | EV_PERSIST, 1, 0);
    ......
    STAILQ_INSERT_HEAD(&listen_connq, conn, c_tqe);

status = conn_set_event(conn, main_base);
......

}</pre>


實例 [TCP連接,客戶端請求”set foo bar”]
CONN_LISTEN -> CONN_NEW_CMD
      當客戶端連接到達后,c->sd監聽事件觸發,調用core_event_handler() -> core_accept()接收客戶端連接。
core_accept()
      它的核心代碼如下,accept()完成與客戶端的三次握手建立連接,返回socket sd,然后主進程將這個連接sd交由一個工作線程去處理,這是由thread_dispatch()完成的。

sd = accept(c->sd, NULL, NULL);
......
status = thread_dispatch(sd, CONN_NEW_CMD, EV_READ | EV_PERSIST, 0);


thread_dispatch()
      conn_get()獲取一個連接conn并將它初始化為本次連接的標識,接下來tid是選擇要處理這個連接的線程,選擇的算法是Round Robin,即每次循環遞增一個id號,然后將連接標識c壓入選定的線程t->new_cq中,它存儲了線程要處理的所有連接,最后向t->notify_send_fd寫一個字符,觸發工作線程t的管道IO事件,讓其處理新的連接。此時,連接由主進程由給了工作線程t,表現在事件域發生了變更,接下來c的處理都在工作線程中,直到連接關閉。

c = conn_get(sd, state, ev_flags, rsize, udp);
......
tid = (last_thread + 1) % settings.num_workers;
t = threads + tid;
last_thread = tid;

conn_cq_push(&t->new_cq, c); n = write(t->notify_send_fd, "", 1);</pre>


CONN_NEW_CMD -> CONN_WAIT
      nreqs是一次事件中,能處理的最大請求數目,避免工作線程被某個連接完全占用,core_reset_cmd_handler()會重新初始化連接相關的數據如req_type, item等,最后設置狀態為CONN_WAIT。

--nreqs;
if (nreqs >= 0) {
 core_reset_cmd_handler(c);
}


CONN_WAIT -> CONN_READ
      更新事件為監聽可讀事件,并設置狀態為CONN_READ,stop是個標志,所有的狀態遷移在一個while(!stop)循環中,只要stop未設為true,則這次狀態遷移還要繼續,只有當stop為true時才代表一次處理完成,重新回到epoll進入監聽狀態。

status = core_update(c, EV_READ | EV_PERSIST);
if (status != MC_OK) {
    log_error("update on c %d failed: %s", c->sd, strerror(errno));
    conn_set_state(c, CONN_CLOSE);
    break;
}
conn_set_state(c, CONN_READ);
stop = true;


CONN_READ -> CONN_PARSE
      狀態CONN_READ作用是完成客戶端命令讀取。假設是TCP連接,core_read()調用core_read_tcp()完成命令讀取,并根據讀取結果設置連接狀態,讀取完整會設為CONN_PARSE狀態。

case CONN_READ:
    core_read(c);
    break;


core_read_tcp()
      c代表了客戶端連接,讀取數據到c->rbuf中,根據返回值會有三種情況:
        1. n<0&&(errno==EGAIN||errno==EWOULDBLOCK)    連接不可讀,返回等待下次讀取
        2. 0         3. n==size    數據占滿了c->rbuf,但仍未讀完,重新分配rbuf大小,并再次讀取數據到c->rbuf中,直到讀取完成。

for (;;) {
    ......
    size = c->rsize - c->rbytes;
    n = read(c->sd, c->rbuf + c->rbytes, size);
    if (n > 0) {
        stats_thread_incr_by(data_read, n);
        gotdata = READ_DATA_RECEIVED;
        c->rbytes += n;
        if (n == size) {
            continue;
        } else {
            break;
        }
    }
    ……
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
        log_debug(LOG_VERB, "recv on c %d not ready - eagain", c->sd);
        break;
    }
}


CONN_PARSE -> CONN_NREAD
      狀態CONN_PARSE作用是完成客戶端命令的分析(命令的分析并不包括附帶的數據),調用core_parse()完成[假設客戶端命令是”set foo bar”]。
      core_parse() -> asc_parse() -> asc_dispatch() -> asc_process_update()
asc_process_update()
      之前的函數對命令進行了解析,假設客戶端命令是”set foo bar”,則收到數據與解析后的結果如圖所示(其中0x20是空格,0x0D 0x0A是回車換行符):

推ter開源庫Twemcache分析

      item_alloc()按前面的item分配策略為本次命令分配了一個item – it,并設置了c->ritem指向item的數據(即value),rlbytes表示仍未讀取的命令長部,即數據部分(“bar”),最后設置狀態為CONN_NREAD。

it = item_alloc(key, nkey, flags, time_reltime(exptime), vlen);
......
c->item = it;
c->ritem = item_data(it);
c->rlbytes = it->nbyte;
conn_set_state(c, CONN_NREAD);


CONN_NREAD -> CONN_WRITE
      狀態CONN_NREAD完成命令數據部分的分析,這個狀態至少要循環兩次,前面幾次將c->rlbytes(數據部分長度)讀入到c->ritem中,這部分數據可能在CONN_READ時已讀入到c->rbuf中,那么此時c->rbytes > 0,直接從緩沖區取這部分數據就可以了,即第二個if語句段;這部分數據可能還沒有讀取,那么調用read()從c->sd中讀取。讀取的數據放到c->ritem即數據區,并更新c->rlbytes,它表示數據部分還有多少字節未讀取,當讀取完后最后一次進入循環,c->rlbytes == 0,此時調用core_complete_nread()完成數據部分的存儲,下面分析這個函數。

if (c->rlbytes == 0) {
 core_complete_nread(c);
 break;
}

if (c->rbytes > 0) { int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes; if (c->ritem != c->rcurr) { memmove(c->ritem, c->rcurr, tocopy); } c->ritem += tocopy; c->rlbytes -= tocopy; c->rcurr += tocopy; c->rbytes -= tocopy; if (c->rlbytes == 0) { break; } }

n = read(c->sd, c->ritem, c->rlbytes); if (n > 0) { stats_thread_incr_by(data_read, n); if (c->rcurr == c->ritem) { c->rcurr += n; } c->ritem += n; c->rlbytes -= n; break; }</pre>


core_complete_nread() -> asc_complete_nread()
      item_store()將讀取的數據部分(即value)存入相應的item,根據返回值,成功存入則返回STORED,執行asc_write_stored()將回送信息”STORED”寫入連接c的發送緩沖區c->wbuf,并設置狀態為CONN_WRITE,c->write_and_go = CONN_NEW_CMD,這個變量指示了CONN_WRITE狀態后要遷移到哪個狀態。至此,這次請求對item的使用已經完成了,調用item_remove()減小計數,因為item還鏈在item_lruq上,所以不并實際刪除,然后c->item = NULL,表示此次請求不再使用item。下面詳細分析item的value存儲函數item_store()。

ret = item_store(it, c->req_type, c);
switch (ret) {
case STORED:
 asc_write_stored(c);
 break;
 ......
}
......
item_remove(c->item);
c->item = NULL;


item_store() -> _item_store()
      在存儲模型中已經描述,item最終存儲的數據結構是hashtable,_item_get()從hashtable中以鍵值key(即foo)查找相應的item。
      如果沒有找到item,則是首次插入,調用_item_link()將它插入到hashtable中;
      如果找到item,則調用_item_replace()替代之前的item。

key = item_key(it);
nit = NULL;
oit = _item_get(key, it->nkey);
……
if (result == NOT_STORED && store_it) {
    if (oit != NULL) {
        _item_replace(oit, it);
    } else {
        _item_link(it);
    }
    result = STORED;
}

       此例中是第一次set foo,_item_get()會返回NULL,最終調用_item_link(),這個函數很簡單,更改它的flags |= ITEM_LINKED表示被鏈接,assoc_insert()將這個item插入到hashtable中,item_link_q()將item鏈到item_lruq上。

it->flags |= ITEM_LINKED;
item_set_cas(it, item_next_cas());
assoc_insert(it);
item_link_q(it);


CONN_WRITE -> CONN_NEW_CMD
      CONN_WRITE狀態完成客戶端的應答,應答內容在CONN_NREAD狀態下已經寫入到c->wbuf中了,首先調用conn_add_iov()將c->wbuf中的內容組裝成msgbuf的形式。

if (c->iov_used == 0 || (c->udp && c->iov_used == 1)) {
 status = conn_add_iov(c, c->wcurr, c->wbytes);
 ......
}

      然后由core_transmit()完成內容的發送,發送成功會返回TRANSMIT_COMPLETE(至少需要兩次循環,同CONN_NREAD),因為此時c->state為CONN_WRITE,變遷狀態至c->write_and_go(即CONN_NEW_CMD),從而完成了這一次請求。當然,core_transmit()也會失敗,最大可能是因為socket當時并不可寫,寫socket的時機并不是由epoll的寫事件觸發的,這種情況下會返回TRANSMIT_SOFT_ERR,它置stop=true,表示此次事件處理完成,等待socket的可寫事件到達。下面分析core_transmit()函數。

switch (core_transmit(c)) {
case TRANSMIT_COMPLETE:
 if (c->state == CONN_MWRITE) {
        ......
  conn_set_state(c, CONN_NEW_CMD);
 } else if (c->state == CONN_WRITE) {
  if (c->write_and_free) {
   mc_free(c->write_and_free);
   c->write_and_free = 0;
  }
  conn_set_state(c, c->write_and_go);
 } else {
  log_debug(LOG_INFO, "unexpected state %d", c->state);
  conn_set_state(c, CONN_CLOSE);
 }
 break;
case TRANSMIT_INCOMPLETE:
case TRANSMIT_HARD_ERROR:
 break;
case TRANSMIT_SOFT_ERROR:
 stop = true;
 break;
}


core_transmit()
      msg_curr和msg_used對比表示是否還有數據需要發送,沒有時返回TRANSMIT_COMPLETE;仍有數據則調用sendmsg()進行發送,res > 0表示發送成功,此時返回TRANSMIT_INCOMPLETE,這和CONN_NREAD狀態下讀取數據的做法是一樣的,至少需要兩次core_transmit,在發送完后最后一次進入會返回TRANSMIT_COMPLETE;res == -1及errno判斷表示c->sd此時并不可寫,我們是在讀事件觸發后直接寫socket,不可寫則core_update()更新c->sd上的監聽事件為寫事件,并返回TRANSMIT_SOFT_ERROR,它會導致此次事件處理結束,結果就是等待寫事件的到來。

if (c->msg_curr < c->msg_used) {
……
res = sendmsg(c->sd, m, 0);
if (res > 0) {
  ......
  return TRANSMIT_INCOMPLETE;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
  status = core_update(c, EV_WRITE | EV_PERSIST);
  if (status != MC_OK) {
   log_error("update on c %d failed: %s", c->sd, strerror(errno));
   conn_set_state(c, CONN_CLOSE);
   return TRANSMIT_HARD_ERROR;
  }
  return TRANSMIT_SOFT_ERROR;
}
} else {
 return TRANSMIT_COMPLETE;
}


CONN_NEW_CMD
       這個狀態代表了該連接上可以接受下一個請求了,即一次客戶端請求結束。

      由上面的分析可見,所有的狀態轉移都是在core_dirve_machine()函數中完成的,并且并不是每個狀態對應一個事件,twemcache對狀態的劃分是按功能來的,比如在讀事件中就會完成讀數據、分析數據兩個功能,下面的圖表示了各狀態執行時所處的事件:

推ter開源庫Twemcache分析

 

 

轉自:http://blog.csdn.net/qy532846454/article/details/7899780

 本文由用戶 openkk 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!