在 PHP 中使用 Promise + co/yield 協程

JanSpringfi 8年前發布 | 41K 次閱讀 PHP PHP開發 JavaScript

摘要: 我們知道 JavaScript 自從有了 Generator 之后,就有了各種基于 Generator 封裝的協程。其中 hprose 中封裝的 Promise 和協程庫實現了跟 ES2016 的 async/await 一樣的功能,并且更加靈活。我們還知道 PHP 自從 5.5 之后,也引入了 Generator,同樣也有了各種基于它封裝的 PHP 協程庫,hprose 同樣也為 PHP 提供的跟 JavaScript 版本類似的 Promise 和協程庫。下面我們就來看一下它跟 swoole 結合的效果。

為什么需要異步方式

一個函數執行之后,在它后面順序編寫的代碼中,如果能夠直接使用它的返回結果或者它修改之后的引用參數,那么我們通常認為該函數是同步的。

而如果一個函數的執行結果或者其修改的引用參數,需要通過設置回調函數或者回調事件的方式來獲取,而在其后順序編寫的代碼中無法直接獲取的話,那么我們通常認為這樣的函數是異步的。

PHP 提供的大部分函數都是同步的。通常我們會有一個誤解,那就是容易把同步和阻塞當成同一個概念,但實際上同步代碼不一定都是阻塞的,只是同步代碼對阻塞天然友好,當同步代碼和阻塞結合時,代碼通常是簡單易懂的。

阻塞帶來的問題是當前線程(或進程)會陷入等待,一直等到阻塞結束,這樣就會造成線程(或進程)資源的浪費。所以,通常認為阻塞是不夠高效的。

但是如果要編寫非阻塞代碼,使用同步方式會變得有些復雜,且不夠靈活。同步方式的非阻塞代碼通常會使用 select 模式,例如 curl_multi_select , stream_select , socket_select 等就是 PHP 中提供的一些典型的 select 模式的函數。

我們說它復雜且不夠靈活是有理由的,例如使用上面的 select 模式編寫同步的非阻塞代碼時,我們需要先構造一個并發任務的列表,之后手動構造循環來執行這些并發的任務,在循環開始之后,雖然這幾個任務可以并發,但是這個循環相對于其后的代碼總體上仍然是阻塞的,我們要想拿到這些并發任務的結果時,仍然需要等待。 select 雖然可以同時等待多個任務中某一個或幾個就位后,再執行后續操作,但仍然有一部分時間是被等待消耗掉的。而且如果是純同步非阻塞的情況下,我們也很難在循環開始后,動態添加更多的任務到這個循環中去。

所以,如果我們希望程序能夠更加高效,更加靈活,就需要引入異步方式。

傳統的異步方式有什么問題

一提到異步模式,大家腦子中的第一印象可能就是 回調、回調、回調 。是的,這是最簡單最直接也是之前最常見的異步模式。只要在調用異步函數時設置一個或多個回調函數,函數就會在完成時自動調用回調函數。或者為一個對象設置一堆事件,之后調用該對象上的某個異步方法,雖然這個異步方法本身可能不再需要設置回調函數,但是設置的這堆事件實際上跟回調函數所起到的作用是一樣的。

如果你的程序邏輯夠簡單,簡單的一兩層回調也許并不會讓你覺得異步方式的編程有什么麻煩。但如果你的程序邏輯一旦有些復雜,你可能就會被層層回調搞得疲憊不堪了。當然,實際上你的程序需要層層回調的原因,也許并不是你的程序邏輯真的復雜,而是你沒有辦法將回調函數中的參數結果傳出來,所以,你就不得不將另一個回調函數傳進去。

我們來舉一個簡單的例子,假設我們有 1 個同步函數:

function sum($a, $b) {
    return $a + $b;
}

然后我們按照下面的方式去調用它:

$a = sum(1, 2);
$b = sum($a, 3);
$c = sum($b, 4);
var_dump(array($a, $b, $c));

雖然上面的代碼很不精簡,但我們要表達的意圖很明確,而且代碼看起來很清楚。

那接下來我們把這個函數換成一個形式上的異步函數,例如:

function async_sum($a, $b, $callback) {
    $callback($a + $b);
}

當然,它的執行并不是異步的,這里我們先不關心它的實現是不是真異步的。

現在如果要做上面同樣的操作,代碼就要這樣寫了:

async_sum(1, 2, function($a) {
    async_sum($a, 3, function($b) use ($a) {
        async_sum($b, 4, function($c) use ($a, $b) {
            var_dump(array($a, $b, $c));
        });
    });
});

代碼的執行結果是一樣的。但異步的代碼看起來顯然更難讀一些,雖然這已經是很簡單的例子了。

好了,看到這里,有些讀者可能會覺的我上面的這個例子很糟糕。因為明明有同步的函數可以使用,并且代碼清晰可讀,為啥非要寫個形似異步的函數,把本來同步可以做的很好的事情用異步方式復雜化呢?而且那個異步調用的方式,最后不還是想要實現同步化的結果嗎?

