用libevent改造teamtalk底層網絡框架
因為之前群友對teamtalk的底層網絡框架性能諸多抱怨,并且藍狐大大也曾跟我說起過想用libevent重寫底層網絡框架,剛好最近我想用 hiredis的異步連接客戶端給teamtalk加我自己的功能,而hiredis的異步客戶端只能適配libevent, libev, libuv等幾個知名的異步框架。如果我想用只有兩個辦法,一,自己給hiredis新增一個teamtalk的異步網絡適配器, 二, 用libevent或者libuv重寫teamtalk底層網絡框架。
想到改寫teamtalk底層網絡框架并非易事,有可能遷一發動全身,然后就玩大了,所以我就先去看了hiredis的源碼,看了大半天結果發現還是回去改造teamtalk吧。。。
然后就開始在幾個網絡框架里選擇用哪一個來改寫,libev首先被剔除了,原因我就不講了,你懂的。然后對我來講,因為鄙人的node.js功力非常深厚,而libuv則是node.js的底層網絡庫,所以選擇libuv自然應該是理所當然的了。然后就開始弄了,弄了老半天結果發現真的玩大了,至于為什么你可以去嘗試一下,也許你可以的,我是放棄了,然后就還是libevent了。
“你這貨廢話怎么這么多!!!”
“因為其實我不是程序員,鄙人日常工作是拉皮條的。”
關于teamtalk的底層網絡框架的性能問題,我這里先來舉個例子做個說明
int netlib_send(net_handle_t handle, void* buf, int len)
{
CBaseSocket* pSocket = FindBaseSocket(handle);
if (!pSocket)
{
return NETLIB_ERROR;
}
int ret = pSocket->Send(buf, len);
pSocket->ReleaseRef();
return ret;
} 上面這段代碼是teamtalk的原始netlib庫的發送函數,所有功能要發送報文最終都會調到這個函數,然后你就可以看到了,這段函數每次發送數據前都要FindBaseSocket一把,當你的并發連接不多時這個查找倒也無所謂,但如果這時你有十萬個并發連接,那么你每次哪怕是發一個小包包都要去這十萬個basesocket里面找一把,這個就讓人不那么爽了(盡管map查找似乎還挺快,然而也并沒有什么軟用)。
所以,如果你不爽,那就想辦法讓自己爽起來吧,人生苦短,請及時行樂。
怎么才能爽起來呢?必須把這里的find給弄掉,并且還不能碰netlib棧以上的代碼,也就是說千萬別去碰CImConn,如果你連那個也改了,相信我你肯定不會爽起來的。不知道CImConn的同學可以讀一下我前面的博文。
所以,原則就定下來了,只能改動netlib內部的實現,不要變動其對外的接口,如果做到這一點,你最終只需要改動很少的代碼并讓自己爽起來。
“好了,今天就到此為止吧,該說的都講清楚了,我特么真是個寫博客的人才~”
”臥槽,你還沒講怎么改呢!@!“
”不是說了么,只改netlib內部實現,你就能嗨起來了。“
”擦!“
好吧,為了不被噴,我還是把我自己改的代碼貢獻出來吧
#include "netlib.h"
#include "BaseSocket.h"
#include "EventDispatch.h"
#define __LIBEVENT__
#ifndef __LIBEVENT__
int netlib_init()
{
int ret = NETLIB_OK;
#ifdef _WIN32
WSADATA wsaData;
WORD wReqest = MAKEWORD(1, 1);
if (WSAStartup(wReqest, &wsaData) != 0)
{
ret = NETLIB_ERROR;
}
#endif
return ret;
}
int netlib_destroy()
{
int ret = NETLIB_OK;
#ifdef _WIN32
if (WSACleanup() != 0)
{
ret = NETLIB_ERROR;
}
#endif
return ret;
}
int netlib_listen(
const char* server_ip,
uint16_t port,
callback_t callback,
void* callback_data)
{
auto spSocket = sp_CBaseSocket(new CBaseSocket());
if (!spSocket)
return NETLIB_ERROR;
int ret = spSocket->Listen(server_ip, port, callback, callback_data);
// if (ret == NETLIB_ERROR)
// delete pSocket;
return ret;
}
net_handle_t netlib_connect(
const char* server_ip,
uint16_t port,
callback_t callback,
void* callback_data)
{
auto spSocket = sp_CBaseSocket(new CBaseSocket());
if (!spSocket)
return NETLIB_INVALID_HANDLE;
net_handle_t handle = spSocket->Connect(server_ip, port, callback, callback_data);
// if (handle == NETLIB_INVALID_HANDLE)
// delete pSocket;
return handle;
}
int netlib_send(net_handle_t handle, void* buf, int len)
{
auto spSocket = FindBaseSocket(handle);
if (!spSocket)
{
return NETLIB_ERROR;
}
int ret = spSocket->Send(buf, len);
// pSocket->ReleaseRef();
return ret;
}
int netlib_recv(net_handle_t handle, void* buf, int len)
{
auto spSocket = FindBaseSocket(handle);
if (!spSocket)
return NETLIB_ERROR;
int ret = spSocket->Recv(buf, len);
// pSocket->ReleaseRef();
return ret;
}
int netlib_close(net_handle_t handle)
{
auto spSocket = FindBaseSocket(handle);
if (!spSocket)
return NETLIB_ERROR;
int ret = spSocket->Close();
// pSocket->ReleaseRef();
return ret;
}
int netlib_option(net_handle_t handle, int opt, void* optval)
{
auto spSocket = FindBaseSocket(handle);
if (!spSocket)
return NETLIB_ERROR;
if ((opt >= NETLIB_OPT_GET_REMOTE_IP) && !optval)
return NETLIB_ERROR;
switch (opt)
{
case NETLIB_OPT_SET_CALLBACK:
spSocket->SetCallback((callback_t)optval);
break;
case NETLIB_OPT_SET_CALLBACK_DATA:
spSocket->SetCallbackData(optval);
break;
case NETLIB_OPT_GET_REMOTE_IP:
*(string*)optval = spSocket->GetRemoteIP();
break;
case NETLIB_OPT_GET_REMOTE_PORT:
*(uint16_t*)optval = spSocket->GetRemotePort();
break;
case NETLIB_OPT_GET_LOCAL_IP:
*(string*)optval = spSocket->GetLocalIP();
break;
case NETLIB_OPT_GET_LOCAL_PORT:
*(uint16_t*)optval = spSocket->GetLocalPort();
break;
case NETLIB_OPT_SET_SEND_BUF_SIZE:
spSocket->SetSendBufSize(*(uint32_t*)optval);
break;
case NETLIB_OPT_SET_RECV_BUF_SIZE:
spSocket->SetRecvBufSize(*(uint32_t*)optval);
break;
}
// pSocket->ReleaseRef();
return NETLIB_OK;
}
int netlib_register_timer(callback_t callback, void* user_data, uint64_t interval)
{
CEventDispatch::Instance()->AddTimer(callback, user_data, interval);
return 0;
}
int netlib_delete_timer(callback_t callback, void* user_data)
{
CEventDispatch::Instance()->RemoveTimer(callback, user_data);
return 0;
}
int netlib_add_loop(callback_t callback, void* user_data)
{
CEventDispatch::Instance()->AddLoop(callback, user_data);
return 0;
}
void netlib_eventloop(uint32_t wait_timeout)
{
CEventDispatch::Instance()->StartDispatch(wait_timeout);
}
void netlib_stop_event()
{
CEventDispatch::Instance()->StopDispatch();
}
bool netlib_is_running()
{
return CEventDispatch::Instance()->isRunning();
}
#else
#include <unordered_map>
#include <event2/event.h>
static unordered_map<net_handle_t, struct event*> g_read_event_map;
static unordered_map<net_handle_t, struct event*> g_write_event_map;
static unordered_map<callback_t, struct event*> g_timer_map;
struct event_base* g_libevent_base;
static string _GetRemoteIP(net_handle_t hd);
static uint16_t _GetRemotePort(net_handle_t hd);
static string _GetLocalIP(net_handle_t hd);
static uint16_t _GetLocalPort(net_handle_t hd);
static void _SetSendBufSize(net_handle_t hd, uint32_t send_size);
static void _SetRecvBufSize(net_handle_t hd, uint32_t recv_size);
static int _GetErrorCode();
static void _SetNonblock(SOCKET fd);
static bool _IsBlock(int error_code);
static void _SetReuseAddr(SOCKET fd);
static void _SetNoDelay(SOCKET fd);
static void _SetAddr(const char* ip, const uint16_t port, sockaddr_in* pAddr);
void netlib_onconfirm(evutil_socket_t fd, short what, void *arg);
void netlib_onread(evutil_socket_t fd, short what, void *arg);
void netlib_onwrite(evutil_socket_t fd, short what, void *arg);
void netlib_onaccept(evutil_socket_t fd, short what, void *arg);
void netlib_ontimer(evutil_socket_t fd, short what, void* arg);
void netlib_check_write_error(net_handle_t fd, int* error, socklen_t* len);
void netlib_set_onconnect_event(net_handle_t handle, callback_t callback, void* cbdata);
struct EvtArg {
callback_t callback;
void* cbdata;
EvtArg(callback_t c, void* d) : callback(c), cbdata(d) {}
~EvtArg() {}
};
struct EvtArg2 {
callback_t callback;
void* cbdata;
struct event* evt;
EvtArg2(callback_t c, void* d, struct event* e) : callback(c), cbdata(d), evt(e) {}
~EvtArg2() {}
};
static int _GetErrorCode()
{
#ifdef _WIN32
return WSAGetLastError();
#else
return errno;
#endif
}
static void _SetNonblock(SOCKET fd)
{
#ifdef _WIN32
u_long nonblock = 1;
int ret = ioctlsocket(fd, FIONBIO, &nonblock);
#else
int ret = fcntl(fd, F_SETFL, O_NONBLOCK | fcntl(fd, F_GETFL));
#endif
if (ret == SOCKET_ERROR)
{
log("_SetNonblock failed, err_code=%d", _GetErrorCode());
}
}
static bool _IsBlock(int error_code)
{
#ifdef _WIN32
return ( (error_code == WSAEINPROGRESS) || (error_code == WSAEWOULDBLOCK) );
#else
return ( (error_code == EINPROGRESS) || (error_code == EWOULDBLOCK) );
#endif
}
static void _SetReuseAddr(SOCKET fd)
{
int reuse = 1;
int ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
if (ret == SOCKET_ERROR)
{
log("_SetReuseAddr failed, err_code=%d", _GetErrorCode());
}
}
static void _SetNoDelay(SOCKET fd)
{
int nodelay = 1;
int ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&nodelay, sizeof(nodelay));
if (ret == SOCKET_ERROR)
{
log("_SetNoDelay failed, err_code=%d", _GetErrorCode());
}
}
static void _SetAddr(const char* ip, const uint16_t port, sockaddr_in* pAddr)
{
memset(pAddr, 0, sizeof(sockaddr_in));
pAddr->sin_family = AF_INET;
pAddr->sin_port = htons(port);
pAddr->sin_addr.s_addr = inet_addr(ip);
if (pAddr->sin_addr.s_addr == INADDR_NONE)
{
hostent* host = gethostbyname(ip);
if (host == NULL)
{
log("gethostbyname failed, ip=%s", ip);
return;
}
pAddr->sin_addr.s_addr = *(uint32_t*)host->h_addr;
}
}
int netlib_init()
{
int ret = NETLIB_OK;
#ifdef _WIN32
WSADATA wsaData;
WORD wReqest = MAKEWORD(1, 1);
if (WSAStartup(wReqest, &wsaData) != 0) {
ret = NETLIB_ERROR;
}
#endif
g_libevent_base = event_base_new();
return ret;
}
int netlib_destroy()
{
int ret = NETLIB_OK;
#ifdef _WIN32
if (WSACleanup() != 0) {
ret = NETLIB_ERROR;
}
#endif
event_base_free(g_libevent_base);
return ret;
}
net_handle_t netlib_connect(
const char* server_ip,
uint16_t port,
callback_t callback,
void* callback_data)
{
net_handle_t sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == INVALID_SOCKET) {
loge("socket failed, err_code=%d", _GetErrorCode());
return NETLIB_INVALID_HANDLE;
}
_SetNonblock(sock);
_SetNoDelay(sock);
sockaddr_in serv_addr;
_SetAddr(server_ip, port, &serv_addr);
int ret = connect(sock, (sockaddr*)&serv_addr, sizeof(serv_addr));
if ( (ret == SOCKET_ERROR) && (!_IsBlock(_GetErrorCode())) ) {
loge("connect failed, err_code=%d", _GetErrorCode());
closesocket(sock);
return NETLIB_INVALID_HANDLE;
}
auto evtArg2 = new EvtArg2(callback, callback_data, NULL);
evtArg2->evt = event_new(g_libevent_base, sock, EV_WRITE, netlib_onconfirm, evtArg2);
event_add(evtArg2->evt, NULL);
return sock;
}
int netlib_close(net_handle_t handle)
{
auto it = g_read_event_map.find(handle);
if (it != g_read_event_map.end()) {
auto ev = it->second;
g_read_event_map.erase(it);
event_free(ev);
}
auto it2 = g_write_event_map.find(handle);
if (it2 != g_write_event_map.end()) {
auto ev = it2->second;
g_write_event_map.erase(it2);
event_free(ev);
}
closesocket(handle);
return 0;
}
int netlib_listen(
const char* server_ip,
uint16_t port,
callback_t callback,
void* callback_data)
{
auto sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == INVALID_SOCKET) {
printf("socket failed, err_code=%d\n", _GetErrorCode());
return NETLIB_ERROR;
}
_SetReuseAddr(sock);
_SetNonblock(sock);
sockaddr_in serv_addr;
_SetAddr(server_ip, port, &serv_addr);
int ret = bind(sock, (sockaddr*)&serv_addr, sizeof(serv_addr));
if (ret == SOCKET_ERROR) {
loge("bind failed, err_code=%d", _GetErrorCode());
closesocket(sock);
return NETLIB_ERROR;
}
ret = listen(sock, 64);
if (ret == SOCKET_ERROR) {
loge("listen failed, err_code=%d", _GetErrorCode());
closesocket(sock);
return NETLIB_ERROR;
}
auto evtArg = new EvtArg(callback, callback_data);
struct event* ev = event_new(g_libevent_base, sock, EV_READ|EV_PERSIST, netlib_onaccept, evtArg);
event_add(ev, NULL);
return NETLIB_OK;
}
void netlib_onaccept(evutil_socket_t fd, short what, void *arg)
{
sockaddr_in peer_addr;
socklen_t addr_len = sizeof(sockaddr_in);
char ip_str[64];
while ( (fd = accept(fd, (sockaddr*)&peer_addr, &addr_len)) != INVALID_SOCKET )
{
_SetNoDelay(fd);
_SetNonblock(fd);
auto evtArg = (EvtArg*)arg;
evtArg->callback(evtArg->cbdata, NETLIB_MSG_CONNECT, (net_handle_t)fd, NULL);
}
}
void netlib_check_write_error(net_handle_t fd, int* error, socklen_t* len)
{
#ifdef _WIN32
getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)error, len);
#else
getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)error, len);
#endif
}
void netlib_onconfirm(evutil_socket_t fd, short what, void *arg)
{
auto evtArg2 = (EvtArg2*)arg;
int error = 0;
socklen_t len = sizeof(error);
netlib_check_write_error((net_handle_t)fd, &error, &len);
if (error) {
evtArg2->callback(evtArg2->cbdata, NETLIB_MSG_CLOSE, (net_handle_t)fd, NULL);
} else {
event_free(evtArg2->evt);
auto arg = new EvtArg(evtArg2->callback, evtArg2->cbdata);
struct event* evread = event_new(g_libevent_base, fd, EV_READ|EV_PERSIST|EV_ET, netlib_onread, arg);
struct event* evwrite = event_new(g_libevent_base, fd, EV_WRITE|EV_PERSIST|EV_ET, netlib_onwrite, arg);
event_add(evread, NULL);
event_add(evwrite, NULL);
g_read_event_map[fd] = evread;
g_write_event_map[fd] = evwrite;
evtArg2->callback(evtArg2->cbdata, NETLIB_MSG_CONFIRM, (net_handle_t)fd, NULL);
}
}
void netlib_onread(evutil_socket_t fd, short what, void *arg)
{
auto evtArg = (EvtArg*)arg;
evtArg->callback(evtArg->cbdata, NETLIB_MSG_READ, (net_handle_t)fd, NULL);
}
void netlib_onwrite(evutil_socket_t fd, short what, void *arg)
{
auto evtArg = (EvtArg*)arg;
evtArg->callback(evtArg->cbdata, NETLIB_MSG_WRITE, (net_handle_t)fd, NULL);
}
void netlib_set_onconnect_event(net_handle_t handle, callback_t callback, void* cbdata)
{
auto arg = new EvtArg(callback, cbdata);
struct event* evread = event_new(g_libevent_base, handle, EV_READ|EV_PERSIST|EV_ET, netlib_onread, arg);
struct event* evwrite = event_new(g_libevent_base, handle, EV_WRITE|EV_PERSIST|EV_ET, netlib_onwrite, arg);
event_add(evread, NULL);
event_add(evwrite, NULL);
g_read_event_map[handle] = evread;
g_write_event_map[handle] = evwrite;
}
string _GetRemoteIP(net_handle_t hd)
{
struct sockaddr_in sa;
socklen_t len = sizeof(sa);
if (!getpeername(hd, (struct sockaddr*)&sa, &len)) {
return inet_ntoa(sa.sin_addr);
} else {
return "";
}
}
uint16_t _GetRemotePort(net_handle_t hd)
{
struct sockaddr_in sa;
socklen_t len = sizeof(sa);
if (!getpeername(hd, (struct sockaddr*)&sa, &len)) {
return ntohs(sa.sin_port);
} else {
return 0;
}
}
string _GetLocalIP(net_handle_t hd)
{
struct sockaddr_in sa;
socklen_t len = sizeof(sa);
if (!getsockname(hd, (struct sockaddr*)&sa, &len)) {
return inet_ntoa(sa.sin_addr);
} else {
return "";
}
}
uint16_t _GetLocalPort(net_handle_t hd)
{
struct sockaddr_in sa;
socklen_t len = sizeof(sa);
if (!getsockname(hd, (struct sockaddr*)&sa, &len)) {
return ntohs(sa.sin_port);
} else {
return 0;
}
}
void _SetSendBufSize(net_handle_t hd, uint32_t send_size)
{
int ret = setsockopt(hd, SOL_SOCKET, SO_SNDBUF, &send_size, 4);
if (ret == SOCKET_ERROR) {
loge("set SO_SNDBUF failed for fd=%d", hd);
}
socklen_t len = 4;
int size = 0;
getsockopt(hd, SOL_SOCKET, SO_SNDBUF, &size, &len);
loge("socket=%d send_buf_size=%d", hd, size);
}
void _SetRecvBufSize(net_handle_t hd, uint32_t recv_size)
{
int ret = setsockopt(hd, SOL_SOCKET, SO_RCVBUF, &recv_size, 4);
if (ret == SOCKET_ERROR) {
loge("set SO_RCVBUF failed for fd=%d", hd);
}
socklen_t len = 4;
int size = 0;
getsockopt(hd, SOL_SOCKET, SO_RCVBUF, &size, &len);
loge("socket=%d recv_buf_size=%d", hd, size);
}
int netlib_option(net_handle_t handle, int opt, void* optval)
{
static callback_t cb;
if ((opt >= NETLIB_OPT_GET_REMOTE_IP) && !optval)
return NETLIB_ERROR;
switch (opt) {
case NETLIB_OPT_SET_CALLBACK:
cb = (callback_t)optval;
break;
case NETLIB_OPT_SET_CALLBACK_DATA:
netlib_set_onconnect_event(handle, cb, optval);
break;
case NETLIB_OPT_GET_REMOTE_IP:
*(string*)optval = _GetRemoteIP(handle);
break;
case NETLIB_OPT_GET_REMOTE_PORT:
*(uint16_t*)optval = _GetRemotePort(handle);
break;
case NETLIB_OPT_GET_LOCAL_IP:
*(string*)optval = _GetLocalIP(handle);
break;
case NETLIB_OPT_GET_LOCAL_PORT:
*(uint16_t*)optval = _GetLocalPort(handle);
break;
case NETLIB_OPT_SET_SEND_BUF_SIZE:
_SetSendBufSize(handle, *(uint32_t*)optval);
break;
case NETLIB_OPT_SET_RECV_BUF_SIZE:
_SetRecvBufSize(handle, *(uint32_t*)optval);
break;
default:
break;
}
return NETLIB_OK;
}
int netlib_send(net_handle_t handle, void* buf, int len)
{
return send(handle, (char*)buf, len, 0);
}
int netlib_recv(net_handle_t handle, void* buf, int len)
{
return recv(handle, (char*)buf, len, 0);
}
int netlib_register_timer(callback_t callback, void* user_data, uint64_t interval)
{
long int sec = interval/1000L;
long int usec = interval%1000L;
struct timeval t = {sec, usec};
auto arg = new EvtArg(callback, user_data);
struct event* ev = event_new(g_libevent_base, -1, EV_PERSIST, netlib_ontimer, arg);
event_add(ev, &t);
g_timer_map[callback] = ev;
return 0;
}
int netlib_delete_timer(callback_t callback, void* user_data)
{
auto it = g_timer_map.find(callback);
if (it != g_timer_map.end()) {
auto ev = it->second;
g_timer_map.erase(it);
event_free(ev);
}
return 0;
}
void netlib_ontimer(evutil_socket_t fd, short what, void* arg)
{
EvtArg* evtArg = (EvtArg*)arg;
evtArg->callback(evtArg->cbdata, NETLIB_MSG_TIMER, 0, NULL);
}
int netlib_add_loop(callback_t callback, void* user_data)
{
struct timeval t = {0, 100};
auto arg = new EvtArg(callback, user_data);
struct event* ev = event_new(g_libevent_base, -1, EV_PERSIST, netlib_ontimer, arg);
event_add(ev, &t);
// g_timer_map[callback] = ev;
return 0;
}
void netlib_eventloop(uint32_t wait_timeout)
{
event_base_dispatch(g_libevent_base);
}
void netlib_stop_event()
{
event_base_loopbreak(g_libevent_base);
}
bool netlib_is_running()
{
bool ret = !event_base_got_break(g_libevent_base);
return ret;
}
#endif
另外還有個地方需要說明一下,我把原來BaseSocket.cpp里面的OnRead和OnWrite里的出錯檢查給去掉了,因為我覺得也讓我不爽,也就是下面這兩段代碼,并且把連接事件明確的和onaccept以及onconfirm對應起來,所以不再需要在onread和onwrite里來判斷當前 socket狀態是listening還是connecting來選擇accept和confirm了。
u_long avail = 0;
if ( (ioctlsocket(m_socket, FIONREAD, &avail) == SOCKET_ERROR) || (avail == 0) )
{
m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
}#if ((defined _WIN32) || (defined __APPLE__))
CEventDispatch::Instance()->RemoveEvent(m_socket, SOCKET_WRITE);
#endif
if (m_state == SOCKET_STATE_CONNECTING)
{
int error = 0;
socklen_t len = sizeof(error);
#ifdef _WIN32
getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len);
#else
getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (void*)&error, &len);
#endif
if (error) {
m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
} else {
m_state = SOCKET_STATE_CONNECTED;
”臥槽,你不是說不要碰netlib上層的代碼嗎!?“
”但是我還是碰了,因為不碰不能爽。。“
請自己搜一下你代碼里調用netlib_recv和netlib_send的地方,然后略作如下修改
void CImConn::OnRead()
{
for (;;)
{
uint32_t free_buf_len = m_in_buf.GetAllocSize() - m_in_buf.GetWriteOffset();
if (free_buf_len < READ_BUF_SIZE)
m_in_buf.Extend(READ_BUF_SIZE);
int ret = netlib_recv(m_handle, m_in_buf.GetBuffer() + m_in_buf.GetWriteOffset(), READ_BUF_SIZE);
if (ret == 0) {
log("close on netlib_recv=0");
OnClose();
return;
} else if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
} else {
log("close on error=%d", errno);
OnClose();
return;
}
}
void CImConn::OnWrite()
{
// CAutoLock autoLock(&s_send_lock);
if (!m_busy)
return;
while (m_out_buf.GetWriteOffset() > 0) {
int send_size = m_out_buf.GetWriteOffset();
if (send_size > NETLIB_MAX_SOCKET_BUF_SIZE) {
send_size = NETLIB_MAX_SOCKET_BUF_SIZE;
}
int ret = netlib_send(m_handle, m_out_buf.GetBuffer(), send_size);
if (ret <= 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
} else {
log("close on error=%d", errno);
OnClose();
return;
}
}
來自:http://my.oschina.net/u/877397/blog/500660