PHP使用QPM實現多進程并行任務處理程序
PHP 是強大的web開發語言,以至于大家常常忘記PHP 可以用來開發健壯的命令行(CLI)程序以至于daemon程序,而編寫daemon程序免不了與各種進程管理打交道,使用QPM編寫多進程程序非常簡單。本文即是使用QPM的Supervisor::taskFactoryMode()實現多進程并行任務處理程序的一個例子。
考慮用PHP實現以下場景: 有一個抓站的URL列表保存在隊列里,后臺程序讀取這個隊列,然后轉交給子進程去抓取HTML存放到文件里。 為了提高效率,允許多任務并行執行,但為了避免機器負載過高,限制了最大的并行任務數(為了測試方便,我們把這個數設為3),當隊列中取到 END標記時,程序結束運行。
這個場景用QPM的Supervisor::taskFactoryMode()實現,非常簡單。
QPM 全名是 Quick Process Management Module for PHP. PHP 是強大的web開發語言,以至于大家常常忘記PHP 可以用來開發健壯的命令行(CLI)程序以至于daemon程序。 而編寫daemon程序免不了與各種進程管理打交道。QPM正式為簡化進程管理而開發的類庫。QPM的項目地址是:https://github.com/Comos/qpm
為了,簡化測試環境,我們可以用一個文本文件來模擬隊列的數據。完整的例子文件看這里:spider_task_factory_data.txt
http://news.sina.com.cn/http://news.ifeng.com/http://news.163.com/http://news.sohu.com/http://ent.sina.com.cn/http://ent.ifeng.com/ ... END
使用QPM的taskFactoryMode之前,我們需要準備一個TaskFactory類。 我們將其命名為 SpiderTaskFactory,SpdierTaskFactory 的工廠方法fetchTask 正常返回 Runnable的子類的實例。當碰到END或文件結束,則throw StopSignal,這樣程序就會終止。
以下是組裝 Supervisor 并執行的代碼片段。完整的例子見:spider_task_factory.php
//如果沒有從參數指定輸入,把spider_task_factory_data.txt作為數據源$input = isset($argv[1]) ? $argv[1] : __DIR__.'/spider_task_factory_data.txt';$spiderTaskFactory = new SpiderTaskFactory($input);$config = [ //指定taskFactory對象和工廠方法 'factoryMethod'=>[$spiderTaskFactory, 'fetchTask'], //指定最大并發數量為3 'quantity' => 3, ];//啟動Supervisorqpm\supervisor\Supervisor::taskFactoryMode($config)->start();
SpiderTaskFactory 的實現如下:
/** * 任務工廠,必須實現 fetchTask方法。 * 該方法正常返回 * */class SpiderTaskFactory {private $_fh;public function __construct($input) { $this->_input = $input; $this->_fh = fopen($input, 'r'); if ($this->_fh === false) { throw new Exception('fopen failed:'.$input); } }public function fetchTask() { while (true) { if (feof($this->_fh)) { throw new qpm\supervisor\StopSignal(); } $line = trim(fgets($this->_fh)); if ($line == 'END') { throw new qpm\supervisor\StopSignal(); } if (empty($line)) { continue; } break; } return new SpiderTask($line); } }
SpiderTask 的實現如下:
/** * 在子進程中執行任務的類 * 必須實現 qpm\process\Runnable 接口 */class SpiderTask implements qpm\process\Runnable {private $_target;public function __construct($target) { $this->_target = $target; }//在子進程中執行的部分public function run() { $r = @file_get_contents($this->_target); if ($r===false) { throw new Exception('fail to crawl url:'.$this->_target); } file_put_contents($this->getLocalFilename(), $r); }private function getLocalFilename() { $filename = str_replace('/', '~', $this->_target); $filename = str_replace(':', '_', $filename); $filename = $filename.'-'.date('YmdHis'); return __DIR__.'/_spider/'.$filename.'.html'; } }
真實的生產環境,用隊列替換文件輸入,即可實現持久運行的生產者/消費者模型的程序。
來自:http://my.oschina.net/bigbigant/blog/385021