Java 線程池的原理與實現
線程池就是其中之一,一提到線程,我們會想到以前《操作系統》的生產者與消費者,信號量,同步控制等等。
一提到池,我們會想到數據庫連接池,但是線程池又如何呢?
建議:在閱讀本文前,先理一理同步的知識,特別是syncronized同步關鍵字的用法。
關于我對同步的認識,要緣于大三年的一本書,書名好像是 Java 實戰,這本書寫得實在太妙了,真正的從理論到實踐,從截圖分析到.class字節碼分析。哇,我想市場上很難買到這么精致的書了。作為一個Java愛好者,我覺得絕對值得一讀。
我對此書印象最深之一的就是:equal()方法,由淺入深,經典!
還有就是同步了,其中提到了我的幾個編程誤區,以前如何使用同步提高性能等等,通過學習,使我對同步的認識進一步加深。
簡單介紹
創建線程有兩種方式:繼承Thread或實現Runnable。Thread實現了Runnable接口,提供了一個空的run()方法,所以不論是繼承Thread還是實現Runnable,都要有自己的run()方法。
一個線程創建后就存在,調用start()方法就開始運行(執行run()方法),調用wait進入等待或調用sleep進入休眠期,順利運行完畢或休眠被中斷或運行過程中出現異常而退出。
wait和sleep比較:
sleep方法有:sleep(long millis),sleep(long millis, long nanos),調用sleep方法后,當前線程進入休眠期,暫停執行,但該線程繼續擁有監視資源的所有權。到達休眠時間后線程將繼續執行,直到完成。若在 休眠期另一線程中斷該線程,則該線程退出。
wait方法有:wait(),wait(long timeout),wait(long timeout, long nanos),調用wait方法后,該線程放棄監視資源的所有權進入等待狀態;
wait():等待有其它的線程調用notify()或notifyAll()進入調度狀態,與其它線程共同爭奪監視。wait()相當于wait(0),wait(0, 0)。
wait(long timeout):當其它線程調用notify()或notifyAll(),或時間到達timeout亳秒,或有其它某線程中斷該線程,則該線程進入調度狀態。
wait(long timeout, long nanos):相當于wait(1000000*timeout + nanos),只不過時間單位為納秒。
線程池:
多線程技術主要解決處理器單元內多個線程執行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力。
假設一個服務器完成一項任務所需時間為:T1 創建線程時間,T2 在線程中執行任務的時間,T3 銷毀線程時間。
如果:T1 + T3 遠大于 T2,則可以采用線程池,以提高服務器性能。
一個線程池包括以下四個基本組成部分:
1、線程池管理器(ThreadPool):用于創建并管理線程池,包括 創建線程池,銷毀線程池,添加新任務;
2、工作線程(PoolWorker):線程池中線程,在沒有任務時處于等待狀態,可以循環的執行任務;
3、任務接口(Task):每個任務必須實現的接口,以供工作線程調度任務的執行,它主要規定了任務的入口,任務執行完后的收尾工作,任務的執行狀態等;
4、任務隊列(taskQueue):用于存放沒有處理的任務。提供一種緩沖機制。
線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高服務器程序性能的。它把T1,T3分別安排在服務器程序的啟動和結束的時間段或者一些空閑的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。
線程池不僅調整T1,T3產生的時間段,而且它還顯著減少了創建線程的數目,看一個例子:
假設一個服務器一天要處理50000個請求,并且每個請求需要一個單獨的線程完成。在線程池中,線程數一般是固定的,所以產生線程總數不會超過線程池中線 程的數目,而如果服務器不利用線程池來處理這些請求則線程總數為50000。一般線程池大小是遠小于50000。所以利用線程池的服務器程序不會為了創建 50000而在處理請求時浪費時間,從而提高效率。
/ 線程池類,工作線程作為其內部類 /package org.ymcn.util;
import java.util.Collections; import java.util.Date; import java.util.LinkedList; import java.util.List;
import org.apache.log4j.Logger;
/**
- 線程池
- 創建線程池,銷毀線程池,添加新任務 *
@author obullxl */ public final class ThreadPool { private static Logger logger = Logger.getLogger(ThreadPool.class); private static Logger taskLogger = Logger.getLogger("TaskLogger");
private static boolean debug = taskLogger.isDebugEnabled(); // private static boolean debug = taskLogger.isInfoEnabled(); / 單例 / private static ThreadPool instance = ThreadPool.getInstance();
public static final int SYSTEM_BUSY_TASK_COUNT = 150; / 默認池中線程數 / public static int worker_num = 5; / 已經處理的任務數 / private static int taskCounter = 0;
public static boolean systemIsBusy = false;
private static List<Task> taskQueue = Collections
.synchronizedList(new LinkedList<Task>());
/ 池中的所有線程 / public PoolWorker[] workers;
private ThreadPool() {
workers = new PoolWorker[5]; for (int i = 0; i < workers.length; i++) { workers[i] = new PoolWorker(i); }
}
private ThreadPool(int pool_worker_num) {
worker_num = pool_worker_num; workers = new PoolWorker[worker_num]; for (int i = 0; i < workers.length; i++) { workers[i] = new PoolWorker(i); }
}
public static synchronized ThreadPool getInstance() {
if (instance == null) return new ThreadPool(); return instance;
} /**
- 增加新的任務
- 每增加一個新任務,都要喚醒任務隊列
- @param newTask
*/
public void addTask(Task newTask) {
synchronized (taskQueue) {
} logger.info("Submit Task<" + newTask.getTaskId() + ">: "newTask.setTaskId(++taskCounter); newTask.setSubmitTime(new Date()); taskQueue.add(newTask); /* 喚醒隊列, 開始執行 */ taskQueue.notifyAll();
} /**+ newTask.info());
- 批量增加新任務
- @param taskes
*/
public void batchAddTask(Task[] taskes) {
if (taskes == null || taskes.length == 0) {
} synchronized (taskQueue) {return;
} for (int i = 0; i < taskes.length; i++) {for (int i = 0; i < taskes.length; i++) { if (taskes[i] == null) { continue; } taskes[i].setTaskId(++taskCounter); taskes[i].setSubmitTime(new Date()); taskQueue.add(taskes[i]); } /* 喚醒隊列, 開始執行 */ taskQueue.notifyAll();
} } /**if (taskes[i] == null) { continue; } logger.info("Submit Task<" + taskes[i].getTaskId() + ">: " + taskes[i].info());
- 線程池信息
- @return
*/
public String getInfo() {
StringBuffer sb = new StringBuffer();
sb.append("\nTask Queue Size:" + taskQueue.size());
for (int i = 0; i < workers.length; i++) {
} return sb.toString(); } /**sb.append("\nWorker " + i + " is " + ((workers[i].isWaiting()) ? "Waiting." : "Running."));
銷毀線程池 */ public synchronized void destroy() { for (int i = 0; i < worker_num; i++) {
workers[i].stopWorker(); workers[i] = null;
} taskQueue.clear(); }
/**
- 池中工作線程 *
@author obullxl / private class PoolWorker extends Thread { private int index = -1; / 該工作線程是否有效 / private boolean isRunning = true; / 該工作線程是否可以執行新任務 */ private boolean isWaiting = true;
public PoolWorker(int index) {
this.index = index; start();
}
public void stopWorker() {
this.isRunning = false;
}
public boolean isWaiting() {
return this.isWaiting;
} /**
- 循環執行任務
- 這也許是線程池的關鍵所在
*/
public void run() {
while (isRunning) {
} } } }Task r = null; synchronized (taskQueue) { while (taskQueue.isEmpty()) { try { /* 任務隊列為空,則等待有新任務加入從而被喚醒 */ taskQueue.wait(20); } catch (InterruptedException ie) { logger.error(ie); } } /* 取出任務執行 */ r = (Task) taskQueue.remove(0); } if (r != null) { isWaiting = false; try { if (debug) { r.setBeginExceuteTime(new Date()); taskLogger.debug("Worker<" + index + "> start execute Task<" + r.getTaskId() + ">"); if (r.getBeginExceuteTime().getTime() - r.getSubmitTime().getTime() > 1000) taskLogger.debug("longer waiting time. " + r.info() + ",<" + index + ">,time:" + (r.getFinishTime().getTime() - r .getBeginExceuteTime().getTime())); } /* 該任務是否需要立即執行 */ if (r.needExecuteImmediate()) { new Thread(r).start(); } else { r.run(); } if (debug) { r.setFinishTime(new Date()); taskLogger.debug("Worker<" + index + "> finish task<" + r.getTaskId() + ">"); if (r.getFinishTime().getTime() - r.getBeginExceuteTime().getTime() > 1000) taskLogger.debug("longer execution time. " + r.info() + ",<" + index + ">,time:" + (r.getFinishTime().getTime() - r .getBeginExceuteTime().getTime())); } } catch (Exception e) { e.printStackTrace(); logger.error(e); } isWaiting = true; r = null; }
/ 任務接口類 /
package org.ymcn.util;
import java.util.Date;
/**
- 所有任務接口
- 其他任務必須繼承訪類 *
@author obullxl / public abstract class Task implements Runnable { // private static Logger logger = Logger.getLogger(Task.class); / 產生時間 / private Date generateTime = null; / 提交執行時間 / private Date submitTime = null; / 開始執行時間 / private Date beginExceuteTime = null; / 執行完成時間 */ private Date finishTime = null;
private long taskId;
public Task() {
this.generateTime = new Date();
}
/**
任務執行入口 */ public void run() { /**
- 相關執行代碼 *
- beginTransaction(); *
- 執行過程中可能產生新的任務 subtask = taskCore(); *
- commitTransaction(); *
- 增加新產生的任務 ThreadPool.getInstance().batchAddTask(taskCore()); */ }
/**
- 所有任務的核心 所以特別的業務邏輯執行之處 *
@throws Exception */ public abstract Task[] taskCore() throws Exception;
/**
- 是否用到數據庫 *
@return */ protected abstract boolean useDb();
/**
- 是否需要立即執行 *
@return */ protected abstract boolean needExecuteImmediate();
/**
- 任務信息 *
@return String */ public abstract String info();
public Date getGenerateTime() { return generateTime; }
public Date getBeginExceuteTime() { return beginExceuteTime; }
public void setBeginExceuteTime(Date beginExceuteTime) { this.beginExceuteTime = beginExceuteTime; }
public Date getFinishTime() { return finishTime; }
public void setFinishTime(Date finishTime) { this.finishTime = finishTime; }
public Date getSubmitTime() { return submitTime; }
public void setSubmitTime(Date submitTime) { this.submitTime = submitTime; }
public long getTaskId() { return taskId; }
public void setTaskId(long taskId) { this.taskId = taskId; }
}</pre>