基于Zookeeper的分布式共享鎖

y35w 9年前發布 | 33K 次閱讀 分布式/云計算/大數據 ZooKeeper

首先,說說我們的場景,訂單服務是做成集群的,當兩個以上結點同時收到一個相同訂單的創建指令,這時并發就產生了,系統就會重復創建訂單。等等......場景。這時,分布式共享鎖就閃亮登場了。

共享鎖在同一個進程中是很容易實現的,但在跨進程或者在不同Server之間就不好實現了。Zookeeper就很容易實現。具體的實現原理官網和其它網站也有翻譯,這里就不在贅述了。

官網資料: http://zookeeper.apache.org/doc/r3.4.5/recipes.html

中文資料: https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper

詳見Locks章節。

原理都知道了,網上一搜索Apache上面已經有提供了,既然已經有輪子了,哪我們也沒必要重復造輪子了吧!直接使用 Curator 。但是,我們在測試中發現,用于共享鎖的結點無法自動回收,除了最末一級的臨時結點會在鎖釋放和session超時的時候能自動回收外,其它結點均無法自動回收。我們的訂單一天有好幾萬,遇到618和雙十一的時候每天的訂單量超50W,如果結點長期不回收的話,肯定會影響Zookeeper的性能。這時,我們就想到了一句話“自己動手,豐衣足食”。下面直接上代碼:

首先,創建一個Maven工程,在pom文件里導入下面的包:

<dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

LockZookeeperClient接口:

package com.XXX.framework.lock;

import org.apache.curator.framework.CuratorFramework;

/**
 * 
 * description
 * 
 * @author Roadrunners
 * @version 1.0, 2015年7月9日
 */
public interface LockZookeeperClient {
    /**
     * 
     * @return
     */
    CuratorFramework getCuratorFramework();

    /**
     * 
     * @return
     */
    String getBasePath();

    /**
     * garbage collector
     * 
     * @param gcPath
     */
    void gc(String gcPath);
}

LockZookeeperClient接口的實現LockZookeeperClientFactory:

package com.XXX.framework.lock;

import java.util.Date;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * 
 * description
 *  
 * @author Roadrunners
 * @version 1.0, 2015年7月9日
 */
public class LockZookeeperClientFactory implements LockZookeeperClient {
    private static final Log LOG = LogFactory.getLog(LockZookeeperClientFactory.class);

    private boolean hasGc = true;
    private Timer gcTimer;
    private TimerTask gcTimerTask;
    private ConcurrentSkipListSet<String> gcPaths = new ConcurrentSkipListSet<String>();
    private int gcIntervalSecond = 60;

    private CuratorFramework curatorFramework;
    private String zookeeperIpPort = "localhost:2181";
    private int sessionTimeoutMs = 10000;
    private int connectionTimeoutMs = 10000;
    private String basePath = "/locks";

    public void setHasGc(boolean hasGc) {
        this.hasGc = hasGc;
    }

    public void setGcIntervalSecond(int gcIntervalSecond) {
        this.gcIntervalSecond = gcIntervalSecond;
    }

    public void setZookeeperIpPort(String zookeeperIpPort) {
        this.zookeeperIpPort = zookeeperIpPort;
    }

