Java8 中的純異步編程
當系統越來越復雜之后,服務化的模塊的接口調用會越來越多,最終模塊之間的IO 成為影響整體系統性能的關鍵因素。傳統的阻塞IO + 線程池模型應對這種場景比較無力,只能依靠增加線程數量,但是服務器本身的線程數是有上線的。一個模塊接口性能的波動,啥有不慎就會造成調用者線程池被 IO打滿,壓垮整個服務。這時候純異步編程就有了用武之地,因為IO 不再占用線程來執行,僅需要開少量的線程用于CPU 密集的操作,模塊本身對服務接口超時的容忍程度也大大增加。
所謂純異步編程,就是中間完全沒有阻塞的操作,所有的IO 調用均是異步的。一般講到純異步編程,都會讓人望而生畏,其實純異步程序好不好寫,和語言的特性是很相關的。像Golang 這樣原生支持協程的語言基本上寫出的程序就是全異步的,使用者就像在寫同步程序一樣;而c#, python, 以及最新的nodejs 這樣的語言,雖然沒有原生協程支持,但是支持Generator,寫出的純異步程序也和同步程序看起來差不多。如果一個語言連Generator 也沒有,就只能使用回調的方法來寫異步程序,而回調本身是編寫難度很高、很容易出錯的方式,尤其是還要考慮異常處理、傳遞這些問題,所謂「回調地獄」即是如此。
為了降低寫回調式的純異步程序的難度,就有了一種可以稱之為「Managed callback」的編程模式,其代表有Google guava 的concurrent lib,推ter 使用scala 完成的Finagle 等。這種模式的特點是程序書寫的順序看起來和執行的順序一致,自動管理異常的捕獲傳遞,避免深層次的嵌套,函數式風格,偏向于使用粒度較小的函數組合來完成程序,使用Immutable的對象等。
Java 中原本的Future 類也是設計用來完成異步編程的,但是Future 本身的接口和功能比較有限,這才有了Guava 中的ListenableFuture 等各種增強的實現。Java8 提供了新的CompletableFuture,并且有了基本的對于函數編程的支持,已經很合適來進行Managed callback 模式的異步編程了。
首先我們基于Netty實現了一個異步的Http Server 和 Http Client,假設有一個Http client 基礎接口如下:
CompletableFuture<ImmutableResponse> request(ImmutableRequest request);
另外有一個Async 的Http Server,可以注冊如下接口的Hanlder:
/** * Async http process interface * * @author Dong Liu dongliu@wandoujia.com */ public interface AsyncHandler { /** * handle request. */ CompletableFuture<ImmutableResponse> handle(ImmutableRequest request); }
假設我們要寫一個純異步的程序,從 A 接口獲取數據,這個數據是一個整數,在這個接口上加 10 再通過Http 接口返回。
首先我們覺得request 方法太原始,封裝一個更簡單的方法:
public CompletableFuture<String> get(String url) { ImmutableRequest request = ImmutableRequest.newBuilder() .withMethod(HttpMethod.GET) .withUri(url) .build(); return request(request).thenApply(response -> new String(response.getBody())); }
這里演示了CompletableFuture 中thenApply 方法的用法,這個方法在CompletableFuture 中管理的回調完成之后進行調用,對結果進行處理。
現在來完成我們的Handler:
public class AddNumberHandler implements AsyncHandler { private HttpClient httpClient = ...; @Override public CompletableFuture<ImmutableResponse> handle(ImmutableRequest request) { CompletableFuture<String> resultFuture = httpClient.get("http://127.0.0.1/a"); return resultFuture.thenApply(result -> { int total = Integer.parseInt(result) + 10; return ImmutableResponse.newBuilder().withBody(String.valueOf(total)).build(); }); } }
讓情況變得更復雜一些。假設我們需要調用A, B, C 三個接口,并把三個接口所返回的數字相加。因為A, B, C三個接口調用是獨立的,所以決定并行的來請求三個接口,以提升效率:
public class AddNumberHandler implements AsyncHandler { private HttpClient httpClient = ...; @Override public CompletableFuture<ImmutableResponse> handle(ImmutableRequest request) { CompletableFuture<String> aFuture = httpClient.get("http://127.0.0.1/a"); CompletableFuture<String> bFuture = httpClient.get("http://127.0.0.1/b"); CompletableFuture<String> cFuture = httpClient.get("http://127.0.0.1/b"); return FutureUtils.combine(aFuture, bFuture, cFuture).thenApply(abc -> { int total = parseInt(abc._1()) + parseInt(abc._2()) + parseInt(abc._3()); return ImmutableResponse.newBuilder().withBody(String.valueOf(total)).build(); }); } }
FutureUtils.combine 方法是一個簡單的封裝,封裝了CompletableFuture 的 Combine 方法以方便使用。可以看到,使用CompletableFuture 來寫這種并行多個請求的異步程序是很容易的事情。
這中間我們都沒有特別的對異常進行處理,如果直接使用callback 的話,這是肯定不可行的,必須手動捕獲所有異常,否則這個callback 就永遠不會返回了。但是現在,CompletableFuture管理了我們提供的回調函數, 會幫我們捕獲異常并進行管理。這個異常在調用thenApply 或者combine 方法的時候是自動傳遞的,如果第一步就失敗了,后面注冊的回調就不會被執行。要對異常進行處理的話,可以使用exceptionally 方法:
public class AddNumberHandler implements AsyncHandler { private HttpClient httpClient = ...; private Logger logger = ...; @Override public CompletableFuture<ImmutableResponse> handle(ImmutableRequest request) { CompletableFuture<String> aFuture = httpClient.get("http://127.0.0.1/a"); CompletableFuture<String> bFuture = httpClient.get("http://127.0.0.1/b"); CompletableFuture<String> cFuture = httpClient.get("http://127.0.0.1/b"); aFuture.isCompletedExceptionally(); CompletableFuture<Integer> totalFuture = FutureUtils.combine(aFuture, bFuture, cFuture) .thenApply(abc -> parseInt(abc._1()) + parseInt(abc._2()) + parseInt(abc._3())) .exceptionally(t -> { logger.error("", t); return -1; }); return totalFuture.thenApply(total -> { return ImmutableResponse.newBuilder().withBody(String.valueOf(total)).build(); }); } }
這里舉得例子都比較簡單,想更多的了解可以參見CompletableFuture 的文檔。