MongoDB 連接池的 C++ 封裝
Linux 平臺下的 MongoDB C++ 連接池封裝,線程安全。
//函數返回0:成功 >0 出錯 class cmongo{ public: //默認構造函數,默認連接數為1 cmongo(); //傳入連接數到構造函數,默認連接數為size cmongo(int size); //析構函數 ~cmongo(); public: //設置tcp讀寫超時時間 int set_wr_timeout(double t); //連接 int conn(string mhost="127.0.0.1",int mport=27017); //設置db collection int setdb(string mdb,string mcollection);int setindex(string key); //查詢 int get(map<string,string>& out,vector<string> in,string key,string key_val); //投遞一批要查詢的字段,fields為要查詢哪些字段 int gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key); //dump key-value dumpkey對應一個value int dumpkey(map< string,string >& rout,string key,string val); //dump key->map<key,value> dumpkey對應一組value int dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key); //寫入 int set(map<string,string> in,string key,string key_val); //批量寫入 //更新接口,批量更新key="id" // "123456":<key,value>,<key,value> // "123457":<key,value>,<key,value> int sets(map< string,map<string,string> > in,string key); //刪除 int remove(string key,string key_val); private: string doc; //tcp讀寫超時時間 double wr_timeout; pthread_mutex_t _jobmux; sem_t _jobsem; map<DBClientConnection*,bool> _joblst; pthread_mutex_t _dbmux;
};
cmongo::cmongo(int size){ //doc doc=string(DB_DB)+"."+string(DB_COLLECTION); wr_timeout=3; //最大連接0-200 if(size<0){ size=1; } if(size>200){ size=200; } if(_joblst.size()>0){ return; } bool auto_conn=true; pthread_mutex_init(&_jobmux,NULL); if((sem_init(&_jobsem,0,0))<0){ return; } pthread_mutex_lock(&_jobmux); for(int i=0;i<size;++i){ DBClientConnection* pconn = new DBClientConnection(auto_conn,0,wr_timeout); if(pconn != NULL){ _joblst[pconn]=false; } } pthread_mutex_unlock(&_jobmux);
} cmongo::~cmongo(){ doc=""; pthread_mutex_lock(&_jobmux); map<DBClientConnection,bool>::iterator it=_joblst.begin(); while(it != _joblst.end()){ delete it->first; it++; } pthread_mutex_unlock(&_jobmux); } int cmongo::set_wr_timeout(double t){ wr_timeout=t; return RET_OK; } int cmongo::conn(string mhost,int mport){ pthread_mutex_lock(&_jobmux); map<DBClientConnection,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ string errmsg=""; HostAndPort hp(mhost,mport); if(!(it->first->connect(hp,errmsg))){ cerr<<"connect mhost:"<<mhost<<" mport:"<<mport<<" msg:"<<errmsg<<endl; it->second=true; } sem_post(&_jobsem); it++; } pthread_mutex_unlock(&_jobmux); return RET_OK; } int cmongo::setdb(string mdb,string mcollection){ if(mdb.empty() || mcollection.empty()){ return RET_PARERR; } doc=mdb+"."+mcollection; return RET_OK; } int cmongo::setindex(string key){ if(key.empty()){ return RET_PARERR; }
sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); string bindex="{"+key+":1}"; it->first->ensureIndex(doc,fromjson(bindex));pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return RET_OK;
} //out為檢索出來的key-value數據對應,in 為要檢索的字段,key,key_value為要檢索的條件,暫不支持多條件檢索 //單列查詢 int cmongo::get(map<string,string>& out,vector<string> in,string key,string key_val){ //key key_val 要檢索字段 if(key.empty() || key_val.empty() || in.size()<=0){ return RET_PARERR; } BSONObjBuilder b; for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){ b.append(*iter,1); }
sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); BSONObj ob=b.obj(); BSONObj p=it->first->findOne(doc,QUERY(key<<key_val),&ob); map<string,string> temp; for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){ string mkey=*iter; temp[*iter]=p.getStringField(mkey.c_str()); } out=temp; pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return RET_OK;
} //查詢key為key的一批數據的 某些字段 //fields為要查詢的字段集 //key="id" 值為in 一批key //返回key->map<key,value> int cmongo::gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key){ if(key.empty()){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux);
BSONObjBuilder b; b.append(key,1); for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){ b.append(*iter,1); } BSONObj p=b.obj(); for(vector<string>::iterator iter2=in.begin();iter2!=in.end();++iter2){ BSONObj ob=it->first->findOne(doc,QUERY(key<<*iter2),&p); map<string,string> temp; for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){ string mkey=*iter; temp[*iter]=ob.getStringField(mkey.c_str()); } rout[ob.getStringField(key.c_str())]=temp; } pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return RET_OK;
} //dumpkey key-value 返回 key對應的val值 //key val int cmongo::dumpkey(map< string,string >& rout,string key,string val){ if(key.empty()){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux);
BSONObjBuilder b; b.append(key,1); if(!val.empty()){ b.append(val,1); } BSONObj p=b.obj(); pthread_mutex_lock(&_dbmux); auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p); while(cursor->more()){ BSONObj ob=cursor->next(); rout[ob.getStringField(key.c_str())]=ob.getStringField(val.c_str()); } pthread_mutex_unlock(&_dbmux); pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return RET_OK;
} //dumpkey key對應多個value //key->map<key,value>. //其實dumpvals接口完全可以包含dumpkey,為了方便運用獨立出來 //out 返回的key 對應的map<key,value> //in 每個key需要對應的返回哪些字段 //key="id" int cmongo::dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key){ if(key.empty()){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux);
BSONObjBuilder b; b.append(key,1); for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){ b.append(*iter,1); } BSONObj p=b.obj(); pthread_mutex_lock(&_dbmux); auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p); while(cursor->more()){ BSONObj ob=cursor->next(); map<string,string> temp; for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){ string val=*iter; temp[val]=ob.getStringField(val.c_str()); } rout[ob.getStringField(key.c_str())]=temp; temp.clear(); } pthread_mutex_unlock(&_dbmux); pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return RET_OK;
} //更新接口,暫不支持key對應多條記錄的更新 int cmongo::set(map<string,string> in,string key,string key_val){ //如果map沒有數據,返回參數錯誤 if(in.size()<=0 || key.empty() || key_val.empty()){ return RET_PARERR; } BSONObjBuilder b; map<string,string>::iterator iter; for(iter=in.begin();iter!=in.end();++iter){ b.append(iter->first,iter->second); }
sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); BSONObj ob=b.obj(); it->first->update(doc,QUERY(key<<key_val),BSON("$set"<<ob),true); int ret=RET_OK; string errmsg=it->first->getLastError(); if(!errmsg.empty()){ ret=RET_ERR; } pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return ret;
} //更新接口,批量更新key="id" // "123456":<key,value>,<key,value> // "123457":<key,value>,<key,value> int cmongo::sets(map< string,map<string,string> > in,string key){ //如果map沒有數據,返回參數錯誤 if(in.size()<=0 || key.empty() ){ return RET_PARERR; }
sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); int ret=RET_OK; map< string,map<string,string> >::iterator iter; for(iter=in.begin();iter!=in.end();++iter){ BSONObjBuilder b; for(map<string,string>::iterator iter2=iter->second.begin();iter2!=iter->second.end();++iter2){ b.append(iter2->first,iter2->second); } BSONObj ob=b.obj(); it->first->update(doc,QUERY(key<<iter->first),BSON("$set"<<ob),true); string errmsg=it->first->getLastError(); if(!errmsg.empty()){ ret=RET_ERR; } } pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return ret;
} //刪除接口,刪除記錄 key=id key_val=587.即刪除id="587"的記錄 int cmongo::remove(string key,string key_val){
if(key.empty() || key_val.empty()){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); it->first->remove(doc,BSON(key << key_val)); pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return RET_OK;
} </pre>