java線程池的原理、實現和應用

冰藍Crystal 8年前發布 | 8K 次閱讀 Java開發

來自: http://blog.csdn.net//chenleixing/article/details/42672525



一. 線程池:

其實到目前為止我們接觸過很多關于池的概念:string池、連接池,之所以要用這個池,目的只有一個:資源的重復使用。

線程池:首先創建一些線程,當服務器接收到一個客戶請求后,就從線程池中取出一個空閑的線程為之服務,服務完后并不關閉該線程,而是將該線程還回到線程池中。

在線程池的編程模式下,任務是提交給整個線程池的,而不是直接交給某個線程,線程池在拿到任務后,它就在內部找有無空閑的線程,找到后再把任務交給內部某個空閑的線程,這就是封裝。記住:任務是提交給整個線程池的,一個線程同時只能執行一個任務,但可以同時向一個線程池提交多個任務。

  多線程技術主要解決處理器單元內多個線程執行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力。
    
    假設一個服務器完成一項任務所需時間為:T1 創建線程時間,T2 在線程中執行任務的時間,T3 銷毀線程時間。
    
    如果:T1 + T3 遠大于 T2,則可以采用線程池,以提高服務器性能。
                一個線程池包括以下四個基本組成部分:
                1、線程池管理器(ThreadPool):用于創建并管理線程池,包括 創建線程池,銷毀線程池,添加新任務;
                2、工作線程(PoolWorker):線程池中線程,在沒有任務時處于等待狀態,可以循環的執行任務;
                3、任務接口(Task):每個任務必須實現的接口,以供工作線程調度任務的執行,它主要規定了任務的入口,任務執行完后的收尾工作,任務的執行狀態等;
                4、任務隊列(taskQueue):用于存放沒有處理的任務。提供一種緩沖機制。
                
    線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高服務器程序性能的。它把T1,T3分別安排在服務器程序的啟動和結束的時間段或者一些空閑的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。

    線程池不僅調整T1,T3產生的時間段,而且它還顯著減少了創建線程的數目,看一個例子:

    假設一個服務器一天要處理50000個請求,并且每個請求需要一個單獨的線程完成。在線程池中,線程數一般是固定的,所以產生線程總數不會超過線程池中線程的數目,而如果服務器不利用線程池來處理這些請求則線程總數為50000。一般線程池大小是遠小于50000。所以利用線程池的服務器程序不會為了創建50000而在處理請求時浪費時間,從而提高效率。

二. 線程池的創建與關閉:

可以創建三種不同的線程池:

1. 創建固定大小的線程池

ExecutorService threadPool = Executors.newFixedThreadPool(3);  // 線程數為3

2. 創建緩存線程池

ExecutorService threadPool = Executors.newCachedThreadPool();

3. 創建單一線程池:線程死后又會重新創建一個新的

ExecutorService threadPool = Executors.newSingleThreadExecutor();
</span>

<span style="font-size:14px;">public class ThreadPoolTest {

public static void main(String[] args) {
    ExecutorService threadPool = Executors.newSingleThreadExecutor();
    for (int i = 1; i <= 10; i++) {
        final int task = i;
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (int j = 1; j <= 10; j++) {
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                   System.out.println(Thread.currentThread().getName() + " loop of " + j + " for task of " + task);
                }
            }
        });
    }
    System.out.println("all of 10 takes have committed!!");
    threadPool.shutdown();  
}

}</span></pre>

</div>

</div> </div> 關閉線程池:

threadPool.shutdown()   所有任務完成就結束線程

threadPool.shutdownNow() 馬上結束線程


三. 用線程池啟動定時器:

支持間隔重復任務的定時方式,不直接支持絕對定時方式,需要轉換成相對時間方式:

<span style="font-size:14px;">public class TimerTest {
    public static void main(String[] args) {
        // 只爆炸一次的炸彈
Executors.newScheduledThreadPool(3).schedule(new Runnable() { @Override public void run() { System.out.println("bombing!!!"); } }, 4, TimeUnit.SECONDS);

    // 會爆炸多次的連環炸彈  
    // 隔4秒炸一下,隔2秒再炸  
    Executors.newScheduledThreadPool(3).scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            System.out.println("bombings!!!");
        }
    }, 4, 2, TimeUnit.SECONDS);

}

}</span></pre>


