JAVA線程池例子

jopen 10年前發布 | 17K 次閱讀 Java開發 Java

用途及用法

       網絡請求通常有兩種形式:第一種,請求不是很頻繁,而且每次連接后會保持相當一段時間來讀數據或者寫數據,最后斷開,如文件下載,網絡流媒體等。另一種形式是請求頻繁,但是連接上以后讀/寫很少量的數據就斷開連接。考慮到服務的并發問題,如果每個請求來到以后服務都為它啟動一個線程,那么這對服務的資源可能會造成很大的浪費,特別是第二種情況。因為通常情況下,創建線程是需要一定的耗時的,設這個時間為T1,而連接后讀/寫服務的時間為T2,當T1>>T2時,我們就應當考慮一種策略或者機制來控制,使得服務對于第二種請求方式也能在較低的功耗下完成。

       通常,我們可以用線程池來解決這個問題,首先,在服務啟動的時候,我們可以啟動好幾個線程,并用一個容器(如線程池)來管理這些線程。當請求到來時,可以從池中去一個線程出來,執行任務(通常是對請求的響應),當任務結束后,再將這個線程放入池中備用;如果請求到來而池中沒有空閑的線程,該請求需要排隊等候。最后,當服務關閉時銷毀該池即可。

結構

線程池中通常由這樣幾個概念(接口)組成:

  1. 線程池(Thread pool ),池是一個容器,容器中有很多個執行器,每一個執行器是一個線程。當然,這個容器的實現,可以是鏈表,可以是數組等等,不需要關心,需要關心的是,池必須提供一個可以從中取出執行器 的方法,可能還需要一個池中現有活動線程數方法,銷毀池的方法等。

  2. 執行器(Executor ),每個執行器是一個線程,每個執行器可以執行一個任務 ,任務是做什么,此時還不很明確,它需要提供任務的setter/getter方法,并且作為一個線程,他可以獨立運行,執行器執行完自身后,需要將自身放入池中。

  3. 任務(Task ),任務是每個線程具體要做的事,如資源下載,播放flash片段,打印一段文字到控制臺等等,它本身不能執行,而需要將自身交給執行器。

       整個池的機制和結構就是這樣,當然,需要一個調度者(scheduler)來協調主線程和池的關系。結構,或者接口的目的是為了讓我們從細節中解脫出來,從一個比較抽象的層次來描述系統,這樣的好處是簡單,而且設計出來的框架比較通用,可以適應很多相近相似的情況。由于Task具體干什么我們不知道,所以它幾乎可以干任何適應于上邊總結的網絡連接的第二種情況(T1>>T2)。

類的結構圖

       雖然為一個簡單的實現設計一個標準的UML視圖是不太現實的,但是這是一種受鼓勵的做法,至少應該用鉛筆在草紙上畫出相關的視圖,這樣可以幫助以后的維護和更高級的擴展。JAVA線程池例子

線程池的簡單實現

       實現可以是通過多種語言的,我們在此選擇面向對象的JAVA,而如果你使用C的話,也沒有問題,問題在上一小節已經描述清楚,語言是不重要的。

       池是一個容器,我們考慮使用java.util.LinkedList類(可能由于它的長度是可變的,而且不需要我們使用者來考慮),也就是說,池需要維護一個鏈表。

public interface Pool {//池接口
    Executor getExecutor();
    void destroy();
}

public interface Executor {//執行器接口
    void setTask(Task task);
    Task getTask();
    void startTask();
}

       鑒于執行器是池中的對象,而且外部沒有必要知道其細節,我們考慮將Executor接口的實現做為Pool接口的實現的內部類。這樣做的另一個好處是,更便于池的管理。

import java.util.LinkedList;
import java.util.Properties;

import redesigned.utils.PropReader;

