mongodb連接池c++ 封裝

mx3y 9年前發布 | 6K 次閱讀 C/C++ MongoDB

Linux 平臺下的 MongoDB C++ 連接池封裝,線程安全。

// Pool of MongoDB client connections (legacy C++ driver) shared between threads.
//
// Every int-returning member returns 0 on success and a value > 0 on error.
// The db/collection ("doc") is a single setting shared by all callers of one
// cmongo object; it is not guarded by a lock, so set it before sharing the
// object between threads.
class cmongo{
    public:
        // Default constructor; intended to create a pool with one connection.
        cmongo();
        // Creates a pool with `size` connections (values above 200 are capped).
        cmongo(int size);
        // Destroys all pooled connections.
        ~cmongo();
    public:
        // Stores the TCP read/write timeout.
        // NOTE(review): the connections are created by the constructor, before
        // this can be called, so the stored value is not applied to them.
        // Unit is presumably seconds (driver socket timeout) - confirm.
        int set_wr_timeout(double t);
        // Connects every pooled connection to mhost:mport.
        int conn(string mhost="127.0.0.1",int mport=27017);
        // Selects the database and collection used by all later operations.
        int setdb(string mdb,string mcollection);
        // Ensures an ascending index on `key`.
        int setindex(string key);
        // Single-record query: finds the record whose `key` equals `key_val`
        // and copies the fields listed in `in` into `out` (field -> value).
        int get(map<string,string>& out,vector<string> in,string key,string key_val);
        // Batch query: for every value in `in`, finds the record whose `key`
        // equals that value and stores the requested `fields` in `rout`.
        int gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key);
        // Dumps the collection as key -> value: for each record,
        // rout[record[key]] = record[val].
        int dumpkey(map< string,string >& rout,string key,string val);
        // Dumps the collection as key -> {field -> value}: for each record,
        // rout[record[key]] holds the fields listed in `in`.
        int dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key);
        // Writes the fields in `in` to the record whose `key` equals `key_val`.
        int set(map<string,string> in,string key,string key_val);
        // Batch write. `in` maps a value of `key` to the fields to store, e.g.
        //   "123456": <key,value>,<key,value>
        //   "123457": <key,value>,<key,value>
        int sets(map< string,map<string,string> > in,string key);
        // Removes the records whose `key` equals `key_val`.
        int remove(string key,string key_val);
    private:
        // Not copyable: the object owns raw connection pointers, two mutexes
        // and a semaphore, so a copy would delete the connections twice.
        // Declared but intentionally left undefined.
        cmongo(const cmongo&);
        cmongo& operator=(const cmongo&);
    private:
        // "<db>.<collection>" namespace passed to every driver call.
        string doc;
        // TCP read/write timeout handed to new connections.
        double wr_timeout;
        // Guards the busy flags in _joblst.
        pthread_mutex_t _jobmux;
        // Counts the connections that are currently free to borrow.
        sem_t _jobsem;
        // connection -> busy flag (true while borrowed or unusable).
        map<DBClientConnection*,bool> _joblst;
        // Serialises the full-collection dumps (dumpkey / dumpvals).
        pthread_mutex_t _dbmux;
};

cmongo::cmongo(int size){ //doc doc=string(DB_DB)+"."+string(DB_COLLECTION); wr_timeout=3; //最大連接0-200 if(size<0){ size=1; } if(size>200){ size=200; } if(_joblst.size()>0){ return; } bool auto_conn=true; pthread_mutex_init(&_jobmux,NULL); if((sem_init(&_jobsem,0,0))<0){ return; } pthread_mutex_lock(&_jobmux); for(int i=0;i<size;++i){ DBClientConnection* pconn = new DBClientConnection(auto_conn,0,wr_timeout); if(pconn != NULL){ _joblst[pconn]=false; } } pthread_mutex_unlock(&_jobmux);

} cmongo::~cmongo(){ doc=""; pthread_mutex_lock(&_jobmux); map<DBClientConnection,bool>::iterator it=_joblst.begin(); while(it != _joblst.end()){ delete it->first; it++; } pthread_mutex_unlock(&_jobmux); } int cmongo::set_wr_timeout(double t){ wr_timeout=t; return RET_OK; } int cmongo::conn(string mhost,int mport){ pthread_mutex_lock(&_jobmux); map<DBClientConnection,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ string errmsg=""; HostAndPort hp(mhost,mport); if(!(it->first->connect(hp,errmsg))){ cerr<<"connect mhost:"<<mhost<<" mport:"<<mport<<" msg:"<<errmsg<<endl; it->second=true; } sem_post(&_jobsem); it++; } pthread_mutex_unlock(&_jobmux); return RET_OK; } int cmongo::setdb(string mdb,string mcollection){ if(mdb.empty() || mcollection.empty()){ return RET_PARERR; } doc=mdb+"."+mcollection; return RET_OK; } int cmongo::setindex(string key){ if(key.empty()){ return RET_PARERR; }
sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); string bindex="{"+key+":1}"; it->first->ensureIndex(doc,fromjson(bindex));