    public void setSessionTimeoutMs(int sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    public void setConnectionTimeoutMs(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
    }

    public void setBasePath(String basePath) {
        basePath = basePath.trim();
        if (basePath.endsWith("/")) {
            basePath = basePath.substring(0, basePath.length() - 1);
        }

        this.basePath = basePath;
    }

    public void init() {
        if(StringUtils.isBlank(zookeeperIpPort)){
            throw new NullPointerException("zookeeperIpPort");
        }

        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        curatorFramework = CuratorFrameworkFactory.newClient(zookeeperIpPort.trim(), sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
        curatorFramework.start();
        LOG.info("CuratorFramework initialise succeed.");

        if (hasGc) {
            gc();
        }
    }

    public void destroy() {
        gcPaths.clear();
        gcPaths = null;
        gcStop();
        curatorFramework.close();
        curatorFramework = null;
    }

    @Override
    public void gc(String gcPath) {
        if (hasGc && StringUtils.isNotBlank(gcPath)) {
            gcPaths.add(gcPath.trim());
        }
    }

    @Override
    public CuratorFramework getCuratorFramework() {
        return this.curatorFramework;
    }

    @Override
    public String getBasePath() {
        return this.basePath;
    }

    private synchronized void gc() {
        gcStop();

        try {
            scanningGCNodes();
        } catch (Throwable e) {
            LOG.warn(e);
        }

        gcTimerTask = new TimerTask() {
            @Override
            public void run() {
                doingGc();
            }
        };

        Date begin = new Date();
        begin.setTime(begin.getTime() + (10 * 1000L));
        gcTimer = new Timer("lock-gc", true);
        gcTimer.schedule(gcTimerTask, begin, gcIntervalSecond * 1000L);
    }

    private synchronized void gcStop() {
        if (null != gcTimer) {
            gcTimer.cancel();
            gcTimer = null;
        }

        if (null != gcTimerTask) {
            gcTimerTask.cancel();
            gcTimerTask = null;
        }
    }

    private synchronized void scanningGCNodes() throws Exception {
        if (null == curatorFramework.checkExists().forPath(basePath)) {
            return;
        }

        List<String> paths = curatorFramework.getChildren().forPath(basePath);
        if (CollectionUtils.isEmpty(paths)) {
            gcPaths.add(basePath);
            return;
        }

        for (String path : paths) {
            try{
                String tmpPath = basePath + "/" + path;
                if (null == curatorFramework.checkExists().forPath(tmpPath)) {
                    continue;
                }

                gcPaths.add(tmpPath);
            } catch(Throwable e){
                LOG.warn("scanning gc nodes error.", e);
            }
        }
    }

    private synchronized void doingGc() {
        LOG.debug("GC beginning.");

        if (CollectionUtils.isNotEmpty(gcPaths)) {
            for (String path : gcPaths) {
                try {
                    if (null != curatorFramework.checkExists().forPath(path)) {
                        if (CollectionUtils.isEmpty(curatorFramework.getChildren().forPath(path))) {
                            curatorFramework.delete().forPath(path);
                            gcPaths.remove(path);
                            LOG.debug("GC " + path);
                        }
                    } else {
                        gcPaths.remove(path);
                    }
                } catch (Throwable e) {
                    gcPaths.remove(path);
                    LOG.warn(e);
                }
            }
        }

        LOG.debug("GC ended.");
    }

}

SharedLock共享鎖:

package com.XXX.framework.lock.shared;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import com.XXX.framework.lock.LockZookeeperClient;

/**
 * 
 * description
 * 
 * @author Roadrunners
 * @version 1.0, 2015年7月9日
 */
public class SharedLock {
    private InterProcessLock interProcessLock;

    public SharedLock(LockZookeeperClient lockZookeeperClient, String resourceId) {
        super();

        if (StringUtils.isBlank(resourceId)) {
            throw new NullPointerException("resourceId");
        }
        String path = lockZookeeperClient.getBasePath();
        path += ("/" + resourceId.trim());

        interProcessLock = new InterProcessMutex(lockZookeeperClient.getCuratorFramework(), path);
        lockZookeeperClient.gc(path);
    }

    /**
     * Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call
     * to {@link #release()}
     *
     * @throws Exception ZK errors, connection interruptions
     */
    public void acquire() throws Exception {
        interProcessLock.acquire();
    }

    /**
     * Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call
     * to {@link #release()}
     *
     * @param time time to wait
     * @param unit time unit
     * @return true if the mutex was acquired, false if not
     * @throws Exception ZK errors, connection interruptions
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return interProcessLock.acquire(time, unit);
    }

    /**
     * Perform one release of the mutex.
     *
     * @throws Exception ZK errors, interruptions, current thread does not own the lock
     */
    public void release() throws Exception {
        interProcessLock.release();
    }

    /**
     * Returns true if the mutex is acquired by a thread in this JVM
     *
     * @return true/false
     */
    public boolean isAcquiredInThisProcess() {
        return interProcessLock.isAcquiredInThisProcess();
    }
}

到此代碼已經完成。下面寫一個簡單的Demo:

//LockZookeeperClientFactory通常是通過Spring配置注入的,此處是為了Demo的簡單明了才這樣寫的,不建議這樣寫
        LockZookeeperClientFactory lzc = new LockZookeeperClientFactory();
        lzc.setZookeeperIpPort("10.100.15.1:8900");
        lzc.setBasePath("/locks/sharedLock/");
        lzc.init();

        SharedLock sharedLock = new SharedLock(lzc, "sharedLock1");
        try {
            if (sharedLock.acquire(100, TimeUnit.MILLISECONDS)) {
                System.out.println("sharedLock1 get");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                sharedLock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        lzc.destroy();

就這樣,系統就會每隔一分鐘去回收一次沒有使用的結點。

 本文由用戶 y35w 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!