用libevent改造teamtalk底層網絡框架

jopen 9年前發布 | 25K 次閱讀 libevent 網絡工具包

因為之前群友對teamtalk的底層網絡框架性能諸多抱怨,并且藍狐大大也曾跟我說起過想用libevent重寫底層網絡框架,剛好最近我想用 hiredis的異步連接客戶端給teamtalk加我自己的功能,而hiredis的異步客戶端只能適配libevent, libev, libuv等幾個知名的異步框架。如果我想用只有兩個辦法,一,自己給hiredis新增一個teamtalk的異步網絡適配器, 二, 用libevent或者libuv重寫teamtalk底層網絡框架。

想到改寫teamtalk底層網絡框架并非易事,有可能牽一髮而動全身,然后就玩大了,所以我就先去看了hiredis的源碼,看了大半天結果發現還是回去改造teamtalk吧。。。

然后就開始在幾個網絡框架里選擇用哪一個來改寫,libev首先被剔除了,原因我就不講了,你懂的。然后對我來講,因為鄙人的node.js功力非常深厚,而libuv則是node.js的底層網絡庫,所以選擇libuv自然應該是理所當然的了。然后就開始弄了,弄了老半天結果發現真的玩大了,至于為什么你可以去嘗試一下,也許你可以的,我是放棄了,然后就還是libevent了。

“你這貨廢話怎么這么多!!!”

“因為其實我不是程序員,鄙人日常工作是拉皮條的。”

關于teamtalk的底層網絡框架的性能問題,我這里先來舉個例子做個說明


// Send `len` bytes from `buf` on the socket registered under `handle`.
// Returns NETLIB_ERROR when no socket is found for the handle, otherwise
// whatever CBaseSocket::Send returns.
int netlib_send(net_handle_t handle, void* buf, int len)
{
    CBaseSocket* sock = FindBaseSocket(handle);
    if (sock == NULL)
        return NETLIB_ERROR;

    const int sent = sock->Send(buf, len);
    // Balance the lookup with a ReleaseRef before returning.
    sock->ReleaseRef();
    return sent;
}

上面這段代碼是teamtalk的原始netlib庫的發送函數,所有功能要發送報文最終都會調到這個函數,然后你就可以看到了,這段函數每次發送數據前都要FindBaseSocket一把,當你的并發連接不多時這個查找倒也無所謂,但如果這時你有十萬個并發連接,那么你每次哪怕是發一個小包包都要去這十萬個basesocket里面找一把,這個就讓人不那么爽了(盡管map查找似乎還挺快,然而也并沒有什么軟用)。

所以,如果你不爽,那就想辦法讓自己爽起來吧,人生苦短,請及時行樂。

怎么才能爽起來呢?必須把這里的find給弄掉,并且還不能碰netlib棧以上的代碼,也就是說千萬別去碰CImConn,如果你連那個也改了,相信我你肯定不會爽起來的。不知道CImConn的同學可以讀一下我前面的博文。

所以,原則就定下來了,只能改動netlib內部的實現,不要變動其對外的接口,如果做到這一點,你最終只需要改動很少的代碼并讓自己爽起來。

“好了,今天就到此為止吧,該說的都講清楚了,我特么真是個寫博客的人才~”

“臥槽,你還沒講怎么改呢!@!”

“不是說了么,只改netlib內部實現,你就能嗨起來了。”

“擦!”

好吧,為了不被噴,我還是把我自己改的代碼貢獻出來吧


#include "netlib.h"
#include "BaseSocket.h"
#include "EventDispatch.h"

#define __LIBEVENT__

#ifndef __LIBEVENT__

// Initialise the network library. On Windows this starts Winsock (version
// 1.1 is requested); on other platforms there is nothing to do.
// Returns NETLIB_OK, or NETLIB_ERROR if WSAStartup fails.
int netlib_init()
{
#ifdef _WIN32
    WSADATA wsaData;
    WORD wReqest = MAKEWORD(1, 1);
    if (WSAStartup(wReqest, &wsaData) != 0)
        return NETLIB_ERROR;
#endif

    return NETLIB_OK;
}

// Tear down the network library. On Windows this calls WSACleanup; on
// other platforms there is nothing to do.
// Returns NETLIB_OK, or NETLIB_ERROR if WSACleanup fails.
int netlib_destroy()
{
#ifdef _WIN32
    if (WSACleanup() != 0)
        return NETLIB_ERROR;
#endif

    return NETLIB_OK;
}

// Create a CBaseSocket and start listening on server_ip:port.
// `callback` / `callback_data` are handed straight to CBaseSocket::Listen.
// Returns NETLIB_ERROR if the socket object is empty, otherwise the result
// of Listen.
int netlib_listen(
        const char* server_ip, 
        uint16_t    port,
        callback_t  callback,
        void*       callback_data)
{
    auto listener = sp_CBaseSocket(new CBaseSocket());
    if (!listener)
        return NETLIB_ERROR;

    // No manual delete on failure: the smart pointer owns the object.
    return listener->Listen(server_ip, port, callback, callback_data);
}