如果你這么想的話,一點都沒錯。但我們這里想要解決的問題是,如果我們拿到的只有一個異步函數,這個函數沒有同步實現,我們也不知道這個異步函數的內部定義是怎樣的,我們也沒辦法將這個異步函數改為同步函數實現。那我們有沒有辦法將上面的程序改的更可讀一些呢?

當然是可以的,所以,現在 Promise 要登場了。

為什么要引入 Promise

通常我們對 Promise 的一個誤解就是,它要解決的是層層回調的問題,比如上面的問題看上去就是一個典型的層層回調的問題。

然而實際上,Promise 要解決的并不是回調不回調的問題,如果你使用過 Promise 的話,你會發現使用 Promise 你仍然少不了要使用回調。Promise 要解決的問題是,如何將回調方法的參數從回調方法中傳遞出來,讓它可以像同步函數的返回結果一樣,在回調函數以外的控制范圍內,可以傳遞和復用。

我覺得這幾篇文章講的比較透徹,所以我就不重復文章中的內容了。

下面我們來看上面的例子用 Promise 如何解。

我們現在用最簡單粗暴的方式來引入 Hprose 的庫,直接復制源碼而不是使用 composer。然后我們在代碼中直接使用:

<?php
require_once("Hprose.php");
use Hprose\Promise;

這種方式來引入 Hprose 的 Promise 庫,當然你也可以寫成:

<?php
require_once("Hprose.php");
use Hprose\Future;

Future 庫跟 Promise 庫基本上是一樣的,你可以認為 Future 是 Promise 的具體實現, Promise 只是 Future 實現的一個包裝。這個區別你可以從源碼中直接看出來,這里就不多做解釋了。

接下來,我們要把前面的 async_sum 函數 Promise 化,Hprose 提供了這樣一個函數: Promisepromisify (或者 Futurepromisify ),它的作用就是將一個使用回調方式的異步函數變成一個返回 Promise 對象的異步函數。這樣說,也許有些不好理解,下面直接上代碼:

<?php
require_once("Hprose.php");

use Hprose\Promise;

function async_sum($a, $b, $callback) {
    $callback($a + $b);
}

$sum = Promise\promisify('async_sum');

$a = $sum(1, 2);
$b = $a->then(function($a) use ($sum) {
    return $sum($a, 3);
});
$c = $b->then(function($b) use ($sum) {
   return $sum($b, 4);
});

Promise\all(array($a, $b, $c))->then(function($result) {
    var_dump($result);
});

好了,看到這里,如果你對 Promise 的理解還不夠深入的話,你的第一反應可能是:這不是把程序變得更復雜了嗎?原來的程序是 3 個回調,現在仍然是 3 個回調,還多了包裝,都玩出花來了,有意思嗎?

確實,從上面的代碼來看,代碼并沒有被簡化,但是你會發現,現在回調函數中的參數已經通過 Promise 返回值的方式傳遞出來了,而且可以在原本的回調函數控制范圍以外被傳遞和復用了。

但是你可能會說然并卵,程序不是仍然很復雜嗎?那我們就來進一步簡化一下:

<?php
require_once("Hprose.php");

use Hprose\Promise;

function async_sum($a, $b, $callback) {
    $callback($a + $b);
}

$sum = Promise\wrap(Promise\promisify('async_sum'));
$var_dump = Promise\wrap('var_dump');

$a = $sum(1, 2);
$b = $sum($a, 3);
$c = $sum($b, 4);

$var_dump(Promise\all(array($a, $b, $c)));

現在,代碼中再也看不到回調了。因為我們把函數包裝成了可以接收 Promise 變量的函數。當然,其實現細節略微有些復雜,如果你感興趣,可以去看一下源碼,這里就不做源碼剖析了。如果感興趣的讀者多得話,以后有時間再寫源碼剖析。

當然,如果你只是想把異步調用同步化,除了 Promisewrap 外,你還可以通過 co/yield 協程來實現。

Hprose 中的 co/yield 協程

還是上面的例子,如果你使用的是 PHP 5.5 或者更高版本,那么你可以這樣來寫代碼了。

<?php
require_once("Hprose.php");

use Hprose\Promise;

function async_sum($a, $b, $callback) {
    $callback($a + $b);
}

Promise\co(function() {
    $sum = Promise\promisify('async_sum');

    $a = (yield $sum(1, 2));
    $b = (yield $sum($a, 3));
    $c = (yield $sum($b, 4));

    var_dump(array($a, $b, $c));
});

這代碼比使用 Promisewrap 的又要簡單了。這里,代碼中的變量 $a , $b , $c 不再是 Promise 變量,而是實實在在的整數變量。也就是說, yield 把一個 Promise 變量變成了一個普通變量。

現在 Promiseco 中的代碼已經被實實在在的同步化了。

現在你可能有新的疑問了,異步不是為了高效嗎?現在把原本的異步代碼同步化了,那還會高效嗎?