pthread_mutex_lock(&_jobmux);
it->second=false;
pthread_mutex_unlock(&_jobmux);
sem_post(&_jobsem);

return RET_OK;

} //out為檢索出來的key-value數據對應,in 為要檢索的字段,key,key_value為要檢索的條件,暫不支持多條件檢索 //單列查詢 int cmongo::get(map<string,string>& out,vector<string> in,string key,string key_val){ //key key_val 要檢索字段 if(key.empty() || key_val.empty() || in.size()<=0){ return RET_PARERR; } BSONObjBuilder b; for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){ b.append(*iter,1); }

sem_wait(&_jobsem);
pthread_mutex_lock(&_jobmux);
map<DBClientConnection*,bool>::iterator it=_joblst.begin();
while(it!=_joblst.end()){
    if(it->second == false){
        it->second=true;
        break;
    }
    it++;
}
pthread_mutex_unlock(&_jobmux);
BSONObj ob=b.obj();

BSONObj p=it->first->findOne(doc,QUERY(key<<key_val),&ob);

map<string,string> temp;
for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
    string mkey=*iter;
    temp[*iter]=p.getStringField(mkey.c_str());
}
out=temp;

pthread_mutex_lock(&_jobmux);
it->second=false;
pthread_mutex_unlock(&_jobmux);
sem_post(&_jobsem);

return RET_OK;

} //查詢key為key的一批數據的 某些字段 //fields為要查詢的字段集 //key="id" 值為in 一批key //返回key->map<key,value> int cmongo::gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key){ if(key.empty()){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux);

BSONObjBuilder b;
b.append(key,1);
for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){
    b.append(*iter,1);
}

BSONObj p=b.obj();
for(vector<string>::iterator iter2=in.begin();iter2!=in.end();++iter2){
    BSONObj ob=it->first->findOne(doc,QUERY(key<<*iter2),&p);
    map<string,string> temp;
    for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){
        string mkey=*iter;
        temp[*iter]=ob.getStringField(mkey.c_str());   
    }
    rout[ob.getStringField(key.c_str())]=temp;
}

pthread_mutex_lock(&_jobmux);
it->second=false;
pthread_mutex_unlock(&_jobmux);
sem_post(&_jobsem);

return RET_OK;

} //dumpkey key-value 返回 key對應的val值 //key val int cmongo::dumpkey(map< string,string >& rout,string key,string val){ if(key.empty()){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux);

BSONObjBuilder b;
b.append(key,1);
if(!val.empty()){
    b.append(val,1);
}

BSONObj p=b.obj();

pthread_mutex_lock(&_dbmux);
auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p);
while(cursor->more()){
    BSONObj ob=cursor->next();
    rout[ob.getStringField(key.c_str())]=ob.getStringField(val.c_str());
}
pthread_mutex_unlock(&_dbmux);

pthread_mutex_lock(&_jobmux);
it->second=false;
pthread_mutex_unlock(&_jobmux);
sem_post(&_jobsem);

return RET_OK;

} //dumpkey key對應多個value //key->map<key,value>. //其實dumpvals接口完全可以包含dumpkey,為了方便運用獨立出來 //out 返回的key 對應的map<key,value> //in 每個key需要對應的返回哪些字段 //key="id" int cmongo::dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key){ if(key.empty()){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux);