// Create a CBaseSocket and connect it to server_ip:port.
// `callback` / `callback_data` are handed straight to CBaseSocket::Connect.
// Returns NETLIB_INVALID_HANDLE if the socket object is empty, otherwise the
// handle returned by Connect.
net_handle_t netlib_connect(
        const char* server_ip, 
        uint16_t    port, 
        callback_t  callback, 
        void*       callback_data)
{
    auto connector = sp_CBaseSocket(new CBaseSocket());
    if (!connector)
        return NETLIB_INVALID_HANDLE;

    // No manual delete on failure: the smart pointer owns the object.
    return connector->Connect(server_ip, port, callback, callback_data);
}

// Send `len` bytes from `buf` on the socket registered under `handle`.
// Returns NETLIB_ERROR when the handle is unknown, otherwise the result of
// CBaseSocket::Send.
int netlib_send(net_handle_t handle, void* buf, int len)
{
    auto sock = FindBaseSocket(handle);
    if (!sock)
        return NETLIB_ERROR;

    // No explicit ReleaseRef is needed in the smart-pointer version.
    return sock->Send(buf, len);
}

// Receive up to `len` bytes into `buf` from the socket registered under
// `handle`. Returns NETLIB_ERROR when the handle is unknown, otherwise the
// result of CBaseSocket::Recv.
int netlib_recv(net_handle_t handle, void* buf, int len)
{
    auto sock = FindBaseSocket(handle);
    if (!sock)
    {
        return NETLIB_ERROR;
    }

    // No explicit ReleaseRef is needed in the smart-pointer version.
    return sock->Recv(buf, len);
}

// Close the socket registered under `handle`.
// Returns NETLIB_ERROR when the handle is unknown, otherwise the result of
// CBaseSocket::Close.
int netlib_close(net_handle_t handle)
{
    auto sock = FindBaseSocket(handle);
    if (!sock)
    {
        return NETLIB_ERROR;
    }

    // No explicit ReleaseRef is needed in the smart-pointer version.
    return sock->Close();
}

// Get or set a per-socket option.
//   handle - socket to operate on
//   opt    - one of the NETLIB_OPT_* selectors
//   optval - input or output slot; its pointee type depends on `opt`
//            (callback_t / void* / string* / uint16_t* / uint32_t*)
// Returns NETLIB_ERROR for an unknown handle or when an option at or above
// NETLIB_OPT_GET_REMOTE_IP is requested with a null `optval`; NETLIB_OK
// otherwise (unrecognised options are ignored).
int netlib_option(net_handle_t handle, int opt, void* optval)
{
    auto sock = FindBaseSocket(handle);
    if (!sock)
    {
        return NETLIB_ERROR;
    }

    const bool needs_value = (opt >= NETLIB_OPT_GET_REMOTE_IP);
    if (needs_value && optval == NULL)
    {
        return NETLIB_ERROR;
    }

    switch (opt)
    {
    case NETLIB_OPT_SET_CALLBACK:
        sock->SetCallback((callback_t)optval);
        break;

    case NETLIB_OPT_SET_CALLBACK_DATA:
        sock->SetCallbackData(optval);
        break;

    case NETLIB_OPT_GET_REMOTE_IP:
    {
        string* out = (string*)optval;
        *out = sock->GetRemoteIP();
        break;
    }

    case NETLIB_OPT_GET_REMOTE_PORT:
    {
        uint16_t* out = (uint16_t*)optval;
        *out = sock->GetRemotePort();
        break;
    }

    case NETLIB_OPT_GET_LOCAL_IP:
    {
        string* out = (string*)optval;
        *out = sock->GetLocalIP();
        break;
    }

    case NETLIB_OPT_GET_LOCAL_PORT:
    {
        uint16_t* out = (uint16_t*)optval;
        *out = sock->GetLocalPort();
        break;
    }

    case NETLIB_OPT_SET_SEND_BUF_SIZE:
    {
        const uint32_t* in = (uint32_t*)optval;
        sock->SetSendBufSize(*in);
        break;
    }

    case NETLIB_OPT_SET_RECV_BUF_SIZE:
    {
        const uint32_t* in = (uint32_t*)optval;
        sock->SetRecvBufSize(*in);
        break;
    }
    }

    return NETLIB_OK;
}

// Register a timer with the shared CEventDispatch instance. All three
// arguments are forwarded unchanged to AddTimer. Always returns 0.
int netlib_register_timer(callback_t callback, void* user_data, uint64_t interval)
{
    CEventDispatch* dispatcher = CEventDispatch::Instance();
    dispatcher->AddTimer(callback, user_data, interval);
    return 0;
}

