php實現的memcached隊列類

jopen 9年前發布 | 1K 次閱讀 PHP

    <?php
/*

 * memcache隊列類 
 * 支持多進程并發寫入、讀取 
 * 邊寫邊讀,AB面輪值替換 
 * @author guoyu 
 * @create on 9:25 2014-9-28 
 * @qq技術行業交流群:136112330 
 * 
 * @example: 
 *      $obj = new memcacheQueue('duilie'); 
 *      $obj->add('1asdf'); 
 *      $obj->getQueueLength(); 
 *      $obj->read(11); 
 *      $obj->get(8); 
 */  

class memcacheQueue{  
    public static   $client;            //memcache客戶端連接  
    public          $access;            //隊列是否可更新     
    private         $currentSide;       //當前輪值的隊列面:A/B  
    private         $lastSide;          //上一輪值的隊列面:A/B  
    private         $sideAHead;         //A面隊首值  
    private         $sideATail;         //A面隊尾值  
    private         $sideBHead;         //B面隊首值  
    private         $sideBTail;         //B面隊尾值  
    private         $currentHead;       //當前隊首值  
    private         $currentTail;       //當前隊尾值  
    private         $lastHead;          //上輪隊首值  
    private         $lastTail;          //上輪隊尾值   
    private         $expire;            //過期時間,秒,1~2592000,即30天內;0為永不過期  
    private         $sleepTime;         //等待解鎖時間,微秒  
    private         $queueName;         //隊列名稱,唯一值  
    private         $retryNum;          //重試次數,= 10 * 理論并發數  

    const   MAXNUM      = 2000;                 //(單面)最大隊列數,建議上限10K  
    const   HEAD_KEY    = '_lkkQueueHead_';     //隊列首kye  
    const   TAIL_KEY    = '_lkkQueueTail_';     //隊列尾key  
    const   VALU_KEY    = '_lkkQueueValu_';     //隊列值key  
    const   LOCK_KEY    = '_lkkQueueLock_';     //隊列鎖key  
    const   SIDE_KEY    = '_lkkQueueSide_';     //輪值面key  

    /* 
     * 構造函數 
     * @param   [config]    array   memcache服務器參數 
     * @param   [queueName] string  隊列名稱 
     * @param   [expire]    string  過期時間 
     * @return  NULL 
     */  
    public function __construct($queueName ='',$expire='',$config =''){  
        if(empty($config)){  
            self::$client = memcache_pconnect('localhost',11211);  
        }elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')  
            self::$client = memcache_pconnect($config['host'],$config['port']);  
        }elseif(is_string($config)){//"127.0.0.1:11211"  
            $tmp = explode(':',$config);  
            $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';  
            $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';  
            self::$client = memcache_pconnect($conf['host'],$conf['port']);       
        }  
        if(!self::$client) return false;  

        ignore_user_abort(TRUE);//當客戶斷開連接,允許繼續執行  
        set_time_limit(0);//取消腳本執行延時上限  

        $this->access = false;  
        $this->sleepTime = 1000;  
        $expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;  
        $this->expire = $expire;  
        $this->queueName = $queueName;  
        $this->retryNum = 10000;  

