淺入淺出,談談服務化體系中的異步(上)

niuwenyong 8年前發布 | 10K 次閱讀 服務化 異步

一個懂Akka、RxJava,看得懂《七周七并發》的人,和普通程序員完全是兩個世界的人。
那作為一個羞澀的普通程序員,怎么在自己的服務化體系里,滿足自己的異步化需求呢 ?我的思路是這樣的:

1. 先認清自己的需求,不要一開始就站到某個技術上,再倒敘自己的需求。

2. 然后從淺入深,分析可以用什么樣的技術滿足該層次的需求,依然避免一下就沖到某個技術上,比如Akka入門到放棄。

3. 最后找出一條相對平穩的技術路線,能夠貫穿各個層次的需求,而且最好不那么考驗使用者的智商。

 

1. 需求篇

1.1 通過并行的遠程調用,縮短總的響應延時。

服務處理的過程中,總會包含如下幾類的遠程調用:

服務層:RESTful,RPC框架
數據層:JDBC,Memcached,Redis,ZooKeeper...
消息層:Kafka,RabbitMQ...

有些客戶端已經支持異步,比如大部分的RPC框架、RESTful Client、SpyMemcached、ZooKeeper、Kafka 等。 在異步接口之中,又分返回Future 與 設置用戶自定義的CallBack類兩種風格。

但有些客戶端還是同步的,比如JDBC,Jedis,在討論方案時需要把它們也考慮進去。

并行執行的機會有兩種:
一是服務提供者沒有提供批量接口,比如商品查詢,只能以不同的ID分次調用(題外話,始終服務方提供批量接口會更好些)。

一是兩個調用之間沒有依賴關系,一個調用的參數并不依賴另一個調用的返回結果,比如查詢商品信息與查詢庫存。

并行調用又分兩個層次:
一是簡單并行,把所有可并行的服務一下子都撒出去,然后等待所有的異步調用返回,簡單的Future.get()就夠用。

還有一種特例的場景是,先調一個異步接口,然后做一些同步的事情,再回頭Future.get()拿之前異步調用的結果,這也達到了節省總響應時間的效果。

二是調用編排,比如一開始并行A、B、C、D服務,一旦A與B返回則根據結果調用E,一旦C返回則調用F,最后組裝D、E、F的結果返回給客戶端。

如果還是簡單的并行,沒辦法做到最高效的調度,必須有一種機制,定義這種多條異步調用鏈分頭并進的場景。此時就需要或Akka,或RxJava,或JDK8的CompletableFuture與Guava的ListenenableFuture,或Spotify的Trickle,基于Actor/Callback機制來編排了。

 

1.2 希望能用少量的固定線程,處理海量的并發請求。

這個概念并不陌生,NIO就是這個思路。

但是即使你用了Netty的NIO 或 Servlet3.0的異步Servlet 或,也只是解決了傳輸層面用少量傳輸線程處理海量并發的傳輸,并在入口層的編程模式上提供了異步化的可能性。

如果你的服務線程里存在阻塞的遠程調用,那線程還是會等待在遠程調用上而無法處理海量請求,即使異步化如Future.get(),也依然是等待。

所以,你必須有辦法在阻塞等待時把線程給交回去,比如服務線程里采用全異步化的Callback模式,或引入Akka的Actor模式,或基于Quasar引入纖程協程的概念。

 

1.3 小結

在我看來 , 客戶端簡單并行->客戶端并行編排->服務端少量線程處理海量請求。對于大部分普通項目,是由淺入深三個層次的需求。
 

2.第一層,簡單并行實現篇

2.1 最簡單寫法

最簡單就是并發的調用一堆返回Future的異步接口,再一個個Get回來,最慢的那個會阻塞住其他:

 

Future<Product> productFuture = productService.query(id);
Future<Long> stockFuture = stockService.query(id);
.......
Product product = productFuture.get();
Long stock = stockFuture.get();

HttpClient有一個HttpAsyncClient 的子項目提供Http的異步訪問:

 

HttpGet request1 = new HttpGet("http://www.apache.org/");
Future<HttpResponse> future = httpclient.execute(request1, null);
HttpResponse response1 = future.get();

Spring的RestTemplate同樣有AsyncRestTemplate:

 

Future<ResponseEntity<String>> futureEntity = template.getForEntity("http://www.apache.org/" , String.class);
ResponseEntity<String> entity = futureEntity.get();

其他類似的不一一列舉。

 

2.2. 異步接口只有Callback形式時

但如果異步接口只能設置自定義Callback函數,不返回Future呢? 比如Thrift就是這樣。

Callback函數在這種并行調用場景里并不好用,因此建議還是要轉換回Future接口使用。

轉換的方法很簡單,自己實現一個默認的Callback類,里面包含一個Future,然后實現Callback接口所定義的onSucess()和onFail()函數,將結果或異常賦值到Future中。

 

DefaultFutureCallback<Long> callback=new DefaultFutureCallback<Long>();
myService.getPrice(1L, callback);
String result = callback.getFuture().get();

至于Future類的實現,隨便抄個HttpClient的BasicFuture就好了。

題外話,Future.get() 接口只聲明了ExecutionException一種異常,如果你原來的callback函數里有其他不是Runtime Exception的,就要裹在ExecutionException里了,用戶還要自己getCause()把它找出來,唯一不方便的地方。

 

2.3. 對付同步接口

同步接口如JDBC與Jedis,只能在調用它們的地方實現Callable接口,異步的跑在另一個線程池里來完成并行調度。但這其實引入了線程調度的消耗,不得而為之,不可濫用。

 

Future<Product> future = executor.submit(new Callable<Product>(){
  @Override
  public Product call() throws Exception {
    return productDao.query(id);
  }
});

題外話,executor中返回的Future的實現類叫FutureTask,它能以“Callable 或 Runnable+預設結果(因為Runnalbe自身沒結果)“作為參數構建。executor.submit() 接受“Callable 或 Runnable+結果” 做參數,內部構建FutureTask。但FutureTask本身又是個Runnable,網上有些例子讓大家自己把Callable構造成FutureTask,再以Runnable的身份傳給executor,其實不好。

如果是JDK8,用Lambda就可以寫得短一些,只要一行,和平時同步寫法也差不多了。

 

Future<Product> future = executor.submit(()->productDao.query(id));

最后,同步改異步后,原來存在ThreadLocal中的東西如TraceId就沒有了,要用一個Context之類的在進出的時候復制。
 

來自:花錢的年華

 本文由用戶 niuwenyong 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!