// Remove a timer from the shared CEventDispatch instance. Both arguments
// are forwarded unchanged to RemoveTimer. Always returns 0.
int netlib_delete_timer(callback_t callback, void* user_data)
{
    CEventDispatch* dispatcher = CEventDispatch::Instance();
    dispatcher->RemoveTimer(callback, user_data);
    return 0;
}

// Register a loop callback with the shared CEventDispatch instance. Both
// arguments are forwarded unchanged to AddLoop. Always returns 0.
int netlib_add_loop(callback_t callback, void* user_data)
{
    CEventDispatch* dispatcher = CEventDispatch::Instance();
    dispatcher->AddLoop(callback, user_data);
    return 0;
}

// Run the shared dispatcher; `wait_timeout` is passed through to
// CEventDispatch::StartDispatch.
void netlib_eventloop(uint32_t wait_timeout)
{
    CEventDispatch* dispatcher = CEventDispatch::Instance();
    dispatcher->StartDispatch(wait_timeout);
}

void netlib_stop_event()
{
    CEventDispatch::Instance()->StopDispatch();
}

bool netlib_is_running()
{
    return CEventDispatch::Instance()->isRunning();
}

#else

#include <unordered_map>
#include <event2/event.h>

// Per-connection persistent events, keyed by socket handle. Entries are
// added by netlib_onconfirm / netlib_set_onconnect_event and removed by
// netlib_close.
static unordered_map<net_handle_t, struct event*> g_read_event_map;
static unordered_map<net_handle_t, struct event*> g_write_event_map;
// Timer events keyed by their callback (see netlib_register_timer /
// netlib_delete_timer).
static unordered_map<callback_t, struct event*> g_timer_map;
// The single libevent base used by every event in this file; created in
// netlib_init and freed in netlib_destroy.
struct event_base* g_libevent_base;

// Socket address / buffer-size helpers used by netlib_option.
static string _GetRemoteIP(net_handle_t hd);
static uint16_t _GetRemotePort(net_handle_t hd);
static string _GetLocalIP(net_handle_t hd);
static uint16_t _GetLocalPort(net_handle_t hd);
static void _SetSendBufSize(net_handle_t hd, uint32_t send_size);
static void _SetRecvBufSize(net_handle_t hd, uint32_t recv_size);

// Low-level socket helpers used by netlib_connect / netlib_listen /
// netlib_onaccept.
static int _GetErrorCode();
static void _SetNonblock(SOCKET fd);
static bool _IsBlock(int error_code);
static void _SetReuseAddr(SOCKET fd);
static void _SetNoDelay(SOCKET fd);
static void _SetAddr(const char* ip, const uint16_t port, sockaddr_in* pAddr);

// libevent callbacks; `arg` is the EvtArg / EvtArg2 passed to event_new.
void netlib_onconfirm(evutil_socket_t fd, short what, void *arg);
void netlib_onread(evutil_socket_t fd, short what, void *arg);
void netlib_onwrite(evutil_socket_t fd, short what, void *arg);
void netlib_onaccept(evutil_socket_t fd, short what, void *arg);
void netlib_ontimer(evutil_socket_t fd, short what, void* arg);

// Internal helpers shared by the callbacks and netlib_option.
void netlib_check_write_error(net_handle_t fd, int* error, socklen_t* len);
void netlib_set_onconnect_event(net_handle_t handle, callback_t callback, void* cbdata);

struct EvtArg {
    callback_t callback;
    void* cbdata;

    EvtArg(callback_t c, void* d) : callback(c), cbdata(d) {}
    ~EvtArg() {}
};

struct EvtArg2 {
    callback_t callback;
    void* cbdata;
    struct event* evt;

    EvtArg2(callback_t c, void* d, struct event* e) : callback(c), cbdata(d), evt(e) {}
    ~EvtArg2() {}
};

// Return the last socket error for the calling thread: WSAGetLastError() on
// Windows, errno elsewhere.
static int _GetErrorCode()
{
#ifdef _WIN32
    const int code = WSAGetLastError();
#else
    const int code = errno;
#endif
    return code;
}

// Put `fd` into non-blocking mode (ioctlsocket/FIONBIO on Windows, fcntl
// with O_NONBLOCK elsewhere). Failure is only logged.
static void _SetNonblock(SOCKET fd)
{
#ifdef _WIN32
    u_long enable = 1;
    const int rc = ioctlsocket(fd, FIONBIO, &enable);
#else
    // Keep the existing status flags and add O_NONBLOCK on top.
    const int flags = fcntl(fd, F_GETFL);
    const int rc = fcntl(fd, F_SETFL, O_NONBLOCK | flags);
#endif
    if (rc != SOCKET_ERROR)
        return;

    log("_SetNonblock failed, err_code=%d", _GetErrorCode());
}

// True when `error_code` means "operation in progress / would block" rather
// than a real failure.
static bool _IsBlock(int error_code)
{
#ifdef _WIN32
    if (error_code == WSAEINPROGRESS)
        return true;
    return error_code == WSAEWOULDBLOCK;
#else
    if (error_code == EINPROGRESS)
        return true;
    return error_code == EWOULDBLOCK;
#endif
}

