FutureTask源碼分析

OrvilleBurl 7年前發布 | 8K 次閱讀 源碼分析 Java開發

對于java的并發編程來說,我們都知道Thread和runnable,這是創建一個線程最基本的兩種方法,但是這兩種方法創建的線程是不支持對線程的執行結果進行返回的。雖然我們可以通過傳遞引用的方式實現,但是實現起來未免太復雜。這個時候我們可能要用到Callable,callable是一個JDK提供的一個支持線程返回結果的一個接口,通過實現call方法,能返回指定泛型的變量。

class CallableTask implements Callable<Integer>{

    @Override
    public Integer call() throws Exception {
        System.out.println("call runing");
        Thread.sleep(5000);
        return 1;
    }
}
public class CallableTest {

    public static void main(String args[]){

        CallableTask task = new CallableTask();
        try {
            System.out.println("call start");
            ExecutorService service = Executors.newSingleThreadExecutor();
            Future fu = service.submit(task);
            System.out.println(fu.get());
            service.shutdown();
            System.out.println("call end");
        } catch (Exception e) {
            e.printStackTrace();
        }
}
}

可以通過線程池去實現任務的提交,任務提交后會返回future對象,通過get方法即可獲得返回值。

注意:這里其實是不推薦調用call方法的,實際上直接調用call方法和runnable的run方法效果是一樣的。

其實JDK提供了一種更好的提交方式,它可以將Runnable和Callable進行封裝,以便于提交到線程池。并且可以對線程有更好的控制,比如取消線程的執行,它就是FutureTask。

FutureTask只是簡單的對Callable以及Runnable進行了封裝,提供了額外的對線程控制的功能以及阻塞獲取請求結果的功能,其實對于線程池的submit方法,對于每一個任務都會封裝成一個FutureTask來運行。

/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * Returns a <tt>RunnableFuture</tt> for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @return a <tt>RunnableFuture</tt> which when run will call the
     * underlying callable and which, as a <tt>Future</tt>, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task.
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

那么FutureTask到底是怎么實現的呢?

首先看構造方法:

/**
     * Creates a <tt>FutureTask</tt> that will, upon running, execute the
     * given <tt>Callable</tt>.
     *
     * @param  callable the callable task
     * @throws NullPointerException if callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }

    /**
     * Creates a <tt>FutureTask</tt> that will, upon running, execute the
     * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }

FutureTask可以接受Runnable以及Callable兩種類型的參數,在初始化的時候內部構造了一個Sync的AQS實現類的實例,對于runnable類型的線程需要轉化成Callable,同時可以指定返回值。

當我們再觀察其他方法的時候,幾乎都是委托Sync去處理的,那么重點就放在了Sync上。

首先看看Sync里面有幾個狀態:

/** State value representing that task is ready to run */
        private static final int READY     = 0;//準備就緒
        /** State value representing that task is running */
        private static final int RUNNING   = 1;//正在運行
        /** State value representing that task ran */
        private static final int RAN       = 2;//運行完畢
        /** State value representing that task was cancelled */
        private static final int CANCELLED = 4;//任務取消

一個FutureTask的實例就在上面幾個狀態之間進行輪轉,當執行線程時調用run方法,run方法又委托Syn的innerRun方法:

