Java一個簡單的線程池實現

y0657bys 8年前發布 | 7K 次閱讀 Java

線程池代碼

import java.util.List;
import java.util.Vector;
public class ThreadPool 
{
    private static ThreadPool instance_ = null;
    //定義優先級別常數,空閑的線程按照優先級不同分別存放在三個vector中
    public static final int LOW_PRIORITY = 0; 
    public static final int NORMAL_PRIORITY = 1;
    public static final int HIGH_PRIORITY = 2;
    //保存空閑線程的List,或者說它是"池"
    private List<PooledThread>[] idleThreads_;  
    private boolean shutDown_ = false;
    private int threadCreationCounter_; //以創建的線程的個數
    private boolean debug_ = false;    //是否輸出調試信息
    //構造函數,因為這個類視作為singleton實現的,因此構造函數為私有
    private ThreadPool() 
    {       
        // 產生空閑線程.三個vector分別存放分別處在三個優先級的線程的引用
        List[] idleThreads = {new Vector(5), new Vector(5), new Vector(5)};
        idleThreads_ = idleThreads;
        threadCreationCounter_ = 0;
    }
    
    public int getCreatedThreadsCount() {
        return threadCreationCounter_;
    }
    //通過這個函數得到線程池類的實例
    public static ThreadPool instance() {
        if (instance_ == null)
            instance_ = new ThreadPool();
        return instance_;
    }
    
    public boolean isDebug() {
        return debug_;
    }
    
    //將線程repoolingThread從新放回到池中,這個方式是同步方法。
    //這個方法會在多線程的環境中調用,設計這個方法的目的是讓工作者線程
    //在執行完target中的任務后,調用池類的repool()方法,
    //將線程自身從新放回到池中。只所以這么做是因為線程池并不能預見到
    //工作者線程何時會完成任務。參考PooledThread的相關代碼。
    protected synchronized void repool(PooledThread repoolingThread)
    {
        if (!shutDown_) 
        {
            if (debug_)
            {
                System.out.println("ThreadPool.repool() : repooling ");
            }
            switch (repoolingThread.getPriority())
            {
                case Thread.MIN_PRIORITY :
                {
                    idleThreads_[LOW_PRIORITY].add(repoolingThread);
                    break;
                }
                case Thread.NORM_PRIORITY :
                {
                    idleThreads_[NORMAL_PRIORITY].add(repoolingThread);
                    break;
                }
                case Thread.MAX_PRIORITY :
                {
                    idleThreads_[HIGH_PRIORITY].add(repoolingThread);
                    break;
                }
                default :
                    throw new IllegalStateException("Illegal priority found while repooling a Thread!");
            }
            notifyAll();//通知所有的線程
        }
        else 
        {
            if (debug_)
            {
                System.out.println("ThreadPool.repool() : Destroying incoming thread.");
            }
            repoolingThread.shutDown();//關閉線程
        }
        if (debug_) 
        {
            System.out.println("ThreadPool.recycle() : done.");
        }
    }
    
    public void setDebug(boolean newDebug) 
    {
        debug_ = newDebug;
    }
    
    //停止池中所有線程
    public synchronized void shutdown()
    {
        shutDown_ = true;
        if (debug_)
        {
            System.out.println("ThreadPool : shutting down ");
        }
        for (int prioIndex = 0; prioIndex <= HIGH_PRIORITY; prioIndex++)
        {
            List prioThreads = idleThreads_[prioIndex];
            for (int threadIndex = 0; threadIndex < prioThreads.size(); threadIndex++)
            {
                PooledThread idleThread = (PooledThread) prioThreads.get(threadIndex);
                idleThread.shutDown();
            }
        }
        notifyAll();
        if (debug_)
        {
            System.out.println("ThreadPool : shutdown done.");
        }
    }
    
    //以Runnable為target,從池中選擇一個優先級為priority的線程創建線程
    //并讓線程運行。
    public synchronized void start(Runnable target, int priority)
    {
        PooledThread thread = null;  //被選出來執行target的線程
        List idleList = idleThreads_[priority];
        if (idleList.size() > 0) 
        {
            //如果池中相應優先級的線程有空閑的,那么從中取出一個
            //設置它的target,并喚醒它
            //從空閑的線程隊列中獲取
            int lastIndex = idleList.size() - 1;
            thread = (PooledThread) idleList.get(lastIndex);
            idleList.remove(lastIndex);
            thread.setTarget(target);
        }
        //池中沒有相應優先級的線程
        else 
        { 
            threadCreationCounter_++;
            // 創建新線程,
            thread = new PooledThread(target, "PooledThread #" + threadCreationCounter_, this);
            // 新線程放入池中
            switch (priority) 
            {
                case LOW_PRIORITY :
                {
                    thread.setPriority(Thread.MIN_PRIORITY);
                    break;
                }
                case NORMAL_PRIORITY :
                {
                    thread.setPriority(Thread.NORM_PRIORITY);
                    break;
                }
                case HIGH_PRIORITY :
                {
                    thread.setPriority(Thread.MAX_PRIORITY);
                    break;
                }
                default :
                {
                    thread.setPriority(Thread.NORM_PRIORITY);
                    break;
                }
            }
            //啟動這個線程
            thread.start();
        }
    }
}