// Enable SO_REUSEADDR on `fd`. Failure is only logged.
static void _SetReuseAddr(SOCKET fd)
{
    int enable = 1;
    const int rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&enable, sizeof(enable));
    if (rc != SOCKET_ERROR)
        return;

    log("_SetReuseAddr failed, err_code=%d", _GetErrorCode());
}

// Enable TCP_NODELAY on `fd`. Failure is only logged.
static void _SetNoDelay(SOCKET fd)
{
    int enable = 1;
    const int rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&enable, sizeof(enable));
    if (rc != SOCKET_ERROR)
        return;

    log("_SetNoDelay failed, err_code=%d", _GetErrorCode());
}

static void _SetAddr(const char* ip, const uint16_t port, sockaddr_in* pAddr)
{
    memset(pAddr, 0, sizeof(sockaddr_in));
    pAddr->sin_family = AF_INET;
    pAddr->sin_port = htons(port);
    pAddr->sin_addr.s_addr = inet_addr(ip);
    if (pAddr->sin_addr.s_addr == INADDR_NONE)
    {
        hostent* host = gethostbyname(ip);
        if (host == NULL)
        {
            log("gethostbyname failed, ip=%s", ip);
            return;
        }

        pAddr->sin_addr.s_addr = *(uint32_t*)host->h_addr;
    }
}

int netlib_init()
{
    int ret = NETLIB_OK;
#ifdef _WIN32
    WSADATA wsaData;
    WORD wReqest = MAKEWORD(1, 1);
    if (WSAStartup(wReqest, &wsaData) != 0) {
        ret = NETLIB_ERROR;
    }
#endif
    g_libevent_base = event_base_new();
    return ret;
}

int netlib_destroy()
{
    int ret = NETLIB_OK;
#ifdef _WIN32
    if (WSACleanup() != 0) {
        ret = NETLIB_ERROR;
    }
#endif
    event_base_free(g_libevent_base);
    return ret;
}

// Start a non-blocking TCP connect to server_ip:port.
// A one-shot EV_WRITE event is registered on the new socket;
// netlib_onconfirm runs when it fires and reports the outcome to `callback`
// with `callback_data`.
// Returns the socket handle, or NETLIB_INVALID_HANDLE if the socket cannot
// be created or connect() fails with anything other than a would-block /
// in-progress error.
net_handle_t netlib_connect(
        const char* server_ip, 
        uint16_t    port, 
        callback_t  callback, 
        void*       callback_data)
{
    net_handle_t fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == INVALID_SOCKET)
    {
        loge("socket failed, err_code=%d", _GetErrorCode());
        return NETLIB_INVALID_HANDLE;
    }

    _SetNonblock(fd);
    _SetNoDelay(fd);

    sockaddr_in remote;
    _SetAddr(server_ip, port, &remote);

    const int rc = connect(fd, (sockaddr*)&remote, sizeof(remote));
    // The error code is only inspected when connect() actually failed.
    const bool hard_failure = (rc == SOCKET_ERROR) && !_IsBlock(_GetErrorCode());
    if (hard_failure)
    {
        loge("connect failed, err_code=%d", _GetErrorCode());
        closesocket(fd);
        return NETLIB_INVALID_HANDLE;
    }

    // The context must exist before event_new so it can be passed as the
    // callback argument; the event pointer is filled in afterwards.
    EvtArg2* pending = new EvtArg2(callback, callback_data, NULL);
    pending->evt = event_new(g_libevent_base, fd, EV_WRITE, netlib_onconfirm, pending);
    event_add(pending->evt, NULL);

    return fd;
}

// Close a connection: unregister and free its read/write events, release
// the callback context those events were created with, and close the
// socket. Handles without registered events are simply closed.
// Always returns 0.
int netlib_close(net_handle_t handle)
{
    // Callback contexts recovered from the events. netlib_onconfirm and
    // netlib_set_onconnect_event allocate one EvtArg with `new` and pass the
    // same pointer to both the read and the write event, so it must be
    // deleted exactly once.
    void* read_arg = NULL;
    void* write_arg = NULL;

    auto it = g_read_event_map.find(handle);
    if (it != g_read_event_map.end()) {
        auto ev = it->second;
        g_read_event_map.erase(it);
        event_get_assignment(ev, NULL, NULL, NULL, NULL, &read_arg);
        event_free(ev);
    }

    auto it2 = g_write_event_map.find(handle);
    if (it2 != g_write_event_map.end()) {
        auto ev = it2->second;
        g_write_event_map.erase(it2);
        event_get_assignment(ev, NULL, NULL, NULL, NULL, &write_arg);
        event_free(ev);
    }

    // Both events are gone, so nothing can dispatch with the context any
    // more; previously it was leaked on every close.
    delete (EvtArg*)read_arg;
    if (write_arg != read_arg) {
        delete (EvtArg*)write_arg;
    }

    closesocket(handle);
    return 0;
}


