基于Gearman的PHP封裝類
在多個jobs server時,PHP的gearman擴展在jobs server端口不通的情況下會自動檢測出來,從而自動切換到另一個;但是在IP不通的情況下,就會出錯了。
該封裝解決了幾個問題:
1、Jobs Server的IP如果突然不可達(例如機器關機),worker會自動重新添加jobs server(默認會報錯然后worker終止);
2、添加jobs server時候只添加有效的server(如果把IP不通的server添加進去,后面會報錯);
PS:最下面那個函數就是我用來檢測有效jobs server的,粗暴的方法:P
/** * worker 類 */ class GW{ private $enabled = False; private $worker = null; private $config = array(); private $task = array(); //注冊的任務信息 /** * 創建worker對象,添加job服務器 * @param array $config job server config * @return void */ public function __construct($config) { $this->config = $config; $this->initWorker(); } /** * 注冊任務及處理函數 * @param string $task_name 要注冊的"任務" * @param string $fun_name 對應的處理函數的函數名 * @return boolean */ public function regTask($task_name, $fun_name){ if (!$this->enabled){ $ret = false; } else { //Register and add callback function $ret = $this->worker->addFunction($task_name, $fun_name); if ($ret){ $this->task[$task_name] = $fun_name; } } return $ret; } /** * 運行worker * @return void/boolean */ public function run(){ if (!$this->enabled){ return False; } while(@$this->worker->work() || $this->worker->returnCode()!=GEARMAN_SUCCESS) { // 運行中出錯 //echo "error: " . $this->worker->error() . "\n"; //echo "return_code: " . $this->worker->returnCode() . "\n"; // 自動重啟worker,并重新注冊之前的task if($this->worker->returnCode()!=GEARMAN_SUCCESS) { $this->enabled = False; $this->initWorker(); $ret = $this->reloadTask(); if ($ret){ // 重啟成功 $this->run(); } else { // 重啟失敗 } } } } /** * 添加有效的jobs server到worker中 */ private function initWorker() { $this->worker = new GearmanWorker(); //add job server foreach ($this->config as $value) { $host = trim(strval($value['host'])); $port = array_key_exists('port', $value) ? intval($value['port']) : 4730; if(!check_conn($host, $port)){ continue; } else { $this->worker->addServer($host,$port); $this->enabled = True; } } } /** * 重新注冊之前的任務 */ private function reloadTask(){ $ret = False; foreach ($this->task as $task_name => $fun_name){ $ret = $this->regTask($task_name, $fun_name); } return $ret; } /** * 是否已添加有效jobs server */ public function isEnabled(){ return $this->enabled; } } /** * client 類 */ class GC{ private $enabled = false; private $client = null; /** * 創建client對象,添加job服務器 * @param array $config job server config * @param int $timeout 超時時間(毫秒) * @return void */ public function __construct($config, $timeout=5000) { $this->client = new GearmanClient(); //add job server foreach ($config as $value) { $host = trim(strval($value['host'])); $port = array_key_exists('port', $value) ? intval($value['port']) : 4730; if(!check_conn($host, $port)){ continue; } else { $this->client->addServer($host,$port); $this->enabled = True; } } $this->enabled && $this->client->setTimeout($timeout); } /** * 發送消息,并等待響應 * @param string 任務名 * @param string 該任務的數據 * @return mixed job server返回的結果 */ public function send($task_name, $task_data) { if (!$this->enabled){ $ret = false; } else { $ret = $this->client->do($task_name, strval($task_data)); } return $ret; } /** * 是否已添加有效jobs server */ public function isEnabled(){ return $this->enabled; } } /** * 網絡檢測 * @param string 機器IP * @param string 機器port * @return bool */ function check_conn($ip, $port = 4730) { // socket鏈接測試,200ms超時 @$fp = fsockopen($ip, $port, $errno, $errstr, 0.2); if ($fp){ $fp && fclose($fp); return true; } else { return false; } }
本文由用戶 openkk 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!