BSONObjBuilder b;
b.append(key,1);
for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
    b.append(*iter,1);
}

BSONObj p=b.obj();

pthread_mutex_lock(&_dbmux);
auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p);
while(cursor->more()){
    BSONObj ob=cursor->next();
    map<string,string> temp;
    for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
        string val=*iter;
        temp[val]=ob.getStringField(val.c_str());
    }
    rout[ob.getStringField(key.c_str())]=temp;
    temp.clear();
}
pthread_mutex_unlock(&_dbmux);

pthread_mutex_lock(&_jobmux);
it->second=false;
pthread_mutex_unlock(&_jobmux);
sem_post(&_jobsem);

return RET_OK;

} //更新接口,暫不支持key對應多條記錄的更新 int cmongo::set(map<string,string> in,string key,string key_val){ //如果map沒有數據,返回參數錯誤 if(in.size()<=0 || key.empty() || key_val.empty()){ return RET_PARERR; } BSONObjBuilder b; map<string,string>::iterator iter; for(iter=in.begin();iter!=in.end();++iter){ b.append(iter->first,iter->second); }

sem_wait(&_jobsem);
pthread_mutex_lock(&_jobmux);
map<DBClientConnection*,bool>::iterator it=_joblst.begin();
while(it!=_joblst.end()){
    if(it->second == false){
        it->second=true;
        break;
    }
    it++;
}
pthread_mutex_unlock(&_jobmux);
BSONObj ob=b.obj();
it->first->update(doc,QUERY(key<<key_val),BSON("$set"<<ob),true);

int ret=RET_OK;
string errmsg=it->first->getLastError();
if(!errmsg.empty()){
    ret=RET_ERR;
}

pthread_mutex_lock(&_jobmux);
it->second=false;
pthread_mutex_unlock(&_jobmux);
sem_post(&_jobsem);

return ret;

} //更新接口,批量更新key="id" // "123456":<key,value>,<key,value> // "123457":<key,value>,<key,value> int cmongo::sets(map< string,map<string,string> > in,string key){ //如果map沒有數據,返回參數錯誤 if(in.size()<=0 || key.empty() ){ return RET_PARERR; }

sem_wait(&_jobsem);
pthread_mutex_lock(&_jobmux);
map<DBClientConnection*,bool>::iterator it=_joblst.begin();
while(it!=_joblst.end()){
    if(it->second == false){
        it->second=true;
        break;
    }
    it++;
}
pthread_mutex_unlock(&_jobmux);

int ret=RET_OK;
map< string,map<string,string> >::iterator iter;
for(iter=in.begin();iter!=in.end();++iter){
    BSONObjBuilder b;
    for(map<string,string>::iterator iter2=iter->second.begin();iter2!=iter->second.end();++iter2){
        b.append(iter2->first,iter2->second);
    }
    BSONObj ob=b.obj();
    it->first->update(doc,QUERY(key<<iter->first),BSON("$set"<<ob),true);
    string errmsg=it->first->getLastError();
    if(!errmsg.empty()){
        ret=RET_ERR;
    }
}


pthread_mutex_lock(&_jobmux);
it->second=false;
pthread_mutex_unlock(&_jobmux);
sem_post(&_jobsem);

return ret;

} //刪除接口,刪除記錄 key=id key_val=587.即刪除id="587"的記錄 int cmongo::remove(string key,string key_val){

if(key.empty() || key_val.empty()){
    return RET_PARERR;
}
sem_wait(&_jobsem);
pthread_mutex_lock(&_jobmux);
map<DBClientConnection*,bool>::iterator it=_joblst.begin();
while(it!=_joblst.end()){
    if(it->second == false){
        it->second=true;
        break;
    }
    it++;
}
pthread_mutex_unlock(&_jobmux);

it->first->remove(doc,BSON(key << key_val));

pthread_mutex_lock(&_jobmux);
it->second=false;
pthread_mutex_unlock(&_jobmux);
sem_post(&_jobsem);

return RET_OK;

} </pre>

 本文由用戶 mx3y 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯繫管理員。
 轉載本站原創文章,請註明出處,並保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!