java.util.concurrent包下的類詳細解釋

jopen 11年前發布 | 76K 次閱讀 Java開發 Java

1. java.util.concurrent.Executors

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;

public class newFixedThreadPool { public static void main(String[] args) throws InterruptedException{ ExecutorService service = Executors.newFixedThreadPool(2); for (int i = 0; i < 4; i++) { Runnable run = new Runnable() { @Override public void run() { System.out.println("thread start"); } }; service.execute(run); } service.shutdown(); service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); System.out.println("all thread complete");

} }</pre>

output:

thread start
thread start
thread start
thread start
all thread complete

newFixedThreadPool創建一個固定大小的線程池。

shutdown():用于關閉啟動線程,如果不調用該語句,jvm不會關閉。

awaitTermination():用于等待子線程結束,再繼續執行下面的代碼。該例中我設置一直等著子線程結束。

ExecutorService 建立多線程的步驟:

</tr>

</tr>

</tr> </tbody> </table>

幾種不同的ExecutorService線程池對象

1。定義線程類 class Handler implements Runnable{
}
2。建立ExecutorService線程池 ExecutorService executorService = Executors.newCachedThreadPool();

或者

int cpuNums = Runtime.getRuntime().availableProcessors();
                //獲取當前系統的CPU 數目
ExecutorService executorService =Executors.newFixedThreadPool(cpuNums * POOL_SIZE);
                //ExecutorService通常根據系統資源情況靈活定義線程池大小
3。調用線程池操作 循環操作,成為daemon,把新實例放入Executor池中
      while(true){
        executorService.execute(new Handler(socket)); 
           // class Handler implements Runnable{
        或者
        executorService.execute(createTask(i));
            //private static Runnable createTask(final int taskID)
      }

execute(Runnable對象)方法
其實就是對Runnable對象調用start()方法
(當然還有一些其他后臺動作,比如隊列,優先級,IDLE timeout,active激活等)

</tr>

</tr>

</tr>

</tr> </tbody> </table>

上面四種線程池,都使用Executor的缺省線程工廠建立線程,也可單獨定義自己的線程工廠</span>
下面是缺省線程工廠代碼:

1.newCachedThreadPool()  -緩存型池子,先查看池中有沒有以前建立的線程,如果有,就reuse.如果沒有,就建一個新的線程加入池中
-緩存型池子通常用于執行一些生存期很短的異步型任務
 因此在一些面向連接的daemon型SERVER中用得不多。
-能reuse的線程,必須是timeout IDLE內的池中線程,缺省timeout是60s,超過這個IDLE時長,線程實例將被終止及移出池。
  注意,放入CachedThreadPool的線程不必擔心其結束,超過TIMEOUT不活動,其會自動被終止。
2. newFixedThreadPool -newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的線程
-其獨特之處:任意時間點,最多只能有固定數目的活動線程存在,此時如果有新的線程要建立,只能放在另外的隊列中等待直到當前的線程中某個線程終止直接被移出池子
-和cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),所以FixedThreadPool多數針對一些很穩定很固定的正規并發線程,多用于服務器
-從方法的源代碼看,cache池和fixed 池調用的是同一個底層池,只不過參數不同:
fixed池線程數固定,并且是0秒IDLE(無IDLE)
cache池線程數支持0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE  
3.ScheduledThreadPool -調度型線程池
-這個池子里的線程可以按schedule依次delay執行,或周期執行
4.SingleThreadExecutor -單例線程,任意時間池中只能有一個線程
-用的是和cache池和fixed池相同的底層池,但線程數目是1-1,0秒IDLE(無IDLE)

</tr> </tbody> </table> 也可自己定義ThreadFactory,加入建立池的參數中

    static class DefaultThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null)? s.getThreadGroup() :Thread.currentThread().getThreadGroup();
          
            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

</tr> </tbody> </table>

Executor的execute()方法</span>
execute() 方法將Runnable實例加入pool中,并進行一些pool size計算和優先級處理
execute() 方法本身在Executor接口中定義,有多個實現類都定義了不同的execute()方法
如ThreadPoolExecutor類(cache,fiexed,single三種池子都是調用它)的execute方法如下:

 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {

</tr> </tbody> </table>

 

2. CyclicBarrier

假設有只有的一個場景:每個線程代表一個跑步運動員,當運動員都準備好后,才一起出發,只要有一個人沒有準備好,大家都等待.

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Runner implements Runnable {

private CyclicBarrier barrier;

private String name;

public Runner(CyclicBarrier barrier, String name) { super(); this.barrier = barrier; this.name = name; }

@Override public void run() { try { Thread.sleep(1000 * (new Random()).nextInt(8)); System.out.println(name + " 準備OK."); barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(name + " Go!!"); } }

public class Race {

public static void main(String[] args) throws IOException, InterruptedException { CyclicBarrier barrier = new CyclicBarrier(3);

ExecutorService executor = Executors.newFixedThreadPool(3); executor.submit(new Thread(new Runner(barrier, "zhangsan"))); executor.submit(new Thread(new Runner(barrier, "lisi"))); executor.submit(new Thread(new Runner(barrier, "wangwu")));

executor.shutdown(); }

}</pre>

output:

lisi 準備OK.
wangwu 準備OK.
zhangsan 準備OK.
zhangsan Go!!
lisi Go!!
wangwu Go!!

 

3. ThreadPoolExecutor

newFixedThreadPool生成一個固定的線程池,顧名思義,線程池的線程是不會釋放的,即使它是Idle。這就會產生性能問題,比如如果線程池的大小為200,當全部使用完畢后,所有的線程會繼續留在池中,相應的內存和線程切換(while(true)+sleep循環)都會增加。如果要避免這個問題,就必須直接使用ThreadPoolExecutor()來構造。可以像Tomcat的線程池一樣設置“最大線程數”、“最小線程數”和 “空閑線程keepAlive的時間”。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class newThreadPoolExecute { public static void main(String[] args) { BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.DAYS, queue);

for (int i = 0; i < 20; i++) { final int index = i; executor.execute(new Runnable() { public void run() { try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("thread %d finished", index)); } }); } executor.shutdown(); }

}</pre>

output:

thread 1 finished
thread 0 finished
thread 2 finished
thread 5 finished
thread 3 finished
thread 4 finished
thread 8 finished
thread 7 finished
thread 6 finished
thread 9 finished
thread 11 finished
thread 10 finished
thread 13 finished
thread 14 finished
thread 12 finished
thread 17 finished
thread 15 finished
thread 16 finished
thread 18 finished
thread 19 finished

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }
  • sesese色