public class ThreadPool implements Pool{
    private boolean isShut;
    private LinkedList pool;
    private static Properties prop = PropReader.getProperties("webconfig.properties");
    private int size = Integer.parseInt(prop.getProperty("threadsperpage", "3"));
    public ThreadPool(){
        // read configuration and set the
        // content of pool by objects of Executor
        isShut = false;//set the status of pool to active
        pool = new LinkedList();
        for(int i = 0; i < size; i++){
            Executor executor = new ExecutorImpl();//new a executor thread
            pool.add(executor);//add it to pool
            ((ExecutorImpl)executor).start();//start it
        }
    }
    public void destroy() {//Destroy
        synchronized(pool){
            isShut = true;//set the status of pool to inactive
            pool.notifyAll();//notify all listener.
            pool.clear();//clear the list of threads
        }
    }

    public Executor getExecutor(){
        Executor ret = null;
        synchronized(pool){//return if any.
            if(pool.size() > 0){
                ret = (Executor)pool.removeFirst();
            }else{
                try {
                    pool.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                ret = (Executor)pool.removeFirst();
            }
        }
        return ret;
    }

    // Executor接口的實現作為ThreadPool的內部類
    private class ExecutorImpl extends Thread implements Executor{
        private Task task;
        private Object lock = new Object();
        //private boolean loop = true;
        public ExecutorImpl(){}
        public Task getTask() {
            return this.task;
        }

        public void setTask(Task task) {
            this.task = task;
        }
        public void startTask(){
            //System.out.println("start here");
            synchronized(lock){
                lock.notify();
            }
        }
        public void run(){
            //get a task if any
            //then run it
            //then put self to pool
            while(!isShut){
                synchronized(lock){
                    try {
                        lock.wait();//wait for resource
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                getTask().execute();//execute the task
                synchronized(pool){//put it self to the pool when finish the task
                    pool.addFirst(ExecutorImpl.this);
                    pool.notifyAll();
                }
            }
        }
    }
}

好了,池設計好了,再來看看任務(Task)的接口和實現。

public interface Task {//這個接口也比較簡單,可以執行,可以取到執行結果
    void execute();
    byte[] getResult();
}

Task的實現可以是多種多樣的,下邊的例子是一個加載資源的Task.使用方式。

Pool pool = new ThreadPool();// new a ThreadPool
//load resources on each page, and start #s of thread.
for(int i = 0; i < resourceList.size();i++){
    Executor executor = pool.getExecutor(); // get Executor form pool
    Task resourceLoader = new ResourceLoader((String)resourceList.get(i));
    executor.setTask(resourceLoader); // set the task to executor
    executor.startTask(); // try to start the executor.
}

//wait while all task are done, the destroy the pool.
pool.destroy();

優勢或者適用范圍

  1. 在并發時,需要被并發的線程不需要知道自己什么時候需要被啟動,它子需要考慮這樣一種情況:它自己從一個地方取出來一個執行器,然后把任務交給執行器,然后等待執行器結束即可,他關心的是自己所需要干的事,或者自己負責的事。這樣,大家都簡單,因為只需要做好自己的事情就好了。面向對象的一個秘訣為:永遠相信合作者,使用別人的接口而不是自己去實現所有的接口。

  2. 這種T1>>T2的請求方式在網絡中固然是常見的,在實際問題中同樣是常見的。因此,掌握這種模式可能會對我們以后的程序設計提供方便和好處。

小結

同步問題: 同步在線程的并發中意義非常之大,對臨界資源的控制是并發時最關鍵的地方。如在線程池中,當池中沒有空閑的線程時,新來的請求就必須等待,而一旦一個Task運行結束后,一方面將自己放入池中,一方面需要通知等待在pool中的其他線程。每一個執行器線程,一開始啟動,則進入等待狀態,此時不會消耗CPU資源。而當在外部調用執行器的startTask()方法,即可通知線程從等待狀態中醒來,去出Task,執行之,將執行器本身放入池中,然后繼續等待。

當然,實現的策略是可以多種多樣的,但是問題的本質已經在第二小節結構 很明確的被定義了。

最近開始學習JAVA,同時也開始熟悉面向對象的思維方式,這篇日志,一來作為一個備忘,二來可以對可能需要的人提供幫助。以上可以算作是小結。

 JAVA線程池例子原文地址:http://abruzzi.iteye.com/blog/266335#

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!