int netlib_listen(
        const char* server_ip, 
        uint16_t    port,
        callback_t  callback,
        void*       callback_data)
{
    auto sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock == INVALID_SOCKET) {
        printf("socket failed, err_code=%d\n", _GetErrorCode());
        return NETLIB_ERROR;
    }

    _SetReuseAddr(sock);
    _SetNonblock(sock);

    sockaddr_in serv_addr;
    _SetAddr(server_ip, port, &serv_addr);
    int ret = bind(sock, (sockaddr*)&serv_addr, sizeof(serv_addr));
    if (ret == SOCKET_ERROR) {
        loge("bind failed, err_code=%d", _GetErrorCode());
        closesocket(sock);
        return NETLIB_ERROR;
    }

    ret = listen(sock, 64);
    if (ret == SOCKET_ERROR) {
        loge("listen failed, err_code=%d", _GetErrorCode());
        closesocket(sock);
        return NETLIB_ERROR;
    }

    auto evtArg = new EvtArg(callback, callback_data);
    struct event* ev = event_new(g_libevent_base, sock, EV_READ|EV_PERSIST, netlib_onaccept, evtArg);
    event_add(ev, NULL);

    return NETLIB_OK;

}


// libevent read callback for a listening socket (registered by
// netlib_listen). Accepts every connection that is currently pending,
// switches each new socket to TCP_NODELAY + non-blocking, and reports it to
// the user callback as NETLIB_MSG_CONNECT.
//   fd   - the listening socket
//   what - event flags (unused)
//   arg  - the EvtArg holding the user callback and its data
void netlib_onaccept(evutil_socket_t fd, short what, void *arg)
{
    auto evtArg = (EvtArg*)arg;
    sockaddr_in peer_addr;

    for (;;)
    {
        // accept() treats addr_len as in/out, so reset it for every call.
        socklen_t addr_len = sizeof(sockaddr_in);

        // Keep the accepted socket in its own variable: the loop must keep
        // calling accept() on the listening socket `fd`, not on the client
        // socket returned by the previous iteration.
        evutil_socket_t client_fd = accept(fd, (sockaddr*)&peer_addr, &addr_len);
        if (client_fd == INVALID_SOCKET)
            break;

        _SetNoDelay(client_fd);
        _SetNonblock(client_fd);

        evtArg->callback(evtArg->cbdata, NETLIB_MSG_CONNECT, (net_handle_t)client_fd, NULL);
    }
}

// Read the pending socket error (SO_ERROR) of `fd` into `*error`.
// `len` is the in/out length argument for getsockopt. The only platform
// difference is the pointer type getsockopt expects for the value buffer.
void netlib_check_write_error(net_handle_t fd, int* error, socklen_t* len)
{
#ifdef _WIN32
    char* value = (char*)error;
#else
    void* value = (void*)error;
#endif
    getsockopt(fd, SOL_SOCKET, SO_ERROR, value, len);
}

// libevent callback for the one-shot EV_WRITE event that netlib_connect
// registers on a connecting socket.
// If the socket reports a pending error the user callback receives
// NETLIB_MSG_CLOSE. Otherwise persistent read/write events are registered
// for the socket (and recorded in the global event maps) and the user
// callback receives NETLIB_MSG_CONFIRM.
//   fd   - the connecting socket
//   what - event flags (unused)
//   arg  - the EvtArg2 allocated by netlib_connect; released here
void netlib_onconfirm(evutil_socket_t fd, short what, void *arg)
{
    auto evtArg2 = (EvtArg2*)arg;

    // Copy out what is still needed, then release the one-shot event and its
    // context on every path. Previously the context was never deleted, and
    // the event itself leaked whenever the connect failed.
    callback_t callback = evtArg2->callback;
    void* cbdata = evtArg2->cbdata;
    event_free(evtArg2->evt);
    delete evtArg2;

    int error = 0;
    socklen_t len = sizeof(error);
    netlib_check_write_error((net_handle_t)fd, &error, &len);
    if (error) {
        callback(cbdata, NETLIB_MSG_CLOSE, (net_handle_t)fd, NULL);
        return;
    }

    // One context is shared by the read and the write event.
    auto rwArg = new EvtArg(callback, cbdata);
    struct event* evread = event_new(g_libevent_base, fd, EV_READ|EV_PERSIST|EV_ET, netlib_onread, rwArg);
    struct event* evwrite = event_new(g_libevent_base, fd, EV_WRITE|EV_PERSIST|EV_ET, netlib_onwrite, rwArg);
    event_add(evread, NULL);
    event_add(evwrite, NULL);
    g_read_event_map[fd] = evread;
    g_write_event_map[fd] = evwrite;
    callback(cbdata, NETLIB_MSG_CONFIRM, (net_handle_t)fd, NULL);
}

