C++線程池的設計與測試
編寫了一個最基本的線程池類,處理用c_work表示的工作任務。
/////////////////////////////////////////////////////// //線程池類 ///////////////////////////////////////////////////////include <pthread.h>
include <stdlib.h>
include <stdio.h>
include <unistd.h>
include <assert.h>
const int DEFAULT_MAX_THREAD_NUM = 10; const int MAX_WORK_NUM = 100000; //c_worker類 class c_work { public: c_work():process(NULL), arg(NULL), next(NULL){} c_work(void (prcss)(void ), void arg): process(prcss), arg(arg), next(NULL) {} ~c_work();
void *(*process)(void *); void *arg; unsigned char type; //最高位表示arg是否需要delete操作 c_work *next;
};
c_work::~c_work() { unsigned char ifdel = type >> 7; if (ifdel) { delete arg; arg = NULL; } }
class c_thread_pool { public: c_thread_pool(); c_thread_pool(const int max_thread_num); ~c_thread_pool();
int add_work(c_work work); static void *thread_routine(void *arg); pthread_mutex_t queue_lock; pthread_cond_t queue_cond;
// private: c_work queue_head; c_work queue_tail; int shutdown;
pthread_t *threadid; int max_thread_num; int cur_queue_size;
};
c_thread_pool::c_thread_pool() {
pthread_mutex_init(&queue_lock, NULL); pthread_cond_init(&queue_cond, NULL); //工作隊列初始化 queue_head = NULL; queue_tail = NULL; max_thread_num = max_thread_num; cur_queue_size = 0; shutdown = 0; max_thread_num = DEFAULT_MAX_THREAD_NUM; threadid = new pthread_t[max_thread_num]; int i = 0; for (i = 0; i < max_thread_num; i++) { pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this); }
}
c_thread_pool::c_thread_pool(int max_thread_num) {
pthread_mutex_init(&queue_lock, NULL); pthread_cond_init(&queue_cond, NULL); //工作隊列初始化 queue_head = NULL; queue_tail = NULL; max_thread_num = max_thread_num; cur_queue_size = 0; threadid = new pthread_t[max_thread_num]; int i = 0; for (i = 0; i < max_thread_num; i++) { pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this); }
}
/向線程池中的任務隊列加入任務/ int c_thread_pool::add_work(c_work work) { c_work *newwork = new c_work; newwork->process = work.process; newwork->arg = work.arg; newwork->next = NULL;
pthread_mutex_lock(&queue_lock); /*將任務加入到等待隊列中*/ if (queue_head != NULL && queue_tail != NULL) { queue_tail->next = newwork; queue_tail = newwork; } else { //空隊列 queue_head = newwork; queue_tail = newwork; } cur_queue_size++; pthread_mutex_unlock(&queue_lock); /*等待隊列中有任務了,喚醒一個等待線程,注意如果所有線程都在忙碌,這句沒有任何作用*/ pthread_cond_signal(&(queue_cond)); printf("add work returned!\n"); return 0;
}
void c_thread_pool::thread_routine(void arg) { c_thread_pool pool = (c_thread_pool )arg; int i = 0; while (1) { pthread_mutex_lock(&(pool->queue_lock)); //如果等待隊列為0并且不銷毀線程池,則處于阻塞狀態; 注意 // pthread_cond_wait是一個原子操作,等待前會解鎖,喚醒后會加鎖
//標注:注意這一如果任務隊列不為空的話,while語句將被跳過,直接執行下面的調用。 while (pool->cur_queue_size == 0 && pool->shutdown) { pthread_cond_wait(&(pool->queue_cond), &(pool->queue_lock)); } //等待隊列長度減去1,并取出鏈表中的頭元素 if (pool->cur_queue_size > 0 && pool->queue_head != NULL) { printf("IN THREAD ROUTINE size = %d && queue head is not NULL\n", pool->cur_queue_size); pool->cur_queue_size--; c_work *work = pool->queue_head; pool->queue_head = work->next; pthread_mutex_unlock(&(pool->queue_lock)); //調用回調函數,執行測試任務 ////////////////////////////////////////// (*(work->process))(work->arg); free(work); work = NULL; } else //不可達 { pthread_mutex_unlock(&(pool->queue_lock)); } }
}
c_thread_pool::~c_thread_pool() { for (int i = 0; i < max_thread_num; ++i) pthread_cancel(threadid[i]); for (c_work w_t = queue_head; w_t != NULL;) { c_work temp = w_t->next; delete w_t; w_t = temp; } delete [] threadid; }</pre>轉自:http://blog.csdn.net/naturebe/article/details/7901130