在 PHP 中使用 Promise + co/yield 協程
摘要: 我們知道 JavaScript 自從有了 Generator 之后,就有了各種基于 Generator 封裝的協程。其中 hprose 中封裝的 Promise 和協程庫實現了跟 ES2016 的 async/await 一樣的功能,并且更加靈活。我們還知道 PHP 自從 5.5 之后,也引入了 Generator,同樣也有了各種基于它封裝的 PHP 協程庫,hprose 同樣也為 PHP 提供的跟 JavaScript 版本類似的 Promise 和協程庫。下面我們就來看一下它跟 swoole 結合的效果。
為什么需要異步方式
一個函數執行之后,在它后面順序編寫的代碼中,如果能夠直接使用它的返回結果或者它修改之后的引用參數,那么我們通常認為該函數是同步的。
而如果一個函數的執行結果或者其修改的引用參數,需要通過設置回調函數或者回調事件的方式來獲取,而在其后順序編寫的代碼中無法直接獲取的話,那么我們通常認為這樣的函數是異步的。
PHP 提供的大部分函數都是同步的。通常我們會有一個誤解,那就是容易把同步和阻塞當成同一個概念,但實際上同步代碼不一定都是阻塞的,只是同步代碼對阻塞天然友好,當同步代碼和阻塞結合時,代碼通常是簡單易懂的。
阻塞帶來的問題是當前線程(或進程)會陷入等待,一直等到阻塞結束,這樣就會造成線程(或進程)資源的浪費。所以,通常認為阻塞是不夠高效的。
但是如果要編寫非阻塞代碼,使用同步方式會變得有些復雜,且不夠靈活。同步方式的非阻塞代碼通常會使用 select 模式,例如 curl_multi_select , stream_select , socket_select 等就是 PHP 中提供的一些典型的 select 模式的函數。
我們說它復雜且不夠靈活是有理由的,例如使用上面的 select 模式編寫同步的非阻塞代碼時,我們需要先構造一個并發任務的列表,之后手動構造循環來執行這些并發的任務,在循環開始之后,雖然這幾個任務可以并發,但是這個循環相對于其后的代碼總體上仍然是阻塞的,我們要想拿到這些并發任務的結果時,仍然需要等待。 select 雖然可以同時等待多個任務中某一個或幾個就位后,再執行后續操作,但仍然有一部分時間是被等待消耗掉的。而且如果是純同步非阻塞的情況下,我們也很難在循環開始后,動態添加更多的任務到這個循環中去。
所以,如果我們希望程序能夠更加高效,更加靈活,就需要引入異步方式。
傳統的異步方式有什么問題
一提到異步模式,大家腦子中的第一印象可能就是 回調、回調、回調 。是的,這是最簡單最直接也是之前最常見的異步模式。只要在調用異步函數時設置一個或多個回調函數,函數就會在完成時自動調用回調函數。或者為一個對象設置一堆事件,之后調用該對象上的某個異步方法,雖然這個異步方法本身可能不再需要設置回調函數,但是設置的這堆事件實際上跟回調函數所起到的作用是一樣的。
如果你的程序邏輯夠簡單,簡單的一兩層回調也許并不會讓你覺得異步方式的編程有什么麻煩。但如果你的程序邏輯一旦有些復雜,你可能就會被層層回調搞得疲憊不堪了。當然,實際上你的程序需要層層回調的原因,也許并不是你的程序邏輯真的復雜,而是你沒有辦法將回調函數中的參數結果傳出來,所以,你就不得不將另一個回調函數傳進去。
我們來舉一個簡單的例子,假設我們有 1 個同步函數:
function sum($a, $b) {
return $a + $b;
}
然后我們按照下面的方式去調用它:
$a = sum(1, 2);
$b = sum($a, 3);
$c = sum($b, 4);
var_dump(array($a, $b, $c));
雖然上面的代碼很不精簡,但我們要表達的意圖很明確,而且代碼看起來很清楚。
那接下來我們把這個函數換成一個形式上的異步函數,例如:
function async_sum($a, $b, $callback) {
$callback($a + $b);
}
當然,它的執行并不是異步的,這里我們先不關心它的實現是不是真異步的。
現在如果要做上面同樣的操作,代碼就要這樣寫了:
async_sum(1, 2, function($a) {
async_sum($a, 3, function($b) use ($a) {
async_sum($b, 4, function($c) use ($a, $b) {
var_dump(array($a, $b, $c));
});
});
});
代碼的執行結果是一樣的。但異步的代碼看起來顯然更難讀一些,雖然這已經是很簡單的例子了。
好了,看到這里,有些讀者可能會覺的我上面的這個例子很糟糕。因為明明有同步的函數可以使用,并且代碼清晰可讀,為啥非要寫個形似異步的函數,把本來同步可以做的很好的事情用異步方式復雜化呢?而且那個異步調用的方式,最后不還是想要實現同步化的結果嗎?
如果你這么想的話,一點都沒錯。但我們這里想要解決的問題是,如果我們拿到的只有一個異步函數,這個函數沒有同步實現,我們也不知道這個異步函數的內部定義是怎樣的,我們也沒辦法將這個異步函數改為同步函數實現。那我們有沒有辦法將上面的程序改的更可讀一些呢?
當然是可以的,所以,現在 Promise 要登場了。
為什么要引入 Promise
通常我們對 Promise 的一個誤解就是,它要解決的是層層回調的問題,比如上面的問題看上去就是一個典型的層層回調的問題。
然而實際上,Promise 要解決的并不是回調不回調的問題,如果你使用過 Promise 的話,你會發現使用 Promise 你仍然少不了要使用回調。Promise 要解決的問題是,如何將回調方法的參數從回調方法中傳遞出來,讓它可以像同步函數的返回結果一樣,在回調函數以外的控制范圍內,可以傳遞和復用。
我覺得這幾篇文章講的比較透徹,所以我就不重復文章中的內容了。
下面我們來看上面的例子用 Promise 如何解。
我們現在用最簡單粗暴的方式來引入 Hprose 的庫,直接復制源碼而不是使用 composer。然后我們在代碼中直接使用:
<?php
require_once("Hprose.php");
use Hprose\Promise;
這種方式來引入 Hprose 的 Promise 庫,當然你也可以寫成:
<?php
require_once("Hprose.php");
use Hprose\Future;
Future 庫跟 Promise 庫基本上是一樣的,你可以認為 Future 是 Promise 的具體實現, Promise 只是 Future 實現的一個包裝。這個區別你可以從源碼中直接看出來,這里就不多做解釋了。
接下來,我們要把前面的 async_sum 函數 Promise 化,Hprose 提供了這樣一個函數: Promisepromisify (或者 Futurepromisify ),它的作用就是將一個使用回調方式的異步函數變成一個返回 Promise 對象的異步函數。這樣說,也許有些不好理解,下面直接上代碼:
<?php
require_once("Hprose.php");
use Hprose\Promise;
function async_sum($a, $b, $callback) {
$callback($a + $b);
}
$sum = Promise\promisify('async_sum');
$a = $sum(1, 2);
$b = $a->then(function($a) use ($sum) {
return $sum($a, 3);
});
$c = $b->then(function($b) use ($sum) {
return $sum($b, 4);
});
Promise\all(array($a, $b, $c))->then(function($result) {
var_dump($result);
});
好了,看到這里,如果你對 Promise 的理解還不夠深入的話,你的第一反應可能是:這不是把程序變得更復雜了嗎?原來的程序是 3 個回調,現在仍然是 3 個回調,還多了包裝,都玩出花來了,有意思嗎?
確實,從上面的代碼來看,代碼并沒有被簡化,但是你會發現,現在回調函數中的參數已經通過 Promise 返回值的方式傳遞出來了,而且可以在原本的回調函數控制范圍以外被傳遞和復用了。
但是你可能會說然并卵,程序不是仍然很復雜嗎?那我們就來進一步簡化一下:
<?php
require_once("Hprose.php");
use Hprose\Promise;
function async_sum($a, $b, $callback) {
$callback($a + $b);
}
$sum = Promise\wrap(Promise\promisify('async_sum'));
$var_dump = Promise\wrap('var_dump');
$a = $sum(1, 2);
$b = $sum($a, 3);
$c = $sum($b, 4);
$var_dump(Promise\all(array($a, $b, $c)));
現在,代碼中再也看不到回調了。因為我們把函數包裝成了可以接收 Promise 變量的函數。當然,其實現細節略微有些復雜,如果你感興趣,可以去看一下源碼,這里就不做源碼剖析了。如果感興趣的讀者多得話,以后有時間再寫源碼剖析。
當然,如果你只是想把異步調用同步化,除了 Promisewrap 外,你還可以通過 co/yield 協程來實現。
Hprose 中的 co/yield 協程
還是上面的例子,如果你使用的是 PHP 5.5 或者更高版本,那么你可以這樣來寫代碼了。
<?php
require_once("Hprose.php");
use Hprose\Promise;
function async_sum($a, $b, $callback) {
$callback($a + $b);
}
Promise\co(function() {
$sum = Promise\promisify('async_sum');
$a = (yield $sum(1, 2));
$b = (yield $sum($a, 3));
$c = (yield $sum($b, 4));
var_dump(array($a, $b, $c));
});
這代碼比使用 Promisewrap 的又要簡單了。這里,代碼中的變量 $a , $b , $c 不再是 Promise 變量,而是實實在在的整數變量。也就是說, yield 把一個 Promise 變量變成了一個普通變量。
現在 Promiseco 中的代碼已經被實實在在的同步化了。
現在你可能有新的疑問了,異步不是為了高效嗎?現在把原本的異步代碼同步化了,那還會高效嗎?
當然,對這個例子上來說,效率肯定是沒有提高,反而是嚴重降低的。甚至在這個例子中,最原始的那個形似異步的實現也不比同步實現更高效。因為在這個例子中,并沒有涉及到并發和 IO 阻塞的情況。
下面我們就放到真實場景下來看看 Promise 和 co/yield 協程是怎么用的。
在 swoole 下使用 Promise 和 co/yield 協程
我們知道在 PHP 中,如果要讓程序延時可以使用 sleep 函數(或者 usleep , time_nanosleep 函數)來讓程序阻塞一會兒,但是這個阻塞會讓整個進程都阻塞,所以在阻塞期間,什么都不能干。
下面我們來看看使用 swoole_timer_after 實現的延時執行:
<?php
require_once("Hprose.php");
use Hprose\Future;
date_default_timezone_set('UTC');
function wait($time) {
$wait = Future\promisify('swoole_timer_after');
for ($i = 0; $i < 5; $i++) {
yield $wait($time);
var_dump("wait ". ($time / 1000) . "s, now is " . date("H:i:s"));
}
}
Future\co(wait(2000));
Future\co(wait(1000));
該程序執行結果如下:
string(24) "wait 1s, now is 13:48:25"
string(24) "wait 2s, now is 13:48:26"
string(24) "wait 1s, now is 13:48:26"
string(24) "wait 1s, now is 13:48:27"
string(24) "wait 2s, now is 13:48:28"
string(24) "wait 1s, now is 13:48:28"
string(24) "wait 1s, now is 13:48:29"
string(24) "wait 2s, now is 13:48:30"
string(24) "wait 2s, now is 13:48:32"
string(24) "wait 2s, now is 13:48:34"
從結果中我們可以看出, wait(2000) 和 wait(1000) 各自都是順序阻塞執行的,但是它們之間卻是并發執行的。
也就是說,協程之間并不會相互阻塞,雖然這幾個并發的協程是在同一個進程內跑的。
最后我們再來看一個用 co/yield 協程實現的并發抓圖程序:
<?php
require_once("Hprose.php");
use Hprose\Promise;
function fetch($url) {
$dns_lookup = Promise\promisify('swoole_async_dns_lookup');
$writefile = Promise\promisify('swoole_async_writefile');
$url = parse_url($url);
list($host, $ip) = (yield $dns_lookup($url['host']));
$cli = new swoole_http_client($ip, isset($url['port']) ? $url['port'] : 80);
$cli->setHeaders([
'Host' => $host,
"User-Agent" => 'Chrome/49.0.2587.3',
]);
$get = Promise\promisify([$cli, 'get']);
yield $get($url['path']);
list($filename) = (yield $writefile(basename($url['path']), $cli->body));
echo "write $filename ok.\r\n";
$cli->close();
}
$urls = array(
'http://b.hiphotos.baidu.com/baike/c0%3Dbaike116%2C5%2C5%2C116%2C38/sign=5f4519ba037b020818c437b303b099b6/472309f790529822434d08dcdeca7bcb0a46d4b6.jpg',
'http://f.hiphotos.baidu.com/baike/c0%3Dbaike116%2C5%2C5%2C116%2C38/sign=1c37718b3cc79f3d9becec62dbc8a674/38dbb6fd5266d016dc2eaa5c902bd40735fa358a.jpg',
'http://h.hiphotos.baidu.com/baike/c0%3Dbaike116%2C5%2C5%2C116%2C38/sign=edd05c9c502c11dfcadcb771024e09b5/d6ca7bcb0a46f21f3100c52cf1246b600c33ae9d.jpg',
'http://a.hiphotos.baidu.com/baike/c0%3Dbaike92%2C5%2C5%2C92%2C30/sign=4693756e8094a4c21e2eef796f9d70b0/54fbb2fb43166d22df5181f5412309f79052d2a9.jpg',
'http://a.hiphotos.baidu.com/baike/c0%3Dbaike92%2C5%2C5%2C92%2C30/sign=9388507144a98226accc2375ebebd264/faf2b2119313b07eb2cc820c0bd7912397dd8c45.jpg',
);
foreach ($urls as $url) {
Promise\co(fetch($url));
}
在這個程序中, fetch 函數內的代碼是同步執行的,但是多個 fetch 之間卻是并發執行的,從結果輸出就可以看出來,輸出順序是不一定的。但最后,你總能得到所有的美圖。
總結:通過 swoole 跟 hprose 中的 Promise 和 co/yield 協程相結合,你可以方便的使用同步的方式來調用 swoole 中的異步函數和方法,并可以實現協程間的并發。
因為篇幅所限,這里無法把 hprose 中 Promise 和 co/yield 協程的全部內容都介紹完,如果你想了解更多,可以參考下面兩篇內容:
來自:https://segmentfault.com/a/1190000007760513