A Distributed Shared Lock Based on ZooKeeper
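First, our scenario. The order service runs as a cluster. When two or more nodes receive a create command for the same order at the same time, the requests race and the system creates the order twice. There are other scenarios of the same kind. This is where a distributed shared lock comes in.
A shared lock is easy to implement inside a single process, but hard to implement across processes or across servers. ZooKeeper makes it straightforward. The underlying recipe is documented on the official site and has been translated elsewhere, so I will not repeat it here.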
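To make "easy inside a single process" concrete, here is a minimal sketch of a per-order lock that works within one JVM. The class `InProcessOrderGuard`, its methods, and the in-memory set of created orders are hypothetical names invented for this illustration; the set stands in for "check the database, then insert". Nothing here is visible to a second JVM, which is exactly the gap the ZooKeeper lock is meant to close.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-process guard: one lock per order ID, valid inside a single JVM only. */
class InProcessOrderGuard {
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    // Stand-in for the order table; a real service would query and insert into a database.
    private final Set<String> createdOrders = ConcurrentHashMap.<String>newKeySet();

    /** Returns true if this call created the order; false on duplicate, timeout, or interrupt. */
    boolean createOrder(String orderId, long waitMs) {
        ReentrantLock lock = locks.computeIfAbsent(orderId, id -> new ReentrantLock());
        boolean locked;
        try {
            locked = lock.tryLock(waitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        if (!locked) {
            return false; // another thread is still working on this order
        }
        try {
            // Check-then-act, performed while holding the per-order lock.
            if (createdOrders.contains(orderId)) {
                return false;
            }
            createdOrders.add(orderId);
            return true;
        } finally {
            lock.unlock();
        }
    }

    int createdCount() {
        return createdOrders.size();
    }

    public static void main(String[] args) {
        InProcessOrderGuard guard = new InProcessOrderGuard();
        System.out.println("first create:  " + guard.createOrder("order-1", 100));
        System.out.println("second create: " + guard.createOrder("order-1", 100));
    }
}
```

Two instances of the order service each hold their own `locks` map, so the same order ID can still be processed on both nodes at once.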
Official documentation: http://zookeeper.apache.org/doc/r3.4.5/recipes.html
Chinese-language material: https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper
See the Locks section.
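With the principle understood, a quick search shows that Apache already provides an implementation. Since the wheel exists, there is no need to reinvent it: we use Curator directly. In testing, however, we found that the nodes used for the shared lock are not reclaimed automatically. Only the last-level ephemeral nodes are removed, when the lock is released or the session times out; every node above them stays behind. We handle tens of thousands of orders a day, and more than 500,000 a day during the 618 and Double 11 sales. If one parent node per locked resource is left behind and never reclaimed, ZooKeeper performance will suffer. So we fell back on the old saying, "do it yourself and you will be well fed and clothed." The code follows.
First, create a Maven project and add the following dependencies to the pom file: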
<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-client</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>commons-beanutils</groupId>
        <artifactId>commons-beanutils</artifactId>
        <version>1.9.2</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>
The LockZookeeperClient interface:
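package com.XXX.framework.lock;

import org.apache.curator.framework.CuratorFramework;

/**
 * Gives lock implementations access to the shared Curator client, the base
 * path under which lock nodes are created, and the node garbage collector.
 *
 * @author Roadrunners
 * @version 1.0, 2015-07-09
 */
public interface LockZookeeperClient {

    /**
     * @return the started Curator client
     */
    CuratorFramework getCuratorFramework();

    /**
     * @return the base path for lock nodes, without a trailing slash
     */
    String getBasePath();

    /**
     * Registers a node with the garbage collector.
     *
     * @param gcPath path of the node to delete once it has no children
     */
    void gc(String gcPath);
}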
LockZookeeperClientFactory, the implementation of the LockZookeeperClient interface:
package com.XXX.framework.lock;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Creates the shared Curator client and periodically deletes registered
 * lock nodes that no longer have children.
 *
 * @author Roadrunners
 * @version 1.0, 2015-07-09
 */
public class LockZookeeperClientFactory implements LockZookeeperClient {
    private static final Log LOG = LogFactory.getLog(LockZookeeperClientFactory.class);

    private boolean hasGc = true;
    private Timer gcTimer;
    private TimerTask gcTimerTask;
    // Kept non-null for the lifetime of the factory so gc(String) never hits a null set.
    private final ConcurrentSkipListSet<String> gcPaths = new ConcurrentSkipListSet<String>();
    private int gcIntervalSecond = 60;

    private CuratorFramework curatorFramework;
    private String zookeeperIpPort = "localhost:2181";
    private int sessionTimeoutMs = 10000;
    private int connectionTimeoutMs = 10000;
    private String basePath = "/locks";

    public void setHasGc(boolean hasGc) {
        this.hasGc = hasGc;
    }

    public void setGcIntervalSecond(int gcIntervalSecond) {
        this.gcIntervalSecond = gcIntervalSecond;
    }

    public void setZookeeperIpPort(String zookeeperIpPort) {
        this.zookeeperIpPort = zookeeperIpPort;
    }

    public void setSessionTimeoutMs(int sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    public void setConnectionTimeoutMs(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
    }

    public void setBasePath(String basePath) {
        basePath = basePath.trim();
        // Strip one trailing slash so child paths can be built as basePath + "/" + name.
        if (basePath.endsWith("/")) {
            basePath = basePath.substring(0, basePath.length() - 1);
        }
        this.basePath = basePath;
    }

    public void init() {
        if (StringUtils.isBlank(zookeeperIpPort)) {
            throw new IllegalArgumentException("zookeeperIpPort must not be blank");
        }
        // Retry up to 3 times, starting from a 1000 ms base sleep.
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        curatorFramework = CuratorFrameworkFactory.newClient(
                zookeeperIpPort.trim(), sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
        curatorFramework.start();
        LOG.info("CuratorFramework started.");
        if (hasGc) {
            gc();
        }
    }

    public void destroy() {
        // Stop the timer before clearing state and closing the client.
        gcStop();
        gcPaths.clear();
        if (null != curatorFramework) {
            curatorFramework.close();
            curatorFramework = null;
        }
    }

    @Override
    public void gc(String gcPath) {
        if (hasGc && StringUtils.isNotBlank(gcPath)) {
            gcPaths.add(gcPath.trim());
        }
    }

    @Override
    public CuratorFramework getCuratorFramework() {
        return this.curatorFramework;
    }

    @Override
    public String getBasePath() {
        return this.basePath;
    }

    /** Registers the nodes that already exist, then starts the periodic sweep. */
    private synchronized void gc() {
        gcStop();
        try {
            scanningGCNodes();
        } catch (Throwable e) {
            LOG.warn("scanning gc nodes error.", e);
        }
        gcTimerTask = new TimerTask() {
            @Override
            public void run() {
                doingGc();
            }
        };
        // Daemon timer: first sweep after 10 seconds, then every gcIntervalSecond seconds.
        gcTimer = new Timer("lock-gc", true);
        gcTimer.schedule(gcTimerTask, 10 * 1000L, gcIntervalSecond * 1000L);
    }

    private synchronized void gcStop() {
        if (null != gcTimer) {
            gcTimer.cancel();
            gcTimer = null;
        }
        if (null != gcTimerTask) {
            gcTimerTask.cancel();
            gcTimerTask = null;
        }
    }

    /** Registers every existing child of basePath (or basePath itself if it is empty). */
    private synchronized void scanningGCNodes() throws Exception {
        if (null == curatorFramework.checkExists().forPath(basePath)) {
            return;
        }
        List<String> paths = curatorFramework.getChildren().forPath(basePath);
        if (CollectionUtils.isEmpty(paths)) {
            gcPaths.add(basePath);
            return;
        }
        for (String path : paths) {
            try {
                String tmpPath = basePath + "/" + path;
                if (null == curatorFramework.checkExists().forPath(tmpPath)) {
                    continue;
                }
                gcPaths.add(tmpPath);
            } catch (Throwable e) {
                LOG.warn("scanning gc nodes error.", e);
            }
        }
    }

    /** Deletes every registered node that exists and has no children. */
    private synchronized void doingGc() {
        LOG.debug("GC beginning.");
        // The set's iterator is weakly consistent, so removing while iterating is safe.
        for (String path : gcPaths) {
            try {
                if (null == curatorFramework.checkExists().forPath(path)) {
                    gcPaths.remove(path);
                } else if (CollectionUtils.isEmpty(curatorFramework.getChildren().forPath(path))) {
                    curatorFramework.delete().forPath(path);
                    gcPaths.remove(path);
                    LOG.debug("GC " + path);
                }
            } catch (Throwable e) {
                // For example, a child appeared between the check and the delete.
                // The path is dropped here and registered again by the next SharedLock on it.
                gcPaths.remove(path);
                LOG.warn("GC failed for " + path, e);
            }
        }
        LOG.debug("GC ended.");
    }
}
The SharedLock class:
package com.XXX.framework.lock.shared;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import com.XXX.framework.lock.LockZookeeperClient;

/**
 * A lock on one resource ID, backed by Curator's InterProcessMutex. The lock
 * node is registered with the client's garbage collector on construction.
 *
 * @author Roadrunners
 * @version 1.0, 2015-07-09
 */
public class SharedLock {
    private InterProcessLock interProcessLock;

    public SharedLock(LockZookeeperClient lockZookeeperClient, String resourceId) {
        if (StringUtils.isBlank(resourceId)) {
            throw new IllegalArgumentException("resourceId must not be blank");
        }
        // Lock node path: <basePath>/<resourceId>
        String path = lockZookeeperClient.getBasePath() + "/" + resourceId.trim();
        interProcessLock = new InterProcessMutex(lockZookeeperClient.getCuratorFramework(), path);
        // Register the node so it is deleted once it has no children.
        lockZookeeperClient.gc(path);
    }

    /**
     * Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call
     * to {@link #release()}
     *
     * @throws Exception ZK errors, connection interruptions
     */
    public void acquire() throws Exception {
        interProcessLock.acquire();
    }

    /**
     * Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire
     * that returns true must be balanced by a call to {@link #release()}
     *
     * @param time time to wait
     * @param unit time unit
     * @return true if the mutex was acquired, false if not
     * @throws Exception ZK errors, connection interruptions
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return interProcessLock.acquire(time, unit);
    }

    /**
     * Perform one release of the mutex.
     *
     * @throws Exception ZK errors, interruptions, current thread does not own the lock
     */
    public void release() throws Exception {
        interProcessLock.release();
    }

    /**
     * Returns true if the mutex is acquired by a thread in this JVM
     *
     * @return true/false
     */
    public boolean isAcquiredInThisProcess() {
        return interProcessLock.isAcquiredInThisProcess();
    }
}
That completes the code. Here is a simple demo:
// LockZookeeperClientFactory is normally configured and injected by Spring.
// It is built by hand here only to keep the demo short; this is not recommended.
LockZookeeperClientFactory lzc = new LockZookeeperClientFactory();
lzc.setZookeeperIpPort("10.100.15.1:8900");
lzc.setBasePath("/locks/sharedLock/");
lzc.init();

SharedLock sharedLock = new SharedLock(lzc, "sharedLock1");
boolean acquired = false;
try {
    acquired = sharedLock.acquire(100, TimeUnit.MILLISECONDS);
    if (acquired) {
        System.out.println("sharedLock1 get");
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    // Release only if the lock is held; releasing an unheld InterProcessMutex throws.
    if (acquired) {
        try {
            sharedLock.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    lzc.destroy();
}
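The SharedLock Javadoc says that every acquire that returns true must be balanced by a release. One way to make that rule hard to get wrong is to wrap the pattern in a helper. The sketch below is an assumption for illustration, not part of the original code: `TimedLock` is a hypothetical interface that mirrors the `acquire(long, TimeUnit)` and `release()` methods of SharedLock, and `LockTemplate.runLocked` is a made-up name. The `main` method uses a local `ReentrantLock` as a stand-in so the example runs without a ZooKeeper server.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper: runs an action only while the lock is held. */
final class LockTemplate {
    private LockTemplate() {
    }

    /** Returns true if the lock was acquired and the action ran, false on timeout. */
    static boolean runLocked(TimedLock lock, long time, TimeUnit unit, Runnable action) {
        boolean acquired;
        try {
            acquired = lock.acquire(time, unit);
        } catch (Exception e) {
            throw new IllegalStateException("could not acquire lock", e);
        }
        if (!acquired) {
            return false; // timed out: nothing is held, so nothing may be released
        }
        try {
            action.run();
            return true;
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                // Sketch only: report the failure; real code would use its logger.
                System.err.println("release failed: " + e);
            }
        }
    }

    public static void main(String[] args) {
        // Local stand-in for a SharedLock so the sketch runs without ZooKeeper.
        final ReentrantLock local = new ReentrantLock();
        TimedLock lock = new TimedLock() {
            @Override
            public boolean acquire(long time, TimeUnit unit) throws Exception {
                return local.tryLock(time, unit);
            }

            @Override
            public void release() {
                local.unlock();
            }
        };
        boolean ran = runLocked(lock, 100, TimeUnit.MILLISECONDS,
                () -> System.out.println("sharedLock1 get"));
        System.out.println("action ran: " + ran);
    }
}

/** Hypothetical interface mirroring SharedLock's timed acquire and release. */
interface TimedLock {
    boolean acquire(long time, TimeUnit unit) throws Exception;

    void release() throws Exception;
}
```

A SharedLock could be adapted to `TimedLock` with a two-method anonymous class that delegates to its `acquire` and `release`.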
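With GC enabled (the default), the factory runs its first sweep 10 seconds after init() and then once every gcIntervalSecond seconds (60 by default, so once a minute), deleting registered lock nodes that have no children.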
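The sweep rule itself is small: a parent lock node survives the release of its last child, and a later sweep deletes it only if it is still empty. The following sketch models that rule with plain in-memory collections. It is not a ZooKeeper client; `LockNodeSweeper` and its methods are hypothetical names, the map stands in for the znode tree, and the timer is left out so that `sweep()` can be called directly.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** In-memory model of the lock subtree and its sweep; a stand-in, not a ZooKeeper client. */
class LockNodeSweeper {
    // Parent lock node -> its children (one per client holding or waiting for the lock).
    private final Map<String, Set<String>> tree = new HashMap<>();
    // Parents registered for collection, like gcPaths in the factory.
    private final Set<String> gcCandidates = new TreeSet<>();

    synchronized void acquire(String parent, String child) {
        tree.computeIfAbsent(parent, p -> new HashSet<String>()).add(child);
        gcCandidates.add(parent);
    }

    synchronized void release(String parent, String child) {
        Set<String> children = tree.get(parent);
        if (children != null) {
            children.remove(child); // the child goes away, the parent stays behind
        }
    }

    /** Deletes registered parents that have no children; returns how many were deleted. */
    synchronized int sweep() {
        int removed = 0;
        Iterator<String> it = gcCandidates.iterator();
        while (it.hasNext()) {
            String parent = it.next();
            Set<String> children = tree.get(parent);
            if (children == null) {
                it.remove(); // already gone, stop tracking it
            } else if (children.isEmpty()) {
                tree.remove(parent); // empty parent: reclaim it
                it.remove();
                removed++;
            }
            // Parents that still have children stay registered for the next sweep.
        }
        return removed;
    }

    synchronized boolean exists(String parent) {
        return tree.containsKey(parent);
    }

    public static void main(String[] args) {
        LockNodeSweeper sweeper = new LockNodeSweeper();
        sweeper.acquire("/locks/order-1", "lock-0000000000");
        sweeper.release("/locks/order-1", "lock-0000000000");
        System.out.println("exists before sweep: " + sweeper.exists("/locks/order-1"));
        System.out.println("nodes removed: " + sweeper.sweep());
        System.out.println("exists after sweep: " + sweeper.exists("/locks/order-1"));
    }
}
```

In the model the methods are synchronized, so a child cannot appear between the emptiness check and the delete. Against a real ZooKeeper ensemble that window exists, which is why the factory's sweep catches the failed delete and moves on.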