/**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    public void run() {
        sync.innerRun();
    }
   //首先CAS將status置為RUNING,可以防止結束前重復提交
        void innerRun() {
            if (!compareAndSetState(READY, RUNNING))
                return;

            runner = Thread.currentThread();
            //double check 防止在此之前被cancel
            if (getState() == RUNNING) { // recheck after setting thread
                V result;
                try {
                    result = callable.call();
                } catch (Throwable ex) {
                    setException(ex);
                    return;
                }
                //設置結果
                set(result);
            } else {
                //清除runner,喚醒阻塞線程
                releaseShared(0); // cancel
            }
        }

當執行線程的時候,首先做的是將AQS的狀態由READY變成RUNNING,因為Sync是AQS的實現類,這個也是改變AQS的狀態,改變狀態之后進行double check,此時是為了防止在這之前有Cancel的請求。如果Cancel了,那么releaseShared清除狀態并且喚醒get等待的線程。如果為Running狀態,接下來調用call方法,這里也就是為什么要提交到線程池執行了,注意call方法調用只是一個方法調用,而不像Thread.start那樣會直接返回,并且開啟新線程執行。當執行完畢之后,調用Set,Set其實也是委托給Sync的innerSet:

/**
     * Sets the result of this Future to the given value unless
     * this future has already been set or has been cancelled.
     * This method is invoked internally by the <tt>run</tt> method
     * upon successful completion of the computation.
     * @param v the value
     */
    protected void set(V v) {
        sync.innerSet(v);
    }

        void innerSet(V v) {
            for (;;) {
                int s = getState();
                if (s == RAN)
                    return;
                //收到取消信號,不設置結果,直接返回
                if (s == CANCELLED) {
                    // aggressively release to set runner to null,
                    // in case we are racing with a cancel request
                    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
                //設置結果,并設置當前的狀態為RAN
                if (compareAndSetState(s, RAN)) {
                    //設置內容
                    result = v;
                    //喚醒阻塞線程
                    releaseShared(0);
                    done();
                    return;
                }
            }
        }

這里在Set的時候呢,首先也是判斷狀態如果是RAN直接返回,如果取消了,那么喚醒get等待的線程,并且返回。如果都沒有,那么設置FutureTask狀態為RAN,表示線程執行完了,同時設置restult為返回值,喚醒所有的等待線程。

上面其實在執行前和執行后都做了Cancel的檢查,如果取消,無論執行前后都是沒有結果set給result的。

接下來看看是怎么實現阻塞等待結果的,首先看get方法:

/**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }

            V innerGet() throws InterruptedException, ExecutionException {
            //共享鎖,沒有完成會阻塞在這
            acquireSharedInterruptibly(0);
            //如果已經取消,那么拋出異常
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }

同樣是委托機制,其實關鍵在于acquireSharedInterruptibly方法。

/**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) //如果目前是RAN狀態或者是Cancel狀態的話標識已經完成或者結束
            doAcquireSharedInterruptibly(arg);//等待Task運行結束,喚醒阻塞隊列
    }
       /**
         * Implements AQS base acquire to succeed if ran or cancelled
         */
       protected int tryAcquireShared(int ignore) {
            return innerIsDone() ? 1 : -1;
        }
        boolean innerIsDone() {
            return ranOrCancelled(getState()) && runner == null;
        }
        private boolean ranOrCancelled(int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }

其實這里還是使用了委托的機制,同時呢采用了一個共享鎖去實現同步,共享鎖有一個特點就是允許多個線程獲取鎖,其實這里對于get操作,其實多個線程同時get是沒有問題的,并且如果使用獨占鎖會降低性能,這里引入共享鎖感覺是比較巧妙的。

上面代碼將的是,首先線程回去check當前FutureTask的狀態,如果是RAN或者Cancel,表示線程已經結束,那么直接返回,如果當前不是上面狀態,證明此時線程沒執行或者沒執行完,那么需要阻塞等待,所以執行doAcquireSharedInterruptibly,讓線程等待,等待innerSet之后或者Cancel之后的releaseShared。releaseShared會逐步的喚醒所有阻塞在get上的線程,這樣所以線程都能get到結果。提高了效率。

FutureTask實現不但簡單而且巧妙(比如巧妙的運用了共享鎖),最重要的是使用的也是十分廣泛:

  1. 做異步處理,對于下載,或者生成PDF這種比較重的場景,我們可以通過將請求異步化,抽象成FutureTask提交到線程池中運行,從而避免占用大量的Worker線程(Tomcat或者RPC框架),導致后面的請求阻塞。

  2. 對于服務的同步調用,我們可以利用FutureTask進行服務的并行調用,而在最后進行結果的匯總,這樣就能變串行調用為并行調用,大大的減小請求的時間(類似于Fork-Join)。

最后,異步線程處理和并行處理是個好東西,需要用起來!!!。

 

來自:http://www.jianshu.com/p/dfff17300a87

 

 本文由用戶 OrvilleBurl 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!