Java8 中的純異步編程

jopen 9年前發布 | 66K 次閱讀 Java8 Java開發

當系統越來越復雜之后,服務化的模塊的接口調用會越來越多,最終模塊之間的IO 成為影響整體系統性能的關鍵因素。傳統的阻塞IO + 線程池模型應對這種場景比較無力,只能依靠增加線程數量,但是服務器本身的線程數是有上線的。一個模塊接口性能的波動,啥有不慎就會造成調用者線程池被 IO打滿,壓垮整個服務。這時候純異步編程就有了用武之地,因為IO 不再占用線程來執行,僅需要開少量的線程用于CPU 密集的操作,模塊本身對服務接口超時的容忍程度也大大增加。

所謂純異步編程,就是中間完全沒有阻塞的操作,所有的IO 調用均是異步的。一般講到純異步編程,都會讓人望而生畏,其實純異步程序好不好寫,和語言的特性是很相關的。像Golang 這樣原生支持協程的語言基本上寫出的程序就是全異步的,使用者就像在寫同步程序一樣;而c#, python, 以及最新的nodejs 這樣的語言,雖然沒有原生協程支持,但是支持Generator,寫出的純異步程序也和同步程序看起來差不多。如果一個語言連Generator 也沒有,就只能使用回調的方法來寫異步程序,而回調本身是編寫難度很高、很容易出錯的方式,尤其是還要考慮異常處理、傳遞這些問題,所謂「回調地獄」即是如此。

為了降低寫回調式的純異步程序的難度,就有了一種可以稱之為「Managed callback」的編程模式,其代表有Google guava 的concurrent lib,推ter 使用scala 完成的Finagle 等。這種模式的特點是程序書寫的順序看起來和執行的順序一致,自動管理異常的捕獲傳遞,避免深層次的嵌套,函數式風格,偏向于使用粒度較小的函數組合來完成程序,使用Immutable的對象等。

Java 中原本的Future 類也是設計用來完成異步編程的,但是Future 本身的接口和功能比較有限,這才有了Guava 中的ListenableFuture 等各種增強的實現。Java8 提供了新的CompletableFuture,并且有了基本的對于函數編程的支持,已經很合適來進行Managed callback 模式的異步編程了。

首先我們基于Netty實現了一個異步的Http Server 和 Http Client,假設有一個Http client 基礎接口如下:

CompletableFuture<ImmutableResponse> request(ImmutableRequest request);

 另外有一個Async 的Http Server,可以注冊如下接口的Hanlder:

/**
 * Async http process interface
 *
 * @author Dong Liu dongliu@wandoujia.com
 */
public interface AsyncHandler {

    /**
     * handle request.
     */
    CompletableFuture<ImmutableResponse> handle(ImmutableRequest request);
}

假設我們要寫一個純異步的程序,從 A 接口獲取數據,這個數據是一個整數,在這個接口上加 10 再通過Http 接口返回。

首先我們覺得request 方法太原始,封裝一個更簡單的方法:

public CompletableFuture<String> get(String url) {
    ImmutableRequest request = ImmutableRequest.newBuilder()
            .withMethod(HttpMethod.GET)
            .withUri(url)
            .build();
    return request(request).thenApply(response -> new String(response.getBody()));
}

這里演示了CompletableFuture 中thenApply 方法的用法,這個方法在CompletableFuture 中管理的回調完成之后進行調用,對結果進行處理。

現在來完成我們的Handler:

public class AddNumberHandler implements AsyncHandler {

    private HttpClient httpClient = ...;

    @Override
    public CompletableFuture<ImmutableResponse> handle(ImmutableRequest request) {
        CompletableFuture<String> resultFuture = httpClient.get("http://127.0.0.1/a");
        return resultFuture.thenApply(result -> {
            int total = Integer.parseInt(result) + 10;
            return ImmutableResponse.newBuilder().withBody(String.valueOf(total)).build();
        });
    }
}

讓情況變得更復雜一些。假設我們需要調用A, B, C 三個接口,并把三個接口所返回的數字相加。因為A, B, C三個接口調用是獨立的,所以決定并行的來請求三個接口,以提升效率:

public class AddNumberHandler implements AsyncHandler {

    private HttpClient httpClient = ...;

    @Override
    public CompletableFuture<ImmutableResponse> handle(ImmutableRequest request) {
        CompletableFuture<String> aFuture = httpClient.get("http://127.0.0.1/a");
        CompletableFuture<String> bFuture = httpClient.get("http://127.0.0.1/b");
        CompletableFuture<String> cFuture = httpClient.get("http://127.0.0.1/b");

        return FutureUtils.combine(aFuture, bFuture, cFuture).thenApply(abc -> {
            int total = parseInt(abc._1()) + parseInt(abc._2()) + parseInt(abc._3());
            return ImmutableResponse.newBuilder().withBody(String.valueOf(total)).build();
        });
    }
}

FutureUtils.combine 方法是一個簡單的封裝,封裝了CompletableFuture 的 Combine 方法以方便使用。可以看到,使用CompletableFuture 來寫這種并行多個請求的異步程序是很容易的事情。

這中間我們都沒有特別的對異常進行處理,如果直接使用callback 的話,這是肯定不可行的,必須手動捕獲所有異常,否則這個callback 就永遠不會返回了。但是現在,CompletableFuture管理了我們提供的回調函數, 會幫我們捕獲異常并進行管理。這個異常在調用thenApply 或者combine 方法的時候是自動傳遞的,如果第一步就失敗了,后面注冊的回調就不會被執行。要對異常進行處理的話,可以使用exceptionally 方法:

public class AddNumberHandler implements AsyncHandler {

    private HttpClient httpClient = ...;
    private Logger logger = ...;

    @Override
    public CompletableFuture<ImmutableResponse> handle(ImmutableRequest request) {
        CompletableFuture<String> aFuture = httpClient.get("http://127.0.0.1/a");
        CompletableFuture<String> bFuture = httpClient.get("http://127.0.0.1/b");
        CompletableFuture<String> cFuture = httpClient.get("http://127.0.0.1/b");

        aFuture.isCompletedExceptionally();

        CompletableFuture<Integer> totalFuture = FutureUtils.combine(aFuture, bFuture, cFuture)
                .thenApply(abc -> parseInt(abc._1()) + parseInt(abc._2()) + parseInt(abc._3()))
                .exceptionally(t -> {
                    logger.error("", t);
                    return -1;
                });

        return totalFuture.thenApply(total -> {
            return ImmutableResponse.newBuilder().withBody(String.valueOf(total)).build();
        });
    }
}

 這里舉得例子都比較簡單,想更多的了解可以參見CompletableFuture 的文檔。

原文鏈接: http://www.dongliu.net/post/622452

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!