C++線程池的設計與測試

openkk 12年前發布 | 24K 次閱讀 C/C++開發 C/C++

編寫了一個最基本的線程池類,處理用c_work表示的工作任務。

///////////////////////////////////////////////////////
//線程池類 
///////////////////////////////////////////////////////

include <pthread.h>

include <stdlib.h>

include <stdio.h>

include <unistd.h>

include <assert.h>

const int DEFAULT_MAX_THREAD_NUM = 10; const int MAX_WORK_NUM = 100000; //c_worker類 class c_work { public: c_work():process(NULL), arg(NULL), next(NULL){} c_work(void (prcss)(void ), void arg): process(prcss), arg(arg), next(NULL) {} ~c_work();

    void *(*process)(void *);
    void *arg;
    unsigned char type; //最高位表示arg是否需要delete操作
    c_work *next;

};

c_work::~c_work() { unsigned char ifdel = type >> 7; if (ifdel) { delete arg; arg = NULL; } }

class c_thread_pool { public: c_thread_pool(); c_thread_pool(const int max_thread_num); ~c_thread_pool();

    int add_work(c_work work);
    static void *thread_routine(void *arg);

    pthread_mutex_t queue_lock;
    pthread_cond_t queue_cond;

// private: c_work queue_head; c_work queue_tail; int shutdown;

    pthread_t *threadid;
    int max_thread_num;
    int cur_queue_size;

};

c_thread_pool::c_thread_pool() {

pthread_mutex_init(&queue_lock, NULL);
pthread_cond_init(&queue_cond, NULL);

//工作隊列初始化
queue_head = NULL;
queue_tail = NULL;

max_thread_num = max_thread_num;
cur_queue_size = 0;

shutdown = 0;

max_thread_num = DEFAULT_MAX_THREAD_NUM;
threadid = new pthread_t[max_thread_num];
int i = 0;

for (i = 0; i < max_thread_num; i++)
{
    pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this);
}

}

c_thread_pool::c_thread_pool(int max_thread_num) {

pthread_mutex_init(&queue_lock, NULL);
pthread_cond_init(&queue_cond, NULL);

//工作隊列初始化
queue_head = NULL;
queue_tail = NULL;

max_thread_num = max_thread_num;
cur_queue_size = 0;

threadid = new pthread_t[max_thread_num];
int i = 0;
for (i = 0; i < max_thread_num; i++)
{
    pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this);
}

}

/向線程池中的任務隊列加入任務/ int c_thread_pool::add_work(c_work work) { c_work *newwork = new c_work; newwork->process = work.process; newwork->arg = work.arg; newwork->next = NULL;

pthread_mutex_lock(&queue_lock);

/*將任務加入到等待隊列中*/
if (queue_head != NULL && queue_tail != NULL)
{
    queue_tail->next = newwork;
    queue_tail = newwork;   
}
else
{
    //空隊列
    queue_head = newwork;
    queue_tail = newwork;
}

cur_queue_size++;
pthread_mutex_unlock(&queue_lock);
/*等待隊列中有任務了,喚醒一個等待線程,注意如果所有線程都在忙碌,這句沒有任何作用*/
pthread_cond_signal(&(queue_cond)); 
printf("add work returned!\n");
return 0;

}

void c_thread_pool::thread_routine(void arg) { c_thread_pool pool = (c_thread_pool )arg; int i = 0; while (1) { pthread_mutex_lock(&(pool->queue_lock)); //如果等待隊列為0并且不銷毀線程池,則處于阻塞狀態; 注意 // pthread_cond_wait是一個原子操作,等待前會解鎖,喚醒后會加鎖

    //標注:注意這一如果任務隊列不為空的話,while語句將被跳過,直接執行下面的調用。
    while (pool->cur_queue_size == 0 && pool->shutdown)
    {
        pthread_cond_wait(&(pool->queue_cond), &(pool->queue_lock));
    }


    //等待隊列長度減去1,并取出鏈表中的頭元素
    if (pool->cur_queue_size > 0 && pool->queue_head != NULL)
    {
        printf("IN THREAD ROUTINE size = %d && queue head is not NULL\n", pool->cur_queue_size);
        pool->cur_queue_size--;
        c_work *work = pool->queue_head;
        pool->queue_head = work->next;
        pthread_mutex_unlock(&(pool->queue_lock));

        //調用回調函數,執行測試任務
        //////////////////////////////////////////
        (*(work->process))(work->arg);
        free(work);
        work = NULL;
    }
    else //不可達
    {
        pthread_mutex_unlock(&(pool->queue_lock));
    }
}

}

c_thread_pool::~c_thread_pool() { for (int i = 0; i < max_thread_num; ++i) pthread_cancel(threadid[i]); for (c_work w_t = queue_head; w_t != NULL;) { c_work temp = w_t->next; delete w_t; w_t = temp; } delete [] threadid; }</pre>轉自:http://blog.csdn.net/naturebe/article/details/7901130

 本文由用戶 openkk 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!