記一次重構:并行化調用接口實踐

jopen 9年前發布 | 11K 次閱讀 重構

原文  https://zhujun1980.github.io/2014/06/curl-multi-mode/

優化目標

在我現在所在的產品線中 http 接口被大量使用,用來獲取各種開放數據,可以說 http 調用在代碼中隨處可見。比如一個訪問最頻繁的頁面,一次請求將會產生 7~8 次 http 調用。雖然每個接口都非常的快,但 8 次累加起來的消耗還是相當的可觀,所以我最近的優化工作主要是:


并行調用各 http 請求,以縮短腳本的運行時間。

重構起因

實際上,將請求并行化并不十分困難,使用 curl 提供的 multi* 方法族就可以實現,網上有很多類似的文章來介紹其使用方法,大致的思想是把多個 curl 句柄放入一個 curl multi_handler 中,以 nonblocking 的方式執行,然后使用內核提供的 IO 復用機制(select/epoll等)進行事件查詢,當有 response 返回時處理結果。很明顯這是一個異步的過程,而在我們現在用的 php 框架中對于 http 調用則是同步的,建立連接、發送請求、接受響應和處理結果都是串行完成的,這些操作都被封裝到一系列類中,使得上層只用一行代碼就可以完成 API 的調用并獲得 返回結果 ,例如:

$userInfo = $this->apiProxy->general->getUserInfo($uid); //調用 api
echo $userInfo['rst']['username']; //使用結果

上面的返回值userInfo是個 數組 ,包含了調用方感興趣的所有數據。雖然只有 2 行,但是框架默默幫你做了大量的工作,其中包括:建立連接、發送請求、接受結果,檢查狀態 、處理錯誤、格式化輸出、寫緩存等等,而且具體到每個 api 對于返回值的處理還有不同的邏輯。如何用一種優雅的方式對現有框架進行重構,既能符合要求,又能保證改動量最小,成為了現在最重要的問題。

Lazy evaluation

當我開始接手這項工作的時候,腦海中想到的第一個對應思想就是 Lazy evaluation緩式求值 ),維基百科上對于 緩式求值 的定義是:


In programming language theory, lazy evaluation, or call-by-need[1] is an evaluation strategy which delays the evaluation of an expression until its value is needed (non-strict evaluation) and which also avoids repeated evaluations (sharing).

Lazy evaluation 是編程語言設計領域中的一個 表達式求值 策略,它延緩對表達式的求值直到你需要它的時候。看上去 lazy evaluation 好像和我們的問題挨不上邊,而且 php 也不支持 lazy evaluation,不過仔細想一下,如果我們能把對 http 請求的 后續操作 延緩到對 返回結果的使用 時,就可以用一種 優雅 的實現來使框架支持并行執行,而且對于 controller 層的改動也非常的小。

具體點說就是在進行 api 調用的時候,不再返回結果數組,而是返回一個 句柄 ,這個句柄標識了一個被提交到后臺的請求,它被加入到 curl multi_handler 中,你不再關心它,由 curl 替你完成,你的代碼可以繼續往下執行,去完成其他的業務邏輯。而當我們需要這個結果時,檢查這個句柄是否已經完成,如果已經完成則執行上面 接受結果 之后的所有操作,返回結果。那么上面的代碼重構后變成:

$userInfo = $this->apiProxy->async_general->getUserInfo($uid); //使用 curl 異步調用
//
//執行其他的業務邏輯......
//
echo $userInfo['rst']['username']; //檢查句柄是否已經完成,返回結果

async_ 前綴表示使用異步來調用 api 。上面代碼中,在調用 api 和使用結果之間的時間都留給 curl 去連接服務器、發送請求、獲取結果到 socket 輸入buffer等等,就可以達到并行操作節省時間的效果。

ArrayObject

顯然接口調用的返回值必須是個 object 而不能再是個 array ,因為數組的可操作性有限,不能執行邏輯, object 則提供了更大的靈活性,但在原有代碼中 array 已經被大量應用,把它們逐個改為 object 是很不現實的,那么 object 是否可以像數組一樣被使用呢?經過一番搜索,我發現 php 里還真有這樣的東西,它就是 SPL(Standard PHP Library) 提供的 ArrayObject 類,這個類的介紹簡單明了:


This class allows objects to work as arrays.

正好是我們需要的。

ArrayObject 主要通過下面 4 個方法提供對數組的支持(實際上它是通過實現 ArrayAccess 接口來實現的):

public bool offsetExists ( mixed $index )
public mixed offsetGet ( mixed $index )
public void offsetSet ( mixed $index , mixed $newval )
public void offsetUnset ( mixed $index )

有了這個類的幫助,我們的方案就明確了,思路就是:返回的 object 繼承于 ArrayObject ,并覆蓋這 4 個方法,在覆蓋的方法內檢查 http 請求是否完成并獲取結果;而 controller 層對于結果的使用幾乎不用改變,仍然按照數組方式使用。

我們把返回的句柄類命名為 AsyncHandler ,它的定義為:

class AsyncHandler extends ArrayObject 

事件回調

到目前為止,一切都非常的順利,但是還有一個重要的問題沒有解決,那就是對 http response 的處理,就像前面所說的,原有的串行方法,直接返回處理過的結果,而現在只返回一個對象,結果還不知道什么時候能取到呢,這些處理代碼顯然應該等到 http response 確定返回的時候才能執行,這時就要使用回調來實現了,通過對代碼的分析,發現有主要 3 處對結果進行處理的代碼,一處在 http response 返回時,此處做了 http 狀態值的檢查、日志記錄等基本操作,這部分是公共的代碼;另外一處在接口自身的函數內,做了接口特有的處理,這部分是每個接口一份;最后一處是在把結果返回給調用方之前,對結果做格式化,保存緩存等,這部分也是公共的代碼。通過總結出類型,我們對如何修改就胸有成竹了:

首先,在 AsyncHandler 的中定義 3 個回調函數:

//Callback functions
protected $onRecvResponse = NULL;
protected $recvCtx = NULL;

protected $onBusiness = NULL;
protected $busCtx = NULL;

protected $onAPIReturn = NULL;
protected $apiCtx = NULL;

onXXX是回調函數,xxxCtx 是回調函數的上下文信息,它是個數組,保存了回調函數執行過程中需要的變量。

在上述 3 處對 http response 進行處理的地方,把原有的代碼封裝成一個匿名函數,在異步 curl 模式下,把這個匿名函數和相關上下文傳入到 AsyncHandler 中;如果是同步curl模式,就直接執行這個匿名函數(和原來一樣)。下面以一處代碼為例:

$c['var'] = $varname; //相關上下文,就是回調函數里面用到的一些變量
$onApiReturn = function($response, $c) {
  //
  //response 就是 api 返回的結果字符串
  //對結果進行處理,就不詳細列出了
  //.......
  //
  return $response; //這里把結果返回
};
//this->response 異步模式下就是 AsyncHandler ,同步模式下就是結果數組。
if(ApiProxy::$async && is_object($this->response)) {
  $this->response->setOnAPIReturn($onApiReturn, $c);
}
else {
  $this->response = $onApiReturn($this->response, $c);//如果是同步,直接執行處理函數
}

在第一處處理的位置,創建 AsyncHandler 對象:

$c['var'] = $varname; //相關上下文,就是回調函數里面用到的一些變量
if(ApiProxy::$async) {
  $mhttp = new curl_http();     //curl_http 類是對 curl 基本功能的封裝。
  $mhttp->setConfig($config);
  $mhttp->setUrl($url);
  $mhttp->setData($queryData);
  $mhttp->prepareRequest($reqMethod, false);

  $asynchandler = new AsyncHandler($mhttp);
  $asynchandler->setOnRecvResponse($onRecvResponse, $c);
  async_http::in()->add($asynchandler);   //async_http 是處理 curl multi 相關操作的單例
  return $asynchandler;
}
return $onRecvResponse(self::$http, $result, $c); //如果是同步,直接執行處理函數

mhttp 是對 curl 功能的封裝,它保存 http 鏈接的基本信息,創建完后它被加到了 AsyncHandler 的實例中,以備后續使用,而 AsyncHandler 的實例被加到了 async_http 中,后者會把 AsyncHandler 的 curl 對象加入到 curl_multi_add_handle 方法中。

獲取結果

最后,我們看一下獲取結果的流程,先看一下 AsyncHandler 的 offsetGet 方法:

public function offsetGet($index) {
  if(!$this->finished) async_http::in()->wait((string)$this);
  return parent::offsetGet($index);
}

首先判斷是否已經拿到結果了,如果已經完成了,則調用父類的方法返回結果中對應的 index;如果沒有則調用 async_http 的 wait 方法取結果:

public function wait($ah = false) {
  $fin = false;
  do {
    do {
      $mrc = curl_multi_exec($this->mh, $this->active);
    } while($mrc == CURLM_CALL_MULTI_PERFORM);

    while ($done = curl_multi_info_read($this->mh)) {
      $asynchandler = $this->map[(string)$done['handle']];

      $asynchandler->RequestCompeleted($done['result']);

      if((string)$asynchandler === $ah) {
        $fin = true;
      }
      curl_multi_remove_handle($this->mh, $done['handle']);
    }
    if($fin)
      break;    //讀完了當前能夠讀取的所有數據,包括想要的數據

    if($this->active > 0) {
      curl_multi_select($this->mh, 0.1);
    }
  } while($this->active > 0);
}

wait 方法不斷的詢問 curl :請求是否已經完成?當有結果返回時,調用對應 AsyncHandler 的 RequestCompeleted 方法:

public function RequestCompeleted() {
  $this->_http->processRequest();
  $data = curl_multi_getcontent($this->getHttp());

  if(is_callable($this->onRecvResponse)) {
    $data = call_user_func($this->onRecvResponse, $this->_http, $data, $this->recvCtx);
  }
  if(is_callable($this->onBusiness)) {
    $data = call_user_func($this->onBusiness, $data, $this->busCtx);
  }
  if(is_callable($this->onAPIReturn)) {
    $data = call_user_func($this->onAPIReturn, $data, $this->apiCtx);
  }
  $this->exchangeArray($data);
  $this->finished = true;
}

RequestCompeleted 方法獲取返回結果,調用回調函數,保存結果,并置標志位。

到此我們就實現了幾乎所有的異步模式功能,而且對于原來的同步模式代碼改動很小,并復用了原來的大部分代碼,沒有冗余。代碼量不大,除了新增的 2 個文件,修改部分主要是把原有的代碼塊用匿名函數包起來,所以也比較好測試,bug 也不多。

使用提示

對于希望改成并行調用的多個 api ,調用方只要給它們加上 async_ 前綴,并把它們放在一起,就可以并行調用了。最后還有一點,AsyncHandler 只是個 object,雖然可以作為數組一樣使用,但是對于is_array還是會返回 false 的。

總結

本文提供了一種在已有框架下增加異步模式的設計方法,對于想要引入并行調用的朋友,希望這種方法能帶來一些幫助。

代碼

AsyncHandler.php 和 async_http.class.php 的 代碼 ,curl_http 類就是對 curl 功能的封裝,就不再上傳了,其他的代碼因為和業務參考意義不大,而且也不方便上傳。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!