Java Executor 框架學習

jopen 9年前發布 | 9K 次閱讀 Java Java開發

 

大多數并發都是通過任務執行的方式來實現的。一般有兩種方式執行任務:串行和并行。

class SingleThreadWebServer {
  public static void main(String[] args) throws Exception {
    ServerSocket socket = new ServerSocket(80);
    while(true) {
      Socket conn = socket.accept();
      handleRequest(conn);
    }
  }
}
class ThreadPerTaskWebServer {
  public static void main(String[] args) throws Exception {
    ServerSocket socket = new ServerSocket(80);
    while(true) {
      final Socket conn = socket.accept();
      Runnable task = new Runnable() {
        public void run() {
          handleRequest(conn);
        }
      };
      new Thread(task).start();
    }
  }
}

當然上面的這兩種方式都是有問題的。單線程的問題就是并發量會是瓶頸,多線程版本就是無限制的創建線程會導致資源不足問題。

Executor 框架

任務是一組邏輯工作單元,而線程是使任務異步執行的機制。

JDK 提供了 Executor 接口:

public interface Executor {
    void execute(Runnable command);
}

雖然 Executor 接口比較簡單,但是卻是異步任務執行框架的基礎,該框架能支持多種不同類型的任務執行策略。它提供了一種標準的方式把任務的提交過程與執行過程進行了解耦。用 Runnable 來代表任務。Executor 的實現提供了對生命周期的支持以及統計信息應用程序管理等機制。

Executor 是基于生產者消費者模式的,提交任務的操作相當于生產者,執行任務的線程相當于消費。

基于 Executor 的 WebServer 例子如下:

public class TaskExecutorWebServer {
  private static final int NTHREADS = 100;
  private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
  public static void main(String[] args) throws Exception {
    ServerSocket serverSocket = new ServerSocket(80);
    while (true) {
      final Socket conn = serverSocket.accept();
      Runnable task = new Runnable() {
        @Override
        public void run() {
          handleRequest(conn);
        }
      };
      exec.execute(task);
    }
  }
}

另外可以自己實現 Executor 來控制是并發還是并行的,如下面代碼:

/**

  • 執行已提交的 Runnable 任務的對象。
  • 此接口提供一種將任務提交與每個任務將如何運行的機制(包括線程使用的細節、調度等)分離開來的方法。
  • 通常使用 Executor 而不是顯式地創建線程。
  • @author renchunxiao / public class ExecutorDemo { public static void main(String[] args) { Executor executor = new ThreadExecutor(); executor.execute(new Runnable() { @Override public void run() {
     // do something
    
    } }); Executor executor2 = new SerialExecutor(); executor2.execute(new Runnable() { @Override public void run() {
     // do something
    
    } }); } } /**
  • 創建一個線程來執行 command *
  • @author renchunxiao / class ThreadExecutor implements Executor { @Override public void execute(Runnable command) { new Thread(command).start(); } } /**
  • 串行執行 command *
  • @author renchunxiao / class SerialExecutor implements Executor { @Override public void execute(Runnable command) { command.run(); } }</pre>

    線程池

    線程池就是線程的資源池,可以通過 Executors 中的靜態工廠方法來創建線程池。

    • newFixedThreadPool。創建固定長度的線程池,每次提交任務創建一個線程,直到達到線程池的最大數量,線程池的大小不再變化。
    • newSingleThreadExecutor。單個線程池。
    • newCachedThreadPool。根據任務規模變動的線程池。
    • newScheduledThreadPool。創建固定長度的線程池,以延遲或定時的方式來執行任務。

    JVM 只有在所有非守護線程全部終止后才會退出,所以,如果無法正確的關閉 Executor,那么 JVM 就無法結束。

    為了解決執行服務的生命周期問題,有個擴展 Executor 接口的新接口 ExecutorService。

    public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
     throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
     throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    
               long timeout, TimeUnit unit)
    
    throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks,
       long timeout, TimeUnit unit)
    
    throws InterruptedException, ExecutionException, TimeoutException; }</pre>

    ExecutorService 生命周期有三種狀態:運行、關閉、已終止。ExecutorService 在初始創建時處于運行狀態。shutdown 方法會平緩關閉:不在接受新的任務,并且等待已經執行的任務執行完成(包括那些還未開始的任務)。shutdownNow 方法將粗暴關閉:它將嘗試取消所有運行中的任務,并且不再啟動隊列中尚未開始的任務。所有任務都執行完成后進入到已終止狀態。

    Callable 和 Future

    Executor 框架使用 Runnable 作為基本的任務表示形式。Runnable 是一種有局限性的抽象,它的 run 方法不能返回值和拋出一個受檢查異常。

    許多任務實際上是存在延時的計算,例如數據庫查詢,從網絡獲取資源。對于這些任務,Callable 是更好的抽象,它認為 call 將返回一個值,并且可能拋出異常。

    Executor 執行的任務有四個生命周期階段:創建、提交、開始和完成。由于有些任務需要很長時間有可能希望取消,在 Executor 框架當中,已提交未開始的任務可以取消。

    Future 表示一個任務的生命周期,并且提供了相應的方法來判斷是否已經完成或取消,以及獲取任務的結果和取消任務等。

    【參考資料】

    1. Java并發編程實戰
 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!