當然,對這個例子上來說,效率肯定是沒有提高,反而是嚴重降低的。甚至在這個例子中,最原始的那個形似異步的實現也不比同步實現更高效。因為在這個例子中,并沒有涉及到并發和 IO 阻塞的情況。

下面我們就放到真實場景下來看看 Promise 和 co/yield 協程是怎么用的。

在 swoole 下使用 Promise 和 co/yield 協程

我們知道在 PHP 中,如果要讓程序延時可以使用 sleep 函數(或者 usleep , time_nanosleep 函數)來讓程序阻塞一會兒,但是這個阻塞會讓整個進程都阻塞,所以在阻塞期間,什么都不能干。

下面我們來看看使用 swoole_timer_after 實現的延時執行:

<?php
require_once("Hprose.php");

use Hprose\Future;

date_default_timezone_set('UTC');

function wait($time) {
    $wait = Future\promisify('swoole_timer_after');
    for ($i = 0; $i < 5; $i++) {
        yield $wait($time);
        var_dump("wait ". ($time / 1000) . "s, now is " . date("H:i:s"));
    }
}

Future\co(wait(2000));
Future\co(wait(1000));

該程序執行結果如下:

string(24) "wait 1s, now is 13:48:25"
string(24) "wait 2s, now is 13:48:26"
string(24) "wait 1s, now is 13:48:26"
string(24) "wait 1s, now is 13:48:27"
string(24) "wait 2s, now is 13:48:28"
string(24) "wait 1s, now is 13:48:28"
string(24) "wait 1s, now is 13:48:29"
string(24) "wait 2s, now is 13:48:30"
string(24) "wait 2s, now is 13:48:32"
string(24) "wait 2s, now is 13:48:34"

從結果中我們可以看出, wait(2000) 和 wait(1000) 各自都是順序阻塞執行的,但是它們之間卻是并發執行的。

也就是說,協程之間并不會相互阻塞,雖然這幾個并發的協程是在同一個進程內跑的。

最后我們再來看一個用 co/yield 協程實現的并發抓圖程序:

<?php
require_once("Hprose.php");

use Hprose\Promise;

function fetch($url) {
    $dns_lookup = Promise\promisify('swoole_async_dns_lookup');
    $writefile = Promise\promisify('swoole_async_writefile');
    $url = parse_url($url);
    list($host, $ip) = (yield $dns_lookup($url['host']));
    $cli = new swoole_http_client($ip, isset($url['port']) ? $url['port'] : 80);
    $cli->setHeaders([
        'Host' => $host,
        "User-Agent" => 'Chrome/49.0.2587.3',
    ]);
    $get = Promise\promisify([$cli, 'get']);
    yield $get($url['path']);
    list($filename) = (yield $writefile(basename($url['path']), $cli->body));
    echo "write $filename ok.\r\n";
    $cli->close();
}

$urls = array(
    'http://b.hiphotos.baidu.com/baike/c0%3Dbaike116%2C5%2C5%2C116%2C38/sign=5f4519ba037b020818c437b303b099b6/472309f790529822434d08dcdeca7bcb0a46d4b6.jpg',
    'http://f.hiphotos.baidu.com/baike/c0%3Dbaike116%2C5%2C5%2C116%2C38/sign=1c37718b3cc79f3d9becec62dbc8a674/38dbb6fd5266d016dc2eaa5c902bd40735fa358a.jpg',
    'http://h.hiphotos.baidu.com/baike/c0%3Dbaike116%2C5%2C5%2C116%2C38/sign=edd05c9c502c11dfcadcb771024e09b5/d6ca7bcb0a46f21f3100c52cf1246b600c33ae9d.jpg',
    'http://a.hiphotos.baidu.com/baike/c0%3Dbaike92%2C5%2C5%2C92%2C30/sign=4693756e8094a4c21e2eef796f9d70b0/54fbb2fb43166d22df5181f5412309f79052d2a9.jpg',
    'http://a.hiphotos.baidu.com/baike/c0%3Dbaike92%2C5%2C5%2C92%2C30/sign=9388507144a98226accc2375ebebd264/faf2b2119313b07eb2cc820c0bd7912397dd8c45.jpg',
);

foreach ($urls as $url) {
    Promise\co(fetch($url));
}

在這個程序中, fetch 函數內的代碼是同步執行的,但是多個 fetch 之間卻是并發執行的,從結果輸出就可以看出來,輸出順序是不一定的。但最后,你總能得到所有的美圖。

總結:通過 swoole 跟 hprose 中的 Promise 和 co/yield 協程相結合,你可以方便的使用同步的方式來調用 swoole 中的異步函數和方法,并可以實現協程間的并發。

因為篇幅所限,這里無法把 hprose 中 Promise 和 co/yield 協程的全部內容都介紹完,如果你想了解更多,可以參考下面兩篇內容:

 

來自:https://segmentfault.com/a/1190000007760513

 

 本文由用戶 JanSpringfi 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!