工作者線程代碼:

public class PooledThread extends Thread 
{
    private ThreadPool pool_;  // 池中線程需要知道自己所在的池
    private Runnable target_;   // 線程的任務
    private boolean shutDown_ = false;
    private boolean idle_ = false;//設置是否讓線程處于等待狀態
    
    private PooledThread() {
        super();
    }
    
    private PooledThread(Runnable target)
    {
        super(target); //初始化父類
    }
    
    private PooledThread(Runnable target, String name) 
    {
        super(target, name);
    }
    
    public PooledThread(Runnable target, String name, ThreadPool pool)
    {
        super(name);
        pool_ = pool;
        target_ = target;
    }
    
    private PooledThread(String name) 
    {
        super(name);//初始化父類
    }
    
    private PooledThread(ThreadGroup group, Runnable target)
    {
        super(group, target);
    }
    
    private PooledThread(ThreadGroup group, Runnable target, String name) 
    {
        super(group, target, name);
    }
    
    private PooledThread(ThreadGroup group, String name) 
    {
        super(group, name);
    }
    
    public java.lang.Runnable getTarget() 
    {
        return target_;
    }
    
    public boolean isIdle() 
    {
        return idle_;//返回當前的狀態
    }
    
    //工作者線程與通常線程不同之處在于run()方法的不同。通常的線程,
    //完成線程應該執行的代碼后,自然退出,線程結束。
    //虛擬機在線程結束后收回分配給線程的資源,線程對象被垃圾回收。]
    //而這在池化的工作者線程中是應該避免的,否則線程池就失去了意義。
    //作為可以被放入池中并重新利用的工作者線程,它的run()方法不應該結束,
    //隨意,在隨后可以看到的實現中,run()方法執行完target對象的代碼后,
    //就將自身repool(),然后調用wait()方法,使自己睡眠而不是退出循環和run()。
    //這就使線程池實現的要點。
    public void run() 
    {
        // 這個循環不能結束,除非池類要求線程結束
        // 每一次循環都會執行一次池類分配給的任務target
        while (!shutDown_) 
        {  
            idle_ = false;
            if (target_ != null) 
            {
                target_.run();  // 運行target中的代碼
            }
            idle_ = true;
            try 
            {
                //線程通知池重新將自己放回到池中
                pool_.repool(this);  // 
                //進入池中后睡眠,等待被喚醒執行新的任務,
                //這里是線程池中線程于普通線程的run()不同的地方。
                synchronized (this) 
                {
                    wait();
                }
            }
            catch (InterruptedException ie)
            {
            }
            idle_ = false;
        }
        //循環這里不能結束,否則線程結束,資源被VM收回,
        //就無法起到線程池的作用了
    }
    
    
    public synchronized void setTarget(java.lang.Runnable newTarget) 
    {//設置新的target,并喚醒睡眠中的線程
        target_ = newTarget;  // 新任務
        notifyAll();          // 喚醒睡眠的線程
    }
    
    public synchronized void shutDown()
    {
        shutDown_ = true;
        notifyAll();
    }
}

測試代碼:

public static void main(String[] args)
    {
        System.out.println("Testing ThreadPool ");
        System.out.println("Creating ThreadPool ");
        ThreadPool pool = ThreadPool.instance();
        pool.setDebug(true);
        class TestRunner implements Runnable 
        {
            public int count = 0;
            public void run() 
            {
                System.out.println("Testrunner sleeping 5 seconds ");
                //此方法使本線程睡眠5秒
                synchronized (this) 
                {
                    try 
                    {
                        wait(5000);//等待5秒時間
                    }
                    catch (InterruptedException ioe) 
                    {
                    }
                }
                System.out.println("Testrunner leaving  ");
                count++;
            }
        }
        System.out.println("Starting a new thread ");
        TestRunner runner = new TestRunner();
        pool.start(runner, pool.HIGH_PRIORITY);
        System.out.println("count : " + runner.count);
        System.out.println("Thread count : " + pool.getCreatedThreadsCount());
        pool.shutdown();
    }
}

結果

Testing ThreadPool 
Creating ThreadPool 

Starting a new thread 

Testrunner sleeping 
5 seconds 
count : 
0
Thread count : 
1
ThreadPool : shutting down 

ThreadPool : shutdown done
.
Testrunner leaving  

ThreadPool
.repool() : Destroying incoming thread.
ThreadPool
.recycle() : done.



 本文由用戶 y0657bys 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!