// libevent read callback for a connected socket: forwards the event to the
// user callback as NETLIB_MSG_READ. `arg` is the EvtArg registered with the
// event; `what` is unused.
void netlib_onread(evutil_socket_t fd, short what, void *arg)
{
    EvtArg* ctx = static_cast<EvtArg*>(arg);
    const net_handle_t handle = (net_handle_t)fd;
    ctx->callback(ctx->cbdata, NETLIB_MSG_READ, handle, NULL);
}

// libevent write callback for a connected socket: forwards the event to the
// user callback as NETLIB_MSG_WRITE. `arg` is the EvtArg registered with the
// event; `what` is unused.
void netlib_onwrite(evutil_socket_t fd, short what, void *arg)
{
    EvtArg* ctx = static_cast<EvtArg*>(arg);
    const net_handle_t handle = (net_handle_t)fd;
    ctx->callback(ctx->cbdata, NETLIB_MSG_WRITE, handle, NULL);
}

// Register persistent, edge-triggered read and write events for `handle`
// that dispatch to `callback` with `cbdata`, and record them in the global
// event maps so netlib_close can find them. Called from netlib_option when
// the callback data is set.
void netlib_set_onconnect_event(net_handle_t handle, callback_t callback, void* cbdata)
{
    // A single context object is shared by both events.
    EvtArg* ctx = new EvtArg(callback, cbdata);

    const short read_flags = EV_READ|EV_PERSIST|EV_ET;
    const short write_flags = EV_WRITE|EV_PERSIST|EV_ET;
    struct event* on_readable = event_new(g_libevent_base, handle, read_flags, netlib_onread, ctx);
    struct event* on_writable = event_new(g_libevent_base, handle, write_flags, netlib_onwrite, ctx);

    event_add(on_readable, NULL);
    event_add(on_writable, NULL);

    g_read_event_map[handle] = on_readable;
    g_write_event_map[handle] = on_writable;
}

// Return the peer's IPv4 address of `hd` in dotted form, or an empty string
// if getpeername fails.
string _GetRemoteIP(net_handle_t hd)
{
    struct sockaddr_in peer;
    socklen_t peer_len = sizeof(peer);
    if (getpeername(hd, (struct sockaddr*)&peer, &peer_len) != 0)
        return "";

    return inet_ntoa(peer.sin_addr);
}

// Return the peer's port of `hd` in host byte order, or 0 if getpeername
// fails.
uint16_t _GetRemotePort(net_handle_t hd)
{
    struct sockaddr_in peer;
    socklen_t peer_len = sizeof(peer);
    if (getpeername(hd, (struct sockaddr*)&peer, &peer_len) != 0)
        return 0;

    return ntohs(peer.sin_port);
}

// Return the local IPv4 address of `hd` in dotted form, or an empty string
// if getsockname fails.
string _GetLocalIP(net_handle_t hd)
{
    struct sockaddr_in local;
    socklen_t local_len = sizeof(local);
    if (getsockname(hd, (struct sockaddr*)&local, &local_len) != 0)
        return "";

    return inet_ntoa(local.sin_addr);
}

// Return the local port of `hd` in host byte order, or 0 if getsockname
// fails.
uint16_t _GetLocalPort(net_handle_t hd)
{
    struct sockaddr_in local;
    socklen_t local_len = sizeof(local);
    if (getsockname(hd, (struct sockaddr*)&local, &local_len) != 0)
        return 0;

    return ntohs(local.sin_port);
}

// Request a send-buffer size of `send_size` bytes for `hd` (SO_SNDBUF),
// then read the option back and log the value the socket reports.
void _SetSendBufSize(net_handle_t hd, uint32_t send_size)
{
    if (setsockopt(hd, SOL_SOCKET, SO_SNDBUF, &send_size, 4) == SOCKET_ERROR)
    {
        loge("set SO_SNDBUF failed for fd=%d", hd);
    }

    // Query the effective size regardless of whether the set succeeded.
    int effective = 0;
    socklen_t optlen = 4;
    getsockopt(hd, SOL_SOCKET, SO_SNDBUF, &effective, &optlen);
    loge("socket=%d send_buf_size=%d", hd, effective);
}

// Request a receive-buffer size of `recv_size` bytes for `hd` (SO_RCVBUF),
// then read the option back and log the value the socket reports.
void _SetRecvBufSize(net_handle_t hd, uint32_t recv_size)
{
    if (setsockopt(hd, SOL_SOCKET, SO_RCVBUF, &recv_size, 4) == SOCKET_ERROR)
    {
        loge("set SO_RCVBUF failed for fd=%d", hd);
    }

    // Query the effective size regardless of whether the set succeeded.
    int effective = 0;
    socklen_t optlen = 4;
    getsockopt(hd, SOL_SOCKET, SO_RCVBUF, &effective, &optlen);
    loge("socket=%d recv_buf_size=%d", hd, effective);
}

