FutureTask源碼分析
對于java的并發編程來說,我們都知道Thread和runnable,這是創建一個線程最基本的兩種方法,但是這兩種方法創建的線程是不支持對線程的執行結果進行返回的。雖然我們可以通過傳遞引用的方式實現,但是實現起來未免太復雜。這個時候我們可能要用到Callable,callable是一個JDK提供的一個支持線程返回結果的一個接口,通過實現call方法,能返回指定泛型的變量。
class CallableTask implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("call runing");
Thread.sleep(5000);
return 1;
}
}
public class CallableTest {
public static void main(String args[]){
CallableTask task = new CallableTask();
try {
System.out.println("call start");
ExecutorService service = Executors.newSingleThreadExecutor();
Future fu = service.submit(task);
System.out.println(fu.get());
service.shutdown();
System.out.println("call end");
} catch (Exception e) {
e.printStackTrace();
}
}
}
可以通過線程池去實現任務的提交,任務提交后會返回future對象,通過get方法即可獲得返回值。
注意:這里其實是不推薦調用call方法的,實際上直接調用call方法和runnable的run方法效果是一樣的。
其實JDK提供了一種更好的提交方式,它可以將Runnable和Callable進行封裝,以便于提交到線程池。并且可以對線程有更好的控制,比如取消線程的執行,它就是FutureTask。
FutureTask只是簡單的對Callable以及Runnable進行了封裝,提供了額外的對線程控制的功能以及阻塞獲取請求結果的功能,其實對于線程池的submit方法,對于每一個任務都會封裝成一個FutureTask來運行。
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
/**
* Returns a <tt>RunnableFuture</tt> for the given callable task.
*
* @param callable the callable task being wrapped
* @return a <tt>RunnableFuture</tt> which when run will call the
* underlying callable and which, as a <tt>Future</tt>, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task.
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
那么FutureTask到底是怎么實現的呢?
首先看構造方法:
/**
* Creates a <tt>FutureTask</tt> that will, upon running, execute the
* given <tt>Callable</tt>.
*
* @param callable the callable task
* @throws NullPointerException if callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
/**
* Creates a <tt>FutureTask</tt> that will, upon running, execute the
* given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if runnable is null
*/
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
FutureTask可以接受Runnable以及Callable兩種類型的參數,在初始化的時候內部構造了一個Sync的AQS實現類的實例,對于runnable類型的線程需要轉化成Callable,同時可以指定返回值。
當我們再觀察其他方法的時候,幾乎都是委托Sync去處理的,那么重點就放在了Sync上。
首先看看Sync里面有幾個狀態:
/** State value representing that task is ready to run */
private static final int READY = 0;//準備就緒
/** State value representing that task is running */
private static final int RUNNING = 1;//正在運行
/** State value representing that task ran */
private static final int RAN = 2;//運行完畢
/** State value representing that task was cancelled */
private static final int CANCELLED = 4;//任務取消
一個FutureTask的實例就在上面幾個狀態之間進行輪轉,當執行線程時調用run方法,run方法又委托Syn的innerRun方法:
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
public void run() {
sync.innerRun();
}
//首先CAS將status置為RUNING,可以防止結束前重復提交
void innerRun() {
if (!compareAndSetState(READY, RUNNING))
return;
runner = Thread.currentThread();
//double check 防止在此之前被cancel
if (getState() == RUNNING) { // recheck after setting thread
V result;
try {
result = callable.call();
} catch (Throwable ex) {
setException(ex);
return;
}
//設置結果
set(result);
} else {
//清除runner,喚醒阻塞線程
releaseShared(0); // cancel
}
}
當執行線程的時候,首先做的是將AQS的狀態由READY變成RUNNING,因為Sync是AQS的實現類,這個也是改變AQS的狀態,改變狀態之后進行double check,此時是為了防止在這之前有Cancel的請求。如果Cancel了,那么releaseShared清除狀態并且喚醒get等待的線程。如果為Running狀態,接下來調用call方法,這里也就是為什么要提交到線程池執行了,注意call方法調用只是一個方法調用,而不像Thread.start那樣會直接返回,并且開啟新線程執行。當執行完畢之后,調用Set,Set其實也是委托給Sync的innerSet:
/**
* Sets the result of this Future to the given value unless
* this future has already been set or has been cancelled.
* This method is invoked internally by the <tt>run</tt> method
* upon successful completion of the computation.
* @param v the value
*/
protected void set(V v) {
sync.innerSet(v);
}
void innerSet(V v) {
for (;;) {
int s = getState();
if (s == RAN)
return;
//收到取消信號,不設置結果,直接返回
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
//設置結果,并設置當前的狀態為RAN
if (compareAndSetState(s, RAN)) {
//設置內容
result = v;
//喚醒阻塞線程
releaseShared(0);
done();
return;
}
}
}
這里在Set的時候呢,首先也是判斷狀態如果是RAN直接返回,如果取消了,那么喚醒get等待的線程,并且返回。如果都沒有,那么設置FutureTask狀態為RAN,表示線程執行完了,同時設置restult為返回值,喚醒所有的等待線程。
上面其實在執行前和執行后都做了Cancel的檢查,如果取消,無論執行前后都是沒有結果set給result的。
接下來看看是怎么實現阻塞等待結果的,首先看get方法:
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
V innerGet() throws InterruptedException, ExecutionException {
//共享鎖,沒有完成會阻塞在這
acquireSharedInterruptibly(0);
//如果已經取消,那么拋出異常
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
同樣是委托機制,其實關鍵在于acquireSharedInterruptibly方法。
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //如果目前是RAN狀態或者是Cancel狀態的話標識已經完成或者結束
doAcquireSharedInterruptibly(arg);//等待Task運行結束,喚醒阻塞隊列
}
/**
* Implements AQS base acquire to succeed if ran or cancelled
*/
protected int tryAcquireShared(int ignore) {
return innerIsDone() ? 1 : -1;
}
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}
private boolean ranOrCancelled(int state) {
return (state & (RAN | CANCELLED)) != 0;
}
其實這里還是使用了委托的機制,同時呢采用了一個共享鎖去實現同步,共享鎖有一個特點就是允許多個線程獲取鎖,其實這里對于get操作,其實多個線程同時get是沒有問題的,并且如果使用獨占鎖會降低性能,這里引入共享鎖感覺是比較巧妙的。
上面代碼將的是,首先線程回去check當前FutureTask的狀態,如果是RAN或者Cancel,表示線程已經結束,那么直接返回,如果當前不是上面狀態,證明此時線程沒執行或者沒執行完,那么需要阻塞等待,所以執行doAcquireSharedInterruptibly,讓線程等待,等待innerSet之后或者Cancel之后的releaseShared。releaseShared會逐步的喚醒所有阻塞在get上的線程,這樣所以線程都能get到結果。提高了效率。
FutureTask實現不但簡單而且巧妙(比如巧妙的運用了共享鎖),最重要的是使用的也是十分廣泛:
-
做異步處理,對于下載,或者生成PDF這種比較重的場景,我們可以通過將請求異步化,抽象成FutureTask提交到線程池中運行,從而避免占用大量的Worker線程(Tomcat或者RPC框架),導致后面的請求阻塞。
-
對于服務的同步調用,我們可以利用FutureTask進行服務的并行調用,而在最后進行結果的匯總,這樣就能變串行調用為并行調用,大大的減小請求的時間(類似于Fork-Join)。
最后,異步線程處理和并行處理是個好東西,需要用起來!!!。
來自:http://www.jianshu.com/p/dfff17300a87