        $side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire);  
        $this->getHeadNTail($queueName);  
        if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;  
        if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;  
        if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;  
        if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;  
    }  

    /* 
     * 獲取隊列首尾值 
     * @param   [queueName] string  隊列名稱 
     * @return  NULL 
     */  
    private function getHeadNTail($queueName){  
        $this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);  
        $this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);  
        $this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);  
        $this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);  
    }  

    /* 
     * 獲取當前輪值的隊列面 
     * @return  string  隊列面名稱 
     */  
    public function getCurrentSide(){  
        $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);  
        if($currentSide == 'A'){  
            $this->currentSide = 'A';  
            $this->lastSide = 'B';    

            $this->currentHead  = $this->sideAHead;  
            $this->currentTail  = $this->sideATail;  
            $this->lastHead     = $this->sideBHead;  
            $this->lastTail     = $this->sideBTail;           
        }else{  
            $this->currentSide = 'B';  
            $this->lastSide = 'A';  

            $this->currentHead  = $this->sideBHead;  
            $this->currentTail  = $this->sideBTail;  
            $this->lastHead     = $this->sideAHead;  
            $this->lastTail     = $this->sideATail;                       
        }  

        return $this->currentSide;  
    }  

    /* 
     * 隊列加鎖 
     * @return boolean 
     */  
    private function getLock(){  
        if($this->access === false){  
            while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){  
                usleep($this->sleepTime);  
                @$i++;  
                if($i > $this->retryNum){//嘗試等待N次  
                    return false;  
                    break;  
                }  
            }  
            return $this->access = true;  
        }  
        return false;  
    }  

    /* 
     * 隊列解鎖 
     * @return NULL 
     */  
    private function unLock(){  
        memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);  
        $this->access = false;  
    }  

    /* 
     * 添加數據 
     * @param   [data]  要存儲的值 
     * @return  boolean 
     */  
    public function add($data){  
        $result = false;  
        if(!$this->getLock()){  
            return $result;  
        }   
        $this->getHeadNTail($this->queueName);  
        $this->getCurrentSide();  

        if($this->isFull()){  
            $this->unLock();  
            return false;  
        }  

        if($this->currentTail < self::MAXNUM){  
            $value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;  
            if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){  
                $this->changeTail();  
                $result = true;  
            }  
        }else{//當前隊列已滿,更換輪值面  
            $this->unLock();  
            $this->changeCurrentSide();  
            return $this->add($data);  
        }  

        $this->unLock();  
        return $result;  
    }  

    /* 
     * 取出數據 
     * @param   [length]    int 數據的長度 
     * @return  array 
     */  
    public function get($length=0){  
        if(!is_numeric($length)) return false;  
        if(empty($length)) $length = self::MAXNUM * 2;//默認讀取所有  
        if(!$this->getLock()) return false;  

        if($this->isEmpty()){  
            $this->unLock();  
            return false;  
        }  

        $keyArray   = $this->getKeyArray($length);  
        $lastKey    = $keyArray['lastKey'];  
        $currentKey = $keyArray['currentKey'];  
        $keys       = $keyArray['keys'];  
        $this->changeHead($this->lastSide,$lastKey);  
        $this->changeHead($this->currentSide,$currentKey);  

        $data   = @memcache_get(self::$client, $keys);  
        foreach($keys as $v){//取出之后刪除  
            @memcache_delete(self::$client, $v, 0);  
        }  
        $this->unLock();  

        return $data;  
    }  

    /* 
     * 讀取數據 
     * @param   [length]    int 數據的長度 
     * @return  array 
     */  
    public function read($length=0){  
        if(!is_numeric($length)) return false;  
        if(empty($length)) $length = self::MAXNUM * 2;//默認讀取所有  
        $keyArray   = $this->getKeyArray($length);  
        $data   = @memcache_get(self::$client, $keyArray['keys']);  
        return $data;  
    }  

    /* 
     * 獲取隊列某段長度的key數組 
     * @param   [length]    int 隊列長度 
     * @return  array 
     */  
    private function getKeyArray($length){  
        $result = array('keys'=>array(),'lastKey'=>array(),'currentKey'=>array());  
        $this->getHeadNTail($this->queueName);  
        $this->getCurrentSide();  
        if(empty($length)) return $result;  

        //先取上一面的key  
        $i = $result['lastKey'] = 0;  
        for($i=0;$i<$length;$i++){  
            $result['lastKey'] = $this->lastHead + $i;  
            if($result['lastKey'] >= $this->lastTail) break;  
            $result['keys'][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result['lastKey'];  
        }  

        //再取當前面的key  
        $j = $length - $i;  
        $k = $result['currentKey'] = 0;  
        for($k=0;$k<$j;$k++){  
            $result['currentKey'] = $this->currentHead + $k;  
            if($result['currentKey'] >= $this->currentTail) break;  
            $result['keys'][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result['currentKey'];  
        }  

        return $result;  
    }  

    /* 
     * 更新當前輪值面隊列尾的值 
     * @return  NULL 
     */  
    private function changeTail(){  
        $tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;  
        memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果沒有,則插入;有則false;  
        //memcache_increment(self::$client, $tail_key, 1);//隊列尾+1  
        $v = memcache_get(self::$client, $tail_key) +1;  
        memcache_set(self::$client, $tail_key,$v,false,$this->expire);  
    }  

    /* 
     * 更新隊列首的值 
     * @param   [side]      string  要更新的面 
     * @param   [headValue] int     隊列首的值 
     * @return  NULL 
     */  
    private function changeHead($side,$headValue){  
        if($headValue < 1) return false;  
        $head_key = $this->queueName .$side . self::HEAD_KEY;  
        $tail_key = $this->queueName .$side . self::TAIL_KEY;  
        $sideTail = memcache_get(self::$client, $tail_key);  
        if($headValue < $sideTail){  
            memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);  
        }elseif($headValue >= $sideTail){  
            $this->resetSide($side);  
        }  
    }  

    /* 
     * 重置隊列面,即將該隊列面的隊首、隊尾值置為0 
     * @param   [side]  string  要重置的面 
     * @return  NULL 
     */  
    private function resetSide($side){  
        $head_key = $this->queueName .$side . self::HEAD_KEY;  
        $tail_key = $this->queueName .$side . self::TAIL_KEY;  
        memcache_set(self::$client, $head_key,0,false,$this->expire);  
        memcache_set(self::$client, $tail_key,0,false,$this->expire);  
    }  

    /* 
     * 改變當前輪值隊列面 
     * @return  string 
     */  
    private function changeCurrentSide(){  
        $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);  
        if($currentSide == 'A'){  
            memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'B',false,$this->expire);  
            $this->currentSide = 'B';  
        }else{  
            memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire);  
            $this->currentSide = 'A';  
        }  
        return $this->currentSide;  
    }  

    /* 
     * 檢查當前隊列是否已滿 
     * @return  boolean 
     */  
    public function isFull(){  
        $result = false;  
        if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){  
            $result = true;  
        }  
        return $result;  
    }  

    /* 
     * 檢查當前隊列是否為空 
     * @return  boolean 
     */  
    public function isEmpty(){  
        $result = true;  
        if($this->sideATail > 0 || $this->sideBTail > 0){  
            $result = false;  
        }  
        return $result;  
    }  

    /* 
     * 獲取當前隊列的長度 
     * 該長度為理論長度,某些元素由于過期失效而丟失,真實長度小于或等于該長度 
     * @return  int 
     */  
    public function getQueueLength(){  
        $this->getHeadNTail($this->queueName);  
        $this->getCurrentSide();  

        $sideALength = $this->sideATail - $this->sideAHead;  
        $sideBLength = $this->sideBTail - $this->sideBHead;  
        $result = $sideALength + $sideBLength;  

        return $result;  
    }  

    /* 
     * 清空當前隊列數據,僅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三個key 
     * @return  boolean 
     */  
    public function clear(){  
        if(!$this->getLock()) return false;  
        for($i=0;$i<self::MAXNUM;$i++){  
            @memcache_delete(self::$client, $this->queueName.'A'. self::VALU_KEY .$i, 0);  
            @memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0);  
        }  
        $this->unLock();  
        $this->resetSide('A');  
        $this->resetSide('B');  
        return true;  
    }  

    /* 
     * 清除所有memcache緩存數據 
     * @return  NULL 
     */  
    public function memFlush(){  
        memcache_flush(self::$client);  
    }  

}  </pre> 


 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!