PHP使用QPM實現多進程并行任務處理程序

jopen 9年前發布 | 21K 次閱讀 PHP PHP開發

PHP 是強大的web開發語言,以至于大家常常忘記PHP 可以用來開發健壯的命令行(CLI)程序以至于daemon程序,而編寫daemon程序免不了與各種進程管理打交道,使用QPM編寫多進程程序非常簡單。本文即是使用QPM的Supervisor::taskFactoryMode()實現多進程并行任務處理程序的一個例子。

考慮用PHP實現以下場景: 有一個抓站的URL列表保存在隊列里,后臺程序讀取這個隊列,然后轉交給子進程去抓取HTML存放到文件里。 為了提高效率,允許多任務并行執行,但為了避免機器負載過高,限制了最大的并行任務數(為了測試方便,我們把這個數設為3),當隊列中取到 END標記時,程序結束運行。

這個場景用QPM的Supervisor::taskFactoryMode()實現,非常簡單。

QPM 全名是 Quick Process Management Module for PHP. PHP 是強大的web開發語言,以至于大家常常忘記PHP 可以用來開發健壯的命令行(CLI)程序以至于daemon程序。 而編寫daemon程序免不了與各種進程管理打交道。QPM正式為簡化進程管理而開發的類庫。QPM的項目地址是:https://github.com/Comos/qpm

為了,簡化測試環境,我們可以用一個文本文件來模擬隊列的數據。完整的例子文件看這里:spider_task_factory_data.txt

http://news.sina.com.cn/http://news.ifeng.com/http://news.163.com/http://news.sohu.com/http://ent.sina.com.cn/http://ent.ifeng.com/
...
END

使用QPM的taskFactoryMode之前,我們需要準備一個TaskFactory類。 我們將其命名為 SpiderTaskFactory,SpdierTaskFactory 的工廠方法fetchTask 正常返回 Runnable的子類的實例。當碰到END或文件結束,則throw StopSignal,這樣程序就會終止。

以下是組裝 Supervisor 并執行的代碼片段。完整的例子見:spider_task_factory.php

//如果沒有從參數指定輸入,把spider_task_factory_data.txt作為數據源$input = isset($argv[1]) ? $argv[1] : __DIR__.'/spider_task_factory_data.txt';$spiderTaskFactory = new SpiderTaskFactory($input);$config = [    //指定taskFactory對象和工廠方法
    'factoryMethod'=>[$spiderTaskFactory, 'fetchTask'],    //指定最大并發數量為3
    'quantity' => 3,
];//啟動Supervisorqpm\supervisor\Supervisor::taskFactoryMode($config)->start();

SpiderTaskFactory 的實現如下:

/**
 * 任務工廠,必須實現 fetchTask方法。
 * 該方法正常返回
 * 
*/class SpiderTaskFactory {private $_fh;public function __construct($input) {
    $this->_input = $input;    $this->_fh = fopen($input, 'r');    if ($this->_fh === false) {        throw new Exception('fopen failed:'.$input);
    }
}public function fetchTask() {
    while (true) {        if (feof($this->_fh)) {            throw new qpm\supervisor\StopSignal();
        }        $line = trim(fgets($this->_fh));        if ($line == 'END') {            throw new qpm\supervisor\StopSignal();
        }        if (empty($line)) {            continue;
        }        break;
    }    return new SpiderTask($line);
}
}

SpiderTask 的實現如下:

/**
 * 在子進程中執行任務的類
 * 必須實現 qpm\process\Runnable 接口
 */class SpiderTask implements qpm\process\Runnable {private $_target;public function __construct($target) {
    $this->_target = $target;
}//在子進程中執行的部分public function run() {
    $r = @file_get_contents($this->_target);    if ($r===false) {        throw new Exception('fail to crawl url:'.$this->_target);
    }
    file_put_contents($this->getLocalFilename(), $r);   
}private function getLocalFilename() {
    $filename = str_replace('/', '~', $this->_target);    $filename = str_replace(':', '_', $filename);    $filename = $filename.'-'.date('YmdHis');    return __DIR__.'/_spider/'.$filename.'.html';
}
}

真實的生產環境,用隊列替換文件輸入,即可實現持久運行的生產者/消費者模型的程序。



來自:http://my.oschina.net/bigbigant/blog/385021

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!