基于Gearman的PHP封裝類

openkk 12年前發布 | 33K 次閱讀 PHP PHP開發

在多個jobs server時,PHP的gearman擴展在jobs server端口不通的情況下會自動檢測出來,從而自動切換到另一個;但是在IP不通的情況下,就會出錯了。

該封裝解決了幾個問題:

1、Jobs Server的IP如果突然不可達(例如機器關機),worker會自動重新添加jobs server(默認會報錯然后worker終止);

2、添加jobs server時候只添加有效的server(如果把IP不通的server添加進去,后面會報錯);

PS:最下面那個函數就是我用來檢測有效jobs server的,粗暴的方法:P

    /** 
     * worker 類 
     */  
    class GW{  
        private $enabled = False;  
        private $worker = null;  
        private $config = array();     
        private $task = array(); //注冊的任務信息  

        /** 
         * 創建worker對象,添加job服務器 
         * @param  array    $config     job server config 
         * @return void 
         */  
        public function __construct($config)  
        {  
            $this->config = $config;  
            $this->initWorker();  
        }  

        /** 
         * 注冊任務及處理函數 
         * @param   string  $task_name  要注冊的"任務" 
         * @param   string  $fun_name   對應的處理函數的函數名 
         * @return  boolean 
         */  
        public function regTask($task_name, $fun_name){  
            if (!$this->enabled){  
                $ret = false;  
            } else {  
                //Register and add callback function  
                $ret = $this->worker->addFunction($task_name, $fun_name);  
                if ($ret){  
                    $this->task[$task_name] = $fun_name;      
                }  
            }  
            return $ret;  
        }  

        /** 
         * 運行worker 
         * @return  void/boolean 
         */  
        public function run(){  
            if (!$this->enabled){  
                return False;  
            }  
            while(@$this->worker->work() || $this->worker->returnCode()!=GEARMAN_SUCCESS)  
            {             
                // 運行中出錯  
                //echo "error: " . $this->worker->error() . "\n";  
                //echo "return_code: " . $this->worker->returnCode() . "\n";  

                // 自動重啟worker,并重新注冊之前的task  
                if($this->worker->returnCode()!=GEARMAN_SUCCESS)  
                {  
                    $this->enabled = False;  
                    $this->initWorker();  
                    $ret = $this->reloadTask();  
                    if ($ret){  
                        // 重啟成功  
                        $this->run();  
                    } else {  
                        // 重啟失敗  
                    }  
                }         
            }  
        }  

        /** 
         * 添加有效的jobs server到worker中 
         */  
        private function initWorker()  
        {         
            $this->worker = new GearmanWorker();  
            //add job server  
            foreach ($this->config as $value) {            
                $host = trim(strval($value['host']));  
                $port = array_key_exists('port', $value) ? intval($value['port']) : 4730;  
                if(!check_conn($host, $port)){  
                    continue;  
                } else {  
                    $this->worker->addServer($host,$port);  
                    $this->enabled = True;  
                }  
            }  

        }  

        /** 
         * 重新注冊之前的任務 
         */  
        private function reloadTask(){  
            $ret = False;  
            foreach ($this->task as $task_name => $fun_name){  
                $ret = $this->regTask($task_name, $fun_name);      
            }  
            return $ret;  
        }  

        /** 
         * 是否已添加有效jobs server 
         */  
        public function isEnabled(){  
            return $this->enabled;      
        }  
    }     


    /** 
     * client 類 
     */  
    class GC{  
        private $enabled = false;  
        private $client = null;  

        /** 
         * 創建client對象,添加job服務器 
         * @param  array    $config     job server config 
         * @param  int      $timeout    超時時間(毫秒) 
         * @return void 
         */  
        public function __construct($config, $timeout=5000)  
        {  
            $this->client = new GearmanClient();  

            //add job server  
            foreach ($config as $value) {   
                $host = trim(strval($value['host']));  
                $port = array_key_exists('port', $value) ? intval($value['port']) : 4730;  
                if(!check_conn($host, $port)){  
                    continue;  
                } else {  
                    $this->client->addServer($host,$port);  
                    $this->enabled = True;  
                }  
            }  
            $this->enabled && $this->client->setTimeout($timeout);  
        }  
        /** 
         * 發送消息,并等待響應 
         * @param   string      任務名 
         * @param   string      該任務的數據 
         * @return  mixed       job server返回的結果 
         */  
        public function send($task_name, $task_data)  
        {  
            if (!$this->enabled){  
                $ret = false;     
            } else {  
                $ret = $this->client->do($task_name, strval($task_data));  
            }  
            return $ret;  
        }  

        /** 
         * 是否已添加有效jobs server 
         */  
        public function isEnabled(){  
            return $this->enabled;      
        }  
    }  


    /** 
     * 網絡檢測 
     * @param   string  機器IP 
     * @param   string  機器port 
     * @return  bool            
     */  
    function check_conn($ip, $port = 4730)  
    {  
        // socket鏈接測試,200ms超時  
        @$fp = fsockopen($ip, $port, $errno, $errstr, 0.2);   
        if ($fp){         
            $fp && fclose($fp);  
            return true;     
        } else {  
            return false;     
        }  
    }  

 本文由用戶 openkk 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!