// Get or set a per-socket option (libevent variant).
//   handle - socket to operate on
//   opt    - one of the NETLIB_OPT_* selectors
//   optval - input or output slot; its pointee type depends on `opt`
// NETLIB_OPT_SET_CALLBACK only remembers the callback in a function-local
// static; the read/write events are actually registered when
// NETLIB_OPT_SET_CALLBACK_DATA arrives, using the most recently remembered
// callback. NOTE(review): this relies on callers setting the callback
// immediately before the callback data - confirm against callers.
// Returns NETLIB_ERROR when an option at or above NETLIB_OPT_GET_REMOTE_IP
// is requested with a null `optval`; NETLIB_OK otherwise (unrecognised
// options are ignored).
int netlib_option(net_handle_t handle, int opt, void* optval)
{
    static callback_t pending_callback;

    const bool needs_value = (opt >= NETLIB_OPT_GET_REMOTE_IP);
    if (needs_value && optval == NULL)
    {
        return NETLIB_ERROR;
    }

    switch (opt)
    {
    case NETLIB_OPT_SET_CALLBACK:
        pending_callback = (callback_t)optval;
        break;

    case NETLIB_OPT_SET_CALLBACK_DATA:
        netlib_set_onconnect_event(handle, pending_callback, optval);
        break;

    case NETLIB_OPT_GET_REMOTE_IP:
    {
        string* out = (string*)optval;
        *out = _GetRemoteIP(handle);
        break;
    }

    case NETLIB_OPT_GET_REMOTE_PORT:
    {
        uint16_t* out = (uint16_t*)optval;
        *out = _GetRemotePort(handle);
        break;
    }

    case NETLIB_OPT_GET_LOCAL_IP:
    {
        string* out = (string*)optval;
        *out = _GetLocalIP(handle);
        break;
    }

    case NETLIB_OPT_GET_LOCAL_PORT:
    {
        uint16_t* out = (uint16_t*)optval;
        *out = _GetLocalPort(handle);
        break;
    }

    case NETLIB_OPT_SET_SEND_BUF_SIZE:
    {
        const uint32_t* in = (uint32_t*)optval;
        _SetSendBufSize(handle, *in);
        break;
    }

    case NETLIB_OPT_SET_RECV_BUF_SIZE:
    {
        const uint32_t* in = (uint32_t*)optval;
        _SetRecvBufSize(handle, *in);
        break;
    }

    default:
        break;
    }

    return NETLIB_OK;
}

// Send `len` bytes from `buf` directly on the socket `handle` with send()
// and flags 0. The return value of send() is passed back unchanged, so
// error and would-block handling is left to the caller.
int netlib_send(net_handle_t handle, void* buf, int len)
{
    const char* payload = (char*)buf;
    return send(handle, payload, len, 0);
}

// Receive up to `len` bytes into `buf` directly from the socket `handle`
// with recv() and flags 0. The return value of recv() is passed back
// unchanged, so error and would-block handling is left to the caller.
int netlib_recv(net_handle_t handle, void* buf, int len)
{
    char* dest = (char*)buf;
    return recv(handle, dest, len, 0);
}


// Register a repeating timer that invokes `callback` with `user_data`
// (delivered as NETLIB_MSG_TIMER by netlib_ontimer).
//   interval - period in milliseconds (whole seconds are interval / 1000)
// The event is stored in g_timer_map under `callback` so that
// netlib_delete_timer can find it. Always returns 0.
int netlib_register_timer(callback_t callback, void* user_data, uint64_t interval)
{
    // timeval wants seconds + MICROseconds. The sub-second remainder of a
    // millisecond interval therefore has to be scaled by 1000; using the
    // remainder as-is made e.g. a 500 ms timer fire every 500 us.
    long int sec = interval/1000L;
    long int usec = (interval%1000L)*1000L;
    struct timeval t = {sec, usec};
    auto arg = new EvtArg(callback, user_data);
    struct event* ev = event_new(g_libevent_base, -1, EV_PERSIST, netlib_ontimer, arg);
    event_add(ev, &t);
    g_timer_map[callback] = ev;
    return 0;
}

// Cancel the timer registered for `callback`: remove it from g_timer_map,
// free its event and release the callback context that
// netlib_register_timer allocated for it. `user_data` is not used for the
// lookup. Does nothing if no timer is registered for `callback`.
// Always returns 0.
int netlib_delete_timer(callback_t callback, void* user_data)
{
    auto it = g_timer_map.find(callback);
    if (it != g_timer_map.end()) {
        auto ev = it->second;
        g_timer_map.erase(it);

        // Recover the EvtArg that was passed to event_new so it can be
        // released together with the event; previously it was leaked.
        void* arg = NULL;
        event_get_assignment(ev, NULL, NULL, NULL, NULL, &arg);
        event_free(ev);
        delete (EvtArg*)arg;
    }
    return 0;
}

// libevent timeout callback: forwards the tick to the user callback as
// NETLIB_MSG_TIMER with handle 0. `arg` is the EvtArg registered with the
// event; `fd` and `what` are unused.
void netlib_ontimer(evutil_socket_t fd, short what, void* arg)
{
    EvtArg* ctx = static_cast<EvtArg*>(arg);
    ctx->callback(ctx->cbdata, NETLIB_MSG_TIMER, 0, NULL);
}

// Register `callback` (with `user_data`) to be invoked repeatedly through
// netlib_ontimer on a persistent timer with a period of {0 s, 100 us}.
// The event is not recorded in g_timer_map. Always returns 0.
int netlib_add_loop(callback_t callback, void* user_data)
{
    EvtArg* ctx = new EvtArg(callback, user_data);
    struct event* tick = event_new(g_libevent_base, -1, EV_PERSIST, netlib_ontimer, ctx);

    struct timeval period = {0, 100};
    event_add(tick, &period);

    return 0;
}

// Run the libevent dispatch loop on the global base. `wait_timeout` is
// accepted for interface compatibility but is not used by this variant.
void netlib_eventloop(uint32_t wait_timeout)
{
    struct event_base* base = g_libevent_base;
    event_base_dispatch(base);
}

void netlib_stop_event()
{
    event_base_loopbreak(g_libevent_base);
}

// Report whether the loop has NOT been told to break: returns the negation
// of event_base_got_break on the global base.
bool netlib_is_running()
{
    if (event_base_got_break(g_libevent_base))
        return false;
    return true;
}



#endif



上面的代碼用了宏控制選擇編譯老的netlib庫還是新的被libevent修改的庫,老的庫可能跟你的也略有不同,因為我用了c++11的智能指針改造了一下。代碼沒有注釋,如果你對libevent和socket挺熟悉的話我相信能很快看懂的,如果不熟,那就直接替換你的netlib.cpp吧,我自己已經自測過沒有問題,.h頭文件不需要修改。


另外還有個地方需要說明一下,我把原來BaseSocket.cpp里面的OnRead和OnWrite里的出錯檢查給去掉了,因為我覺得也讓我不爽,也就是下面這兩段代碼,并且把連接事件明確的和onaccept以及onconfirm對應起來,所以不再需要在onread和onwrite里來判斷當前 socket狀態是listening還是connecting來選擇accept和confirm了


u_long avail = 0;
        if ( (ioctlsocket(m_socket, FIONREAD, &avail) == SOCKET_ERROR) || (avail == 0) )
        {
            m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
        }
#if ((defined _WIN32) || (defined __APPLE__))
    CEventDispatch::Instance()->RemoveEvent(m_socket, SOCKET_WRITE);
#endif

    if (m_state == SOCKET_STATE_CONNECTING)
    {
        int error = 0;
        socklen_t len = sizeof(error);
#ifdef _WIN32

        getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len);
#else
        getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (void*)&error, &len);
#endif
        if (error) {
            m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
        } else {
            m_state = SOCKET_STATE_CONNECTED;



那么對于socket讀寫異常的判斷在哪里做的呢?答案是交給netlib的調用者做。

“臥槽,你不是說不要碰netlib上層的代碼嗎!?”

“但是我還是碰了,因為不碰不能爽。。”

請自己搜一下你代碼里調用netlib_recv和netlib_send的地方,然后略作如下修改

void CImConn::OnRead()
{
    for (;;)
    {
        uint32_t free_buf_len = m_in_buf.GetAllocSize() - m_in_buf.GetWriteOffset();
        if (free_buf_len < READ_BUF_SIZE)
            m_in_buf.Extend(READ_BUF_SIZE);

        int ret = netlib_recv(m_handle, m_in_buf.GetBuffer() + m_in_buf.GetWriteOffset(), READ_BUF_SIZE);
        if (ret == 0) {
            log("close on netlib_recv=0");
            OnClose();
            return;
        } else if (ret < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;
            } else {
                log("close on error=%d", errno);
                OnClose();
                return;
            }
        }



void CImConn::OnWrite()
{
    // CAutoLock autoLock(&s_send_lock);
    if (!m_busy)
        return;

    while (m_out_buf.GetWriteOffset() > 0) {
        int send_size = m_out_buf.GetWriteOffset();
        if (send_size > NETLIB_MAX_SOCKET_BUF_SIZE) {
            send_size = NETLIB_MAX_SOCKET_BUF_SIZE;
        }

        int ret = netlib_send(m_handle, m_out_buf.GetBuffer(), send_size);
        if (ret <= 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;
            } else {
                log("close on error=%d", errno);
                OnClose();
                return;
            }
        }


好了,就到這里了,未來如果我發現改動有什么bug的話,我會更新這篇博文,請關注我即可。

來自:http://my.oschina.net/u/877397/blog/500660

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!