四. 線程池實現原理:

</div>

</div> </div> /** 線程池類,工作線程作為其內部類 **/

package org.ymcn.util;

import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import org.apache.log4j.Logger;

/**

  • 線程池
  • 創建線程池,銷毀線程池,添加新任務

  • @author obullxl
    /
    public final class ThreadPool {
        private static Logger logger = Logger.getLogger(ThreadPool.class);
        private static Logger taskLogger = Logger.getLogger("TaskLogger");

        private static boolean debug = taskLogger.isDebugEnabled();
        // private static boolean debug = taskLogger.isInfoEnabled();
        /
    單例 /
        private static ThreadPool instance = ThreadPool.getInstance();

        public static final int SYSTEM_BUSY_TASK_COUNT = 150;
        /
    默認池中線程數 /
        public static int worker_num = 5;
        /
    已經處理的任務數 /
        private static int taskCounter = 0;

        public static boolean systemIsBusy = false;

        private static List<Task> taskQueue = Collections
                .synchronizedList(new LinkedList<Task>());
        /
    池中的所有線程 /
        public PoolWorker[] workers;

        private ThreadPool() {
            workers = new PoolWorker[5];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new PoolWorker(i);
            }
        }

        private ThreadPool(int pool_worker_num) {
            worker_num = pool_worker_num;
            workers = new PoolWorker[worker_num];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new PoolWorker(i);
            }
        }

        public static synchronized ThreadPool getInstance() {
            if (instance == null)
                return new ThreadPool();
            return instance;
        }
        /**
       
    增加新的任務
        每增加一個新任務,都要喚醒任務隊列
       
    @param newTask
        /
        public void addTask(Task newTask) {
            synchronized (taskQueue) {
                newTask.setTaskId(++taskCounter);
                newTask.setSubmitTime(new Date());
                taskQueue.add(newTask);
                /
    喚醒隊列, 開始執行 /
                taskQueue.notifyAll();
            }
            logger.info("Submit Task<" + newTask.getTaskId() + ">: "
                    + newTask.info());
        }
        /**
       
    批量增加新任務
        @param taskes
       
    /
        public void batchAddTask(Task[] taskes) {
            if (taskes == null || taskes.length == 0) {
                return;
            }
            synchronized (taskQueue) {
                for (int i = 0; i < taskes.length; i++) {
                    if (taskes[i] == null) {
                        continue;
                    }
                    taskes[i].setTaskId(++taskCounter);
                    taskes[i].setSubmitTime(new Date());
                    taskQueue.add(taskes[i]);
                }
                / 喚醒隊列, 開始執行 /
                taskQueue.notifyAll();
            }
            for (int i = 0; i < taskes.length; i++) {
                if (taskes[i] == null) {
                    continue;
                }
                logger.info("Submit Task<" + taskes[i].getTaskId() + ">: "
                        + taskes[i].info());
            }
        }
        /
        線程池信息
       
    @return
        */
        public String getInfo() {
            StringBuffer sb = new StringBuffer();
            sb.append("\nTask Queue Size:" + taskQueue.size());
            for (int i = 0; i < workers.length; i++) {
                sb.append("\nWorker " + i + " is "
                        + ((workers[i].isWaiting()) ? "Waiting." : "Running."));
            }
            return sb.toString();
        }
        /

        銷毀線程池
       
    /
        public synchronized void destroy() {
            for (int i = 0; i < worker_num; i++) {
                workers[i].stopWorker();
                workers[i] = null;
            }
            taskQueue.clear();
        }

        /
        池中工作線程
       

        @author obullxl
       
    /
        private class PoolWorker extends Thread {
            private int index = -1;
            / 該工作線程是否有效 /
            private boolean isRunning = true;
            / 該工作線程是否可以執行新任務 /
            private boolean isWaiting = true;

            public PoolWorker(int index) {
                this.index = index;
                start();
            }

            public void stopWorker() {
                this.isRunning = false;
            }

            public boolean isWaiting() {
                return this.isWaiting;
            }
            /

            循環執行任務
           
    這也許是線程池的關鍵所在
            /
            public void run() {
                while (isRunning) {
                    Task r = null;
                    synchronized (taskQueue) {
                        while (taskQueue.isEmpty()) {
                            try {
                                /
    任務隊列為空,則等待有新任務加入從而被喚醒 /
                                taskQueue.wait(20);
                            } catch (InterruptedException ie) {
                                logger.error(ie);
                            }
                        }
                        /
    取出任務執行 /
                        r = (Task) taskQueue.remove(0);
                    }
                    if (r != null) {
                        isWaiting = false;
                        try {
                            if (debug) {
                                r.setBeginExceuteTime(new Date());
                                taskLogger.debug("Worker<" + index
                                        + "> start execute Task<" + r.getTaskId() + ">");
                                if (r.getBeginExceuteTime().getTime()
                                        - r.getSubmitTime().getTime() > 1000)
                                    taskLogger.debug("longer waiting time. "
                                            + r.info() + ",<" + index + ">,time:"
                                            + (r.getFinishTime().getTime() - r
                                                    .getBeginExceuteTime().getTime()));
                            }
                            /
    該任務是否需要立即執行 */
                            if (r.needExecuteImmediate()) {
                                new Thread(r).start();
                            } else {
                                r.run();
                            }
                            if (debug) {
                                r.setFinishTime(new Date());
                                taskLogger.debug("Worker<" + index
                                        + "> finish task<" + r.getTaskId() + ">");
                                if (r.getFinishTime().getTime()
                                        - r.getBeginExceuteTime().getTime() > 1000)
                                    taskLogger.debug("longer execution time. "
                                            + r.info() + ",<" + index + ">,time:"
                                            + (r.getFinishTime().getTime() - r
                                                    .getBeginExceuteTime().getTime()));
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            logger.error(e);
                        }
                        isWaiting = true;
                        r = null;
                    }
                }
            }
        }
    }

    /** 任務接口類 **/

    package org.ymcn.util;

    import java.util.Date;

    /**
  • 所有任務接口
  • 其他任務必須繼承訪類

  • @author obullxl
    /
    public abstract class Task implements Runnable {
        // private static Logger logger = Logger.getLogger(Task.class);
        /
    產生時間 /
        private Date generateTime = null;
        /
    提交執行時間 /
        private Date submitTime = null;
        /
    開始執行時間 /
        private Date beginExceuteTime = null;
        /
    執行完成時間 /
        private Date finishTime = null;

        private long taskId;

        public Task() {
            this.generateTime = new Date();
        }

        /**
       
    任務執行入口
        /
        public void run() {
            /**
           
    相關執行代碼
           
           
    beginTransaction();
           
           
    執行過程中可能產生新的任務 subtask = taskCore();
           
           
    commitTransaction();
           
           
    增加新產生的任務 ThreadPool.getInstance().batchAddTask(taskCore());
            /
        }

        /**
       
    所有任務的核心 所以特別的業務邏輯執行之處
       
       
    @throws Exception
        /
        public abstract Task[] taskCore() throws Exception;

        /**
       
    是否用到數據庫
       
       
    @return
        /
        protected abstract boolean useDb();

        /**
       
    是否需要立即執行
       
       
    @return
        /
        protected abstract boolean needExecuteImmediate();

        /**
       
    任務信息
       
       
    @return String
        */
        public abstract String info();

        public Date getGenerateTime() {
            return generateTime;
        }

        public Date getBeginExceuteTime() {
            return beginExceuteTime;
        }

        public void setBeginExceuteTime(Date beginExceuteTime) {
            this.beginExceuteTime = beginExceuteTime;
        }

        public Date getFinishTime() {
            return finishTime;
        }

        public void setFinishTime(Date finishTime) {
            this.finishTime = finishTime;
        }

        public Date getSubmitTime() {
            return submitTime;
        }

        public void setSubmitTime(Date submitTime) {
            this.submitTime = submitTime;
        }

        public long getTaskId() {
            return taskId;
        }

        public void setTaskId(long taskId) {
            this.taskId = taskId;
        }

    }</span>
 本文由用戶 冰藍Crystal 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!