Java多線程編程模式實戰指南一:Active Object模式(下)
轉載自:http://www.infoq.com/cn/articles/Java-multithreaded-programming-mode-active-object-part2
Active Object模式的評價與實現考量
Active Object模式通過將方法的調用與執行分離,實現了異步編程。有利于提高并發性,從而提高系統的吞吐率。
Active Object模式還有個好處是它可以將任務(MethodRequest)的提交(調用異步方法)和任務的執行策略(Execution Policy)分離。任務的執行策略被封裝在Scheduler的實現類之內,因此它對外是不“可見”的,一旦需要變動也不會影響其它代碼,降低了系統的耦合性。任務的執行策略可以反映以下一些問題:
- 采用什么順序去執行任務,如FIFO、LIFO、或者基于任務中包含的信息所定的優先級?
- 多少個任務可以并發執行?
- 多少個任務可以被排隊等待執行?
- 如果有任務由于系統過載被拒絕,此時哪個任務該被選中作為犧牲品,應用程序該如何被通知到?
- 任務執行前、執行后需要執行哪些操作?
這意味著,任務的執行順序可以和任務的提交順序不同,可以采用單線程也可以采用多線程去執行任務等等。
當然,好處的背后總是隱藏著代價,Active Object模式實現異步編程也有其代價。該模式的參與者有6個之多,其實現過程也包含了不少中間的處理:MethodRequest對象的生成、MethodRequest對象的移動(進出緩沖區)、MethodRequest對象的運行調度和線程上下文切換等。這些處理都有其空間和時間的代價。因此,Active Object模式適合于分解一個比較耗時的任務(如涉及I/O操作的任務):將任務的發起和執行進行分離,以減少不必要的等待時間。
雖然模式的參與者較多,但正如本文案例的實現代碼所展示的,其中大部分的參與者我們可以利用JDK自身提供的類來實現,以節省編碼時間。如表1所示。
表 1. 使用JDK現有類實現Active Object的一些參與者
參與者名稱 |
可以借用的JDK類 |
備注 |
Scheduler |
Java Executor Framework中的java.util.concurrent.ExecutorService接口的相關實現類,如java.util.concurrent.ThreadPoolExecutor。 |
ExecutorService接口所定義的submit(Callable<T> task)方法相當于圖2中的enqueue方法。 |
ActivationQueue |
java.util.concurrent.LinkedBlockingQueue |
若Scheduler采用java.util.concurrent.ThreadPoolExecutor,則java.util.concurrent.LinkedBlockingQueue實例作為ThreadPoolExecutor構造器的參數。 |
MethodRequest |
java.util.concurrent.Callable接口的匿名實現類。 |
Callable接口比起Runnable接口的優勢在于它定義的call方法有返回值,便于將該返回值傳遞給Future實例。 |
Future |
java.util.concurrent.Future |
ExecutorService接口所定義的submit(Callable<T> task)方法的返回值類型就是java.util.concurrent.Future。 |
錯誤隔離
錯誤隔離指一個任務的處理失敗不影響其它任務的處理。每個MethodRequest實例可以看作一個任務。那么,Scheduler的實現類在執行MethodRequest時需要注意錯誤隔離。選用JDK中現成的類(如ThreadPoolExecutor)來實現Scheduler的一個好處就是這些類可能已經實現了錯誤隔離。而如果自己編寫代碼實現Scheduler,用單個Active Object工作線程逐一執行所有任務,則需要特別注意線程的run方法的異常處理,確保不會因為個別任務執行時遇到一些運行時異常而導致整個線程終止。如清單6的示例代碼所示。
清單 6. 自己動手實現Scheduler的錯誤隔離示例代碼
public class CustomScheduler implements Runnable { private LinkedBlockingQueue<Runnable> activationQueue = new LinkedBlockingQueue<Runnable>(); @Override public void run() { dispatch(); } public <T> Future<T> enqueue(Callable<T> methodRequest) { final FutureTask<T> task = new FutureTask<T>(methodRequest) { @Override public void run() { try { super.run(); //捕獲所以可能拋出的對象,避免該任務運行失敗而導致其所在的線程終止。 } catch (Throwable t) { this.setException(t); } } }; try { activationQueue.put(task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return task; } public void dispatch() { while (true) { Runnable methodRequest; try { methodRequest = activationQueue.take(); //防止個別任務執行失敗導致線程終止的代碼在run方法中 methodRequest.run(); } catch (InterruptedException e) { // 處理該異常 } } } }
緩沖區監控
如果ActivationQueue是有界緩沖區,則對緩沖區的當前大小進行監控無論是對于運維還是測試來說都有其意義。從測試的角度來看,監控緩沖區有助于確定緩沖區容量的建議值(合理值)。清單3所示的代碼,即是通過定時任務周期性地調用ThreadPoolExecutor的getQueue方法對緩沖區的大小進行監控。當然,在監控緩沖區的時候,往往只需要大致的值,因此在監控代碼中要避免不必要的鎖。
緩沖區飽和處理策略
當任務的提交速率大于任務的執行數率時,緩沖區可能逐漸積壓到滿。這時新提交的任務會被拒絕。無論是自己編寫代碼還是利用JDK現有類來實現Scheduler,對于緩沖區滿時新任務提交失敗,我們需要一個處理策略用于決定此時哪個任務會成為“犧牲品”。若使用ThreadPoolExecutor來實現Scheduler有個好處是它已經提供了幾個緩沖區飽和處理策略的實現代碼,應用代碼可以直接調用。如清單3的代碼所示,本文案例中我們選擇了拋棄最老的任務作為處理策略。java.util.concurrent.RejectedExecutionHandler接口是ThreadPoolExecutor對緩沖區飽和處理策略的抽象,JDK中提供的具體實現如表2所示。
表 2. JDK提供的緩沖區飽和處理策略實現類
實現類 |
所實現的處理策略 |
ThreadPoolExecutor.AbortPolicy |
直接拋出異常。 |
ThreadPoolExecutor.DiscardPolicy |
放棄當前被拒絕的任務(而不拋出任何異常)。 |
ThreadPoolExecutor.DiscardOldestPolicy |
將緩沖區中最老的任務放棄,然后重新嘗試接納被拒絕的任務。 |
ThreadPoolExecutor.CallerRunsPolicy |
在任務的提交方線程中運行被拒絕的任務。 |
當然,對于ThreadPoolExecutor而言,其工作隊列滿不一定就意味著新提交的任務會被拒絕。當其最大線程池大小大于其核心線程池大小時,工作隊列滿的情況下,新提交的任務會用所有核心線程之外的新增線程來執行,直到工作線程數達到最大線程數時,新提交的任務會被拒絕。
Scheduler空閑工作線程清理
如果Scheduler采用多個工作線程(如采用ThreadPoolExecutor這樣的線程池)來執行任務。則可能需要清理空閑的線程以節約資源。清單3的代碼就是直接使用了ThreadPoolExecutor的現有功能,在初始化其實例時通過指定其構造器的第3、4個參數( long keepAliveTime, TimeUnit unit),告訴ThreadPoolExecutor對于核心工作線程以外的線程若其已經空閑了指定時間,則將其清理掉。
可復用的Active Object模式實現
盡管利用JDK中的現成類可以極大地簡化Active Object模式的實現。但如果需要頻繁地在不同場景下使用Active Object模式,則需要一套更利于復用的代碼,以節約編碼的時間和使代碼更加易于理解。清單7展示一段基于Java動態代理的可復用的Active Object模式的Proxy參與者的實現代碼。
清單 7. 可復用的Active Object模式Proxy參與者實現
import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; public abstract class ActiveObjectProxy { private static class DispatchInvocationHandler implements InvocationHandler { private final Object delegate; private final ExecutorService scheduler; public DispatchInvocationHandler(Object delegate, ExecutorService executorService) { this.delegate = delegate; this.scheduler = executorService; } private String makeDelegateMethodName(final Method method, final Object[] arg) { String name = method.getName(); name = "do" + Character.toUpperCase(name.charAt(0)) + name.substring(1); return name; } @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { Object returnValue = null; final Object delegate = this.delegate; final Method delegateMethod; //如果攔截到的被調用方法是異步方法,則將其轉發到相應的doXXX方法 if (Future.class.isAssignableFrom(method.getReturnType())) { delegateMethod = delegate.getClass().getMethod( makeDelegateMethodName(method, args), method.getParameterTypes()); final ExecutorService scheduler = this.scheduler; Callable<Object> methodRequest = new Callable<Object>() { @Override public Object call() throws Exception { Object rv = null; try { rv = delegateMethod.invoke(delegate, args); } catch (IllegalArgumentException e) { throw new Exception(e); } catch (IllegalAccessException e) { throw new Exception(e); } catch (InvocationTargetException e) { throw new Exception(e); } return rv; } }; Future<Object> future = scheduler.submit(methodRequest); returnValue = future; } else { //若攔截到的方法調用不是異步方法,則直接轉發 delegateMethod = delegate.getClass() .getMethod(method.getName(),method.getParameterTypes()); returnValue = delegateMethod.invoke(delegate, args); } return returnValue; } } /** * 生成一個實現指定接口的Active Object proxy實例。 * 對interf所定義的異步方法的調用會被裝發到servant的相應doXXX方法。 * @param interf 要實現的Active Object接口 * @param servant Active Object的Servant參與者實例 * @param scheduler Active Object的Scheduler參與者實例 * @return Active Object的Proxy參與者實例 */ public static <T> T newInstance(Class<T> interf, Object servant, ExecutorService scheduler) { @SuppressWarnings("unchecked") T f = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class[] { interf }, new DispatchInvocationHandler(servant, scheduler)); return f; } }
清單7的代碼實現了可復用的Active Object模式的Proxy參與者ActiveObjectProxy。ActiveObjectProxy通過使用Java動態代理,動態生成指定接口的代理對象。對該代理對象的異步方法(即返回值類型為java.util.concurrent.Future的方法)的調用會被ActiveObjectProxy實現InvocationHandler(DispatchInvocationHandler)所攔截,并轉發給ActiveObjectProxy的newInstance方法中指定的Servant處理。
清單8所示的代碼展示了通過使用ActiveObjectProxy快速Active Object模式。
清單 8. 基于可復用的API快速實現Active Object模式
public static void main(String[] args) throws InterruptedException, ExecutionException { SampleActiveObject sao = ActiveObjectProxy.newInstance( SampleActiveObject.class, new SampleActiveObjectImpl(), Executors.newCachedThreadPool()); Future<String> ft = sao.process("Something", 1); Thread.sleep(500); System.out.println(ft.get());
從清單8的代碼可見,利用可復用的Active Object模式Proxy實現,應用開發人員只要指定Active Object模式對外保留的接口(對應ActiveObjectProxy.newInstance方法的第1個參數),并提供一個該接口的實現類(對應ActiveObjectProxy.newInstance方法的第2個參數),再指定一個java.util.concurrent.ExecutorService實例(對應ActiveObjectProxy.newInstance方法的第3個參數)即可以實現Active Object模式。
總結
本文介紹了Active Object模式的意圖及架構。并提供了一個實際的案例用于展示使用Java代碼實現Active Object模式,在此基礎上對該模式進行了評價并分享了在實際運用該模式時需要注意的事項。