// MongoDB 連接池 C++ 封裝
// Linux 平臺下 MongoDB C++ 連接池封裝,線程安全
//Pool of MongoDB DBClientConnection objects handed out one at a time to callers.
//Return convention of the int-returning members: 0 means success, >0 means an error.
class cmongo{
public:
//Default constructor; per the original author's note it uses a connection count of 1
//(its definition is not part of this listing)
cmongo();
//Constructor taking the number of connections to create for the pool
cmongo(int size);
//Destructor; deletes the pooled connection objects
~cmongo();
public:
//Set the TCP read/write timeout value stored in wr_timeout
int set_wr_timeout(double t);
//Connect the pooled connections to mhost:mport
int conn(string mhost="127.0.0.1",int mport=27017);
//Select the database and collection; they are stored joined as "<db>.<collection>"
int setdb(string mdb,string mcollection);
//Ensure an index on `key` (index spec {key:1}) in the current namespace
int setindex(string key);
//Query one document where key == key_val; `in` lists the fields to fetch, results land in `out`
int get(map<string,string>& out,vector<string> in,string key,string key_val);
//Query a batch of key values (`in`); `fields` lists which fields to fetch for each of them
int gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key);
//Dump the collection as key -> value: each document's `key` field maps to its single `val` field
int dumpkey(map< string,string >& rout,string key,string val);
//Dump the collection as key -> map<field,value>: each document's `key` field maps to the fields listed in `in`
int dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key);
//Write (upsert with $set) the fields of `in` into the document where key == key_val
int set(map<string,string> in,string key,string key_val);
//Batch write
//Update interface, batch update keyed by e.g. key="id"; the outer map is laid out as
// "123456":<key,value>,<key,value>
// "123457":<key,value>,<key,value>
int sets(map< string,map<string,string> > in,string key);
//Delete the document(s) where key == key_val
int remove(string key,string key_val);
private:
//Namespace string "<db>.<collection>" passed to every driver call
string doc;
//TCP read/write timeout
double wr_timeout;
//Guards the busy flags stored in _joblst
pthread_mutex_t _jobmux;
//Counting semaphore waited on before a connection is taken and posted when it is returned
sem_t _jobsem;
//Pooled connections, each mapped to a busy flag (true = currently taken or marked unusable)
map<DBClientConnection*,bool> _joblst;
//Held around the cursor scans in dumpkey()/dumpvals()
pthread_mutex_t _dbmux;
};
//Builds the pool: creates `size` DBClientConnection objects (not yet connected) and
//registers each one in _joblst as free.
//  size  requested number of connections; negative values become 1, values above 200
//        become 200. A size of 0 is accepted and yields an empty pool.
//The namespace defaults to "<DB_DB>.<DB_COLLECTION>" and the timeout to 3.
//If sem_init fails the constructor returns with an empty pool.
cmongo::cmongo(int size){
    doc=string(DB_DB)+"."+string(DB_COLLECTION);
    wr_timeout=3;
    //Clamp the requested pool size
    if(size<0){
        size=1;
    }
    if(size>200){
        size=200;
    }
    //Initialise both mutexes before anything can fail: _dbmux is locked by
    //dumpkey()/dumpvals(), and locking a mutex that was never initialised is undefined.
    pthread_mutex_init(&_jobmux,NULL);
    pthread_mutex_init(&_dbmux,NULL);
    //The semaphore starts at 0; tokens are added when connections are established
    if((sem_init(&_jobsem,0,0))<0){
        return;
    }
    bool auto_conn=true;
    pthread_mutex_lock(&_jobmux);
    for(int i=0;i<size;++i){
        //Plain new throws on failure, so the result never needs a NULL check
        DBClientConnection* pconn = new DBClientConnection(auto_conn,0,wr_timeout);
        _joblst[pconn]=false;
    }
    pthread_mutex_unlock(&_jobmux);
}
//Tears the pool down: deletes every pooled connection object, empties the map so no
//dangling pointers remain, then destroys the pool mutex and semaphore.
//No other thread may be using the object while it is being destroyed.
cmongo::~cmongo(){
    doc="";
    pthread_mutex_lock(&_jobmux);
    //The iterator type must match the member's type, map<DBClientConnection*,bool>
    for(map<DBClientConnection*,bool>::iterator it=_joblst.begin();it!=_joblst.end();++it){
        delete it->first;
    }
    _joblst.clear();
    pthread_mutex_unlock(&_jobmux);
    //A mutex must be unlocked before it is destroyed
    pthread_mutex_destroy(&_jobmux);
    sem_destroy(&_jobsem);
}
int cmongo::set_wr_timeout(double t){
wr_timeout=t;
return RET_OK;
}
//Connects every pooled connection to mhost:mport.
//  mhost  server host name or address
//  mport  server port
//A connection that connects successfully adds one token to _jobsem, making it available
//to the other members. A connection that fails is logged to cerr and marked busy so it
//is never handed out; it adds no token, which keeps the semaphore count equal to the
//number of usable connections.
//Returns RET_OK when at least one connection is usable, RET_ERR when none is.
//NOTE(review): calling conn() a second time re-connects every entry and posts further
//tokens; it looks intended to be called once per pool — confirm with callers.
int cmongo::conn(string mhost,int mport){
    //The target is the same for every connection, so build it once
    HostAndPort hp(mhost,mport);
    int usable=0;
    pthread_mutex_lock(&_jobmux);
    for(map<DBClientConnection*,bool>::iterator it=_joblst.begin();it!=_joblst.end();++it){
        string errmsg="";
        if(it->first->connect(hp,errmsg)){
            sem_post(&_jobsem);
            ++usable;
        }else{
            cerr<<"connect mhost:"<<mhost<<" mport:"<<mport<<" msg:"<<errmsg<<endl;
            it->second=true;
        }
    }
    pthread_mutex_unlock(&_jobmux);
    return usable>0 ? RET_OK : RET_ERR;
}
//Selects the namespace used by the other members.
//  mdb          database name
//  mcollection  collection name
//Returns RET_PARERR if either name is empty; otherwise stores "<mdb>.<mcollection>"
//in doc and returns RET_OK.
int cmongo::setdb(string mdb,string mcollection){
    const bool both_given = !mdb.empty() && !mcollection.empty();
    if(!both_given){
        return RET_PARERR;
    }
    //Assemble the namespace in a local first, then publish it with one assignment
    string ns(mdb);
    ns += ".";
    ns += mcollection;
    doc = ns;
    return RET_OK;
}
//Ensures an index on `key` (index spec {key:1}) in the current namespace.
//  key  field name to index
//Returns RET_PARERR for an empty key, RET_ERR when no free pooled connection could be
//claimed, RET_OK otherwise. Exceptions thrown by the driver still propagate to the
//caller, but the borrowed connection is returned to the pool first.
int cmongo::setindex(string key){
    if(key.empty()){
        return RET_PARERR;
    }
    //Wait for a token, then claim the first entry that is not marked busy
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool claimed=(it!=_joblst.end());
    if(claimed){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!claimed){
        //Token count and busy flags disagree: fail instead of dereferencing end()
        return RET_ERR;
    }
    try{
        //Build the spec with the BSON builder so any field name is handled verbatim
        it->first->ensureIndex(doc,BSON(key<<1));
    }catch(...){
        //Give the connection back before letting the driver exception continue
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return RET_OK;
}
//Single-document lookup; only one condition (key == key_val) is supported.
//  out      receives field name -> string value for every name listed in `in`
//  in       names of the fields to fetch
//  key      field to match on
//  key_val  value `key` must equal
//Returns RET_PARERR if key, key_val or `in` is empty, RET_ERR when no free pooled
//connection could be claimed, RET_OK otherwise. `out` is only assigned on RET_OK.
//Exceptions thrown by the driver still propagate, after the connection is returned.
int cmongo::get(map<string,string>& out,vector<string> in,string key,string key_val){
    if(key.empty() || key_val.empty() || in.empty()){
        return RET_PARERR;
    }
    //Projection: ask the server for the requested fields only
    BSONObjBuilder b;
    for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(*iter,1);
    }
    BSONObj ob=b.obj();
    //Wait for a token, then claim the first entry that is not marked busy
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool claimed=(it!=_joblst.end());
    if(claimed){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!claimed){
        //Token count and busy flags disagree: fail instead of dereferencing end()
        return RET_ERR;
    }
    map<string,string> temp;
    try{
        BSONObj p=it->first->findOne(doc,QUERY(key<<key_val),&ob);
        for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
            temp[*iter]=p.getStringField(iter->c_str());
        }
    }catch(...){
        //Give the connection back before letting the driver exception continue
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    out=temp;
    return RET_OK;
}
//Batch lookup: fetches selected fields for every document whose `key` field equals one
//of the values in `in` (one findOne per value).
//  rout    receives key value -> (field name -> string value)
//  fields  names of the fields to fetch
//  in      the key values to look up
//  key     field to match on, e.g. "id"
//Values for which no document comes back are skipped, so no entry with an empty key
//is inserted for them.
//Returns RET_PARERR for an empty key, RET_ERR when no free pooled connection could be
//claimed, RET_OK otherwise. Exceptions thrown by the driver still propagate, after the
//connection is returned.
int cmongo::gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key){
    if(key.empty()){
        return RET_PARERR;
    }
    //Wait for a token, then claim the first entry that is not marked busy
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool claimed=(it!=_joblst.end());
    if(claimed){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!claimed){
        //Token count and busy flags disagree: fail instead of dereferencing end()
        return RET_ERR;
    }
    try{
        //Projection: the key itself plus every requested field
        BSONObjBuilder b;
        b.append(key,1);
        for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){
            b.append(*iter,1);
        }
        BSONObj p=b.obj();
        for(vector<string>::iterator iter2=in.begin();iter2!=in.end();++iter2){
            BSONObj ob=it->first->findOne(doc,QUERY(key<<*iter2),&p);
            if(ob.isEmpty()){
                continue;
            }
            map<string,string> temp;
            for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){
                temp[*iter]=ob.getStringField(iter->c_str());
            }
            rout[ob.getStringField(key.c_str())]=temp;
        }
    }catch(...){
        //Give the connection back before letting the driver exception continue
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return RET_OK;
}
//Dumps the whole collection as a flat map: for every document, the string value of
//field `key` maps to the string value of field `val`.
//  rout  receives the key -> value pairs
//  key   field whose value becomes the map key
//  val   field whose value becomes the mapped value; when empty only `key` is projected
//The scan runs while holding _dbmux.
//Returns RET_PARERR for an empty key, RET_ERR when no free pooled connection could be
//claimed or the driver returned no cursor, RET_OK otherwise. Exceptions thrown by the
//driver still propagate, after _dbmux and the connection are released.
int cmongo::dumpkey(map< string,string >& rout,string key,string val){
    if(key.empty()){
        return RET_PARERR;
    }
    //Wait for a token, then claim the first entry that is not marked busy
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool claimed=(it!=_joblst.end());
    if(claimed){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!claimed){
        //Token count and busy flags disagree: fail instead of dereferencing end()
        return RET_ERR;
    }
    int ret=RET_OK;
    pthread_mutex_lock(&_dbmux);
    try{
        BSONObjBuilder b;
        b.append(key,1);
        if(!val.empty()){
            b.append(val,1);
        }
        BSONObj p=b.obj();
        auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p);
        if(cursor.get()==NULL){
            //query() hands back a null cursor when the connection has failed
            ret=RET_ERR;
        }else{
            while(cursor->more()){
                BSONObj ob=cursor->next();
                rout[ob.getStringField(key.c_str())]=ob.getStringField(val.c_str());
            }
        }
    }catch(...){
        //Release the scan lock and the connection before the exception continues
        pthread_mutex_unlock(&_dbmux);
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_unlock(&_dbmux);
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
//Dumps the whole collection as key -> map<field,value>: for every document, the string
//value of field `key` maps to the string values of the fields listed in `in`.
//(dumpvals could fully cover dumpkey; the two are kept separate for convenience.)
//  rout  receives key value -> (field name -> string value)
//  in    names of the fields to return for each document
//  key   field whose value becomes the outer map key, e.g. "id"
//The scan runs while holding _dbmux.
//Returns RET_PARERR for an empty key, RET_ERR when no free pooled connection could be
//claimed or the driver returned no cursor, RET_OK otherwise. Exceptions thrown by the
//driver still propagate, after _dbmux and the connection are released.
int cmongo::dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key){
    if(key.empty()){
        return RET_PARERR;
    }
    //Wait for a token, then claim the first entry that is not marked busy
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool claimed=(it!=_joblst.end());
    if(claimed){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!claimed){
        //Token count and busy flags disagree: fail instead of dereferencing end()
        return RET_ERR;
    }
    int ret=RET_OK;
    pthread_mutex_lock(&_dbmux);
    try{
        //Projection: the key itself plus every requested field
        BSONObjBuilder b;
        b.append(key,1);
        for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
            b.append(*iter,1);
        }
        BSONObj p=b.obj();
        auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p);
        if(cursor.get()==NULL){
            //query() hands back a null cursor when the connection has failed
            ret=RET_ERR;
        }else{
            while(cursor->more()){
                BSONObj ob=cursor->next();
                map<string,string> temp;
                for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){
                    temp[*iter]=ob.getStringField(iter->c_str());
                }
                rout[ob.getStringField(key.c_str())]=temp;
            }
        }
    }catch(...){
        //Release the scan lock and the connection before the exception continues
        pthread_mutex_unlock(&_dbmux);
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_unlock(&_dbmux);
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
//Update interface: upserts the fields of `in` with $set into the document where
//key == key_val. Updating several records that share one key is not supported.
//  in       field name -> value pairs to write
//  key      field to match on
//  key_val  value `key` must equal
//Returns RET_PARERR if `in`, key or key_val is empty, RET_ERR when no free pooled
//connection could be claimed or getLastError() reports an error, RET_OK otherwise.
//Exceptions thrown by the driver still propagate, after the connection is returned.
int cmongo::set(map<string,string> in,string key,string key_val){
    if(in.empty() || key.empty() || key_val.empty()){
        return RET_PARERR;
    }
    BSONObjBuilder b;
    for(map<string,string>::iterator iter=in.begin();iter!=in.end();++iter){
        b.append(iter->first,iter->second);
    }
    BSONObj ob=b.obj();
    //Wait for a token, then claim the first entry that is not marked busy
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool claimed=(it!=_joblst.end());
    if(claimed){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!claimed){
        //Token count and busy flags disagree: fail instead of dereferencing end()
        return RET_ERR;
    }
    int ret=RET_OK;
    try{
        //Fourth argument true = upsert
        it->first->update(doc,QUERY(key<<key_val),BSON("$set"<<ob),true);
        string errmsg=it->first->getLastError();
        if(!errmsg.empty()){
            ret=RET_ERR;
        }
    }catch(...){
        //Give the connection back before letting the driver exception continue
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
//Batch update interface: for every entry of `in`, upserts the inner field map with $set
//into the document whose `key` field equals the entry's outer key. Layout of `in` with
//key="id":
// "123456":<key,value>,<key,value>
// "123457":<key,value>,<key,value>
//  in   outer key value -> (field name -> value) pairs to write
//  key  field to match on
//All entries are attempted even after one of them fails.
//Returns RET_PARERR if `in` or key is empty, RET_ERR when no free pooled connection
//could be claimed or getLastError() reported an error for any entry, RET_OK otherwise.
//Exceptions thrown by the driver still propagate, after the connection is returned.
int cmongo::sets(map< string,map<string,string> > in,string key){
    if(in.empty() || key.empty()){
        return RET_PARERR;
    }
    //Wait for a token, then claim the first entry that is not marked busy
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool claimed=(it!=_joblst.end());
    if(claimed){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!claimed){
        //Token count and busy flags disagree: fail instead of dereferencing end()
        return RET_ERR;
    }
    int ret=RET_OK;
    try{
        for(map< string,map<string,string> >::iterator iter=in.begin();iter!=in.end();++iter){
            BSONObjBuilder b;
            for(map<string,string>::iterator iter2=iter->second.begin();iter2!=iter->second.end();++iter2){
                b.append(iter2->first,iter2->second);
            }
            BSONObj ob=b.obj();
            //Fourth argument true = upsert
            it->first->update(doc,QUERY(key<<iter->first),BSON("$set"<<ob),true);
            string errmsg=it->first->getLastError();
            if(!errmsg.empty()){
                ret=RET_ERR;
            }
        }
    }catch(...){
        //Give the connection back before letting the driver exception continue
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
//Delete interface: removes the record(s) where key == key_val, e.g. key="id",
//key_val="587" deletes the record with id "587".
//  key      field to match on
//  key_val  value `key` must equal
//Returns RET_PARERR if key or key_val is empty, RET_ERR when no free pooled connection
//could be claimed or getLastError() reports an error (the same check set()/sets() make),
//RET_OK otherwise. Exceptions thrown by the driver still propagate, after the
//connection is returned.
int cmongo::remove(string key,string key_val){
    if(key.empty() || key_val.empty()){
        return RET_PARERR;
    }
    //Wait for a token, then claim the first entry that is not marked busy
    sem_wait(&_jobsem);
    pthread_mutex_lock(&_jobmux);
    map<DBClientConnection*,bool>::iterator it=_joblst.begin();
    while(it!=_joblst.end() && it->second){
        ++it;
    }
    const bool claimed=(it!=_joblst.end());
    if(claimed){
        it->second=true;
    }
    pthread_mutex_unlock(&_jobmux);
    if(!claimed){
        //Token count and busy flags disagree: fail instead of dereferencing end()
        return RET_ERR;
    }
    int ret=RET_OK;
    try{
        it->first->remove(doc,BSON(key << key_val));
        string errmsg=it->first->getLastError();
        if(!errmsg.empty()){
            ret=RET_ERR;
        }
    }catch(...){
        //Give the connection back before letting the driver exception continue
        pthread_mutex_lock(&_jobmux);
        it->second=false;
        pthread_mutex_unlock(&_jobmux);
        sem_post(&_jobsem);
        throw;
    }
    pthread_mutex_lock(&_jobmux);
    it->second=false;
    pthread_mutex_unlock(&_jobmux);
    sem_post(&_jobsem);
    return ret;
}
</pre>