zookeeper 分布式鎖的實現
zookeeper 分布式鎖的實現
臨時順序節點,這種類型的節點有幾下幾個特性:
-
節點的生命周期和客戶端會話綁定,即創建節點的客戶端會話一旦失效,那么這個節點也會被清除。
-
每個父節點都會負責維護其子節點創建的先后順序,并且如果創建的是順序節點(SEQUENTIAL)的話,父節點會自動為這個節點分配一個整形數值,以后綴的形式自動追加到節點名中,作為這個節點最終的節點名。
利用上面這兩個特性,我們來看下獲取實現分布式鎖的基本邏輯:
-
客戶端調用create()方法創建名為“_locknode_/guid-lock-”的節點,需要注意的是,這里節點的創建類型需要設置為EPHEMERAL_SEQUENTIAL。
-
客戶端調用getChildren(“_locknode_”)方法來獲取所有已經創建的子節點,同時在這個節點上注冊上子節點變更通知的Watcher。
-
客戶端獲取到所有子節點path之后,如果發現自己在步驟1中創建的節點是所有節點中序號最小的,那么就認為這個客戶端獲得了鎖。
-
如果在步驟3中發現自己并非是所有子節點中最小的,說明自己還沒有獲取到鎖,就開始等待,直到下次子節點變更通知的時候,再進行子節點的獲取,判斷是否獲取鎖。
釋放鎖的過程相對比較簡單,就是刪除自己創建的那個子節點即可。
以上信息來自:http://jm-blog.aliapp.com/?p=2554
根據這個思路,來實現基于zookeeper的分布式鎖。
直接貼代碼,如下,如果有不合適的或需要改進的地方,請指教。
package com.usfot; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ThreadLocalRandom; /** * Shared意味著鎖是全局可見的,客戶端都可以請求鎖。 * DistributedSharedLock 應該是線程安全的。有待驗證 * Created by liyanxin on 2015/3/18. */ public class DistributedSharedLock implements Watcher { private static final String ADDR = "127.0.0.1:2181"; private static final String LOCK_NODE = "guid-lock-"; private String rootLockNode; //鎖目錄 private ZooKeeper zk = null; private Integer mutex; private Integer currentLock; /** * 構造函數實現 * 連接zk服務器 * 創建zk鎖目錄 * * @param rootLockNode */ public DistributedSharedLock(String rootLockNode) { this.rootLockNode = rootLockNode; try { //連接zk服務器 zk = new ZooKeeper(ADDR, 10 * 10000, this); } catch (IOException e) { e.printStackTrace(); } mutex = new Integer(-1); // Create ZK node name if (zk != null) { try { //建立根目錄節點 Stat s = zk.exists(rootLockNode, false); if (s == null) { zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out.println("Keeper exception when instantiating queue: " + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception"); } } } /** * 請求zk服務器,獲得鎖 * * @throws KeeperException * @throws InterruptedException */ public void acquire() throws KeeperException, InterruptedException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; // Add child with value i b.putInt(ThreadLocalRandom.current().nextInt(10)); value = b.array(); // 創建鎖節點 String lockName = zk.create(rootLockNode + "/" + LOCK_NODE, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); synchronized (mutex) { while (true) { // 獲得當前鎖節點的number,和所有的鎖節點比較 Integer acquireLock = new Integer(lockName.substring(lockName.lastIndexOf('-') + 1)); List<String> childLockNode = zk.getChildren(rootLockNode, true); SortedSet<Integer> sortedLock = new TreeSet<Integer>(); for (String temp : childLockNode) { Integer tempLockNumber = new Integer(temp.substring(temp.lastIndexOf('-') + 1)); sortedLock.add(tempLockNumber); } currentLock = sortedLock.first(); //如果當前創建的鎖的序號是最小的那么認為這個客戶端獲得了鎖 if (currentLock >= acquireLock) { System.err.println("thread_name=" + Thread.currentThread().getName() + "|attend lcok|lock_num=" + currentLock); return; } else { //沒有獲得鎖則等待下次事件的發生 System.err.println("thread_name=" + Thread.currentThread().getName() + "|wait lcok|lock_num=" + currentLock); mutex.wait(); } } } } /** * 釋放鎖 * * @throws KeeperException * @throws InterruptedException */ public void release() throws KeeperException, InterruptedException { String lockName = String.format("%010d", currentLock); zk.delete(rootLockNode + "/" + LOCK_NODE + lockName, -1); System.err.println("thread_name=" + Thread.currentThread().getName() + "|release lcok|lock_num=" + currentLock); } @Override public void process(WatchedEvent event) { synchronized (mutex) { mutex.notify(); } } }
測試代碼如下,
package com.usfot; import org.apache.zookeeper.KeeperException; /** * 啟動10個線程,都獲得一個鎖對象,每個線程都有一個鎖對象。 * 鎖對象請求zk服務器獲得鎖。如果不能獲得鎖,則等待。 * 當一個線程獲得鎖時,其他線程將等待鎖的釋放。 * Created by liyanxin on 2015/3/18. */ public class DistributedSharedLockTest { public static void main(String args[]) { for (int i = 0; i < 10; i++) { Thread t = new Thread(new Runnable() { @Override public void run() { DistributedSharedLock lock = new DistributedSharedLock("/_locknode_"); try { lock.acquire(); Thread.sleep(1000); //獲得鎖之后可以進行相應的處理 System.out.println("======獲得鎖后進行相應的操作======"); lock.release(); System.err.println("============================="); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); t.start(); } } }
看一下測試情況,
thread_name=Thread-5|wait lcok|lock_num=300 thread_name=Thread-1|attend lcok|lock_num=300 thread_name=Thread-9|wait lcok|lock_num=300 thread_name=Thread-3|wait lcok|lock_num=300 thread_name=Thread-7|wait lcok|lock_num=300 thread_name=Thread-6|wait lcok|lock_num=300 thread_name=Thread-2|wait lcok|lock_num=300 thread_name=Thread-0|wait lcok|lock_num=300 thread_name=Thread-8|wait lcok|lock_num=300 thread_name=Thread-4|wait lcok|lock_num=300 ======獲得鎖后進行相應的操作====== thread_name=Thread-2|wait lcok|lock_num=301 thread_name=Thread-6|wait lcok|lock_num=301 thread_name=Thread-1|release lcok|lock_num=300 ============================= .................................
這是一部分的打印日志。
在換一種方式測試一下鎖對象的線程安全性。如下測試代碼,
package com.usfot; import org.apache.zookeeper.KeeperException; /** * Created by liyanxin on 2015/3/18. */ public class DistributedSharedLockTest2 { public static void main(String args[]) { final DistributedSharedLock lock = new DistributedSharedLock("/_locknode_"); /** * 所有的線程都共享一個鎖對象,驗證鎖對象的線程安全性 * 鎖是阻塞的 */ for (int i = 0; i < 10; i++) { Thread t = new Thread(new Runnable() { @Override public void run() { try { lock.acquire(); Thread.sleep(1000); //獲得鎖之后可以進行相應的處理 System.out.println("======獲得鎖后進行相應的操作======"); lock.release(); System.err.println("============================="); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); t.start(); } } }
經過測試,發現了死鎖deadlock的問題,這個問題如何導致的呢?是因為多個線程都會在mutex對象的內置鎖上發生競爭。當線程A獲得mutex對象的內置鎖時,會進入到同步代碼塊,進行獲取zk服務器的分布式鎖的操作,當獲得分布式鎖后,退出同步代碼塊,mutex的內置鎖也就被線程A釋放。大量的線程都在競爭mutex對象的內置鎖。這時,線程B獲得mutex的內置鎖,進入同步代碼塊,由于沒有獲得分布式鎖,線程B等待。然后這時線程A釋放分布式鎖,刪除zk服務器鎖節點,此時觸發watcher事件,喚醒mutex對象內置鎖上等待的線程,
注意使用notify喚醒。notify大家應該知道,只能喚醒所有等待線程的其中一個,或許剛好此時喚醒的不是線程B,那么deadlock就來了。
怎么解決?我把notify換成notifyAll試了下,程序能順利執行,沒有死鎖的現象。
重新運行代碼,如下日志,
thread_name=Thread-9|wait lcok|lock_num=360 thread_name=Thread-0|wait lcok|lock_num=360 thread_name=Thread-1|wait lcok|lock_num=360 thread_name=Thread-5|wait lcok|lock_num=360 thread_name=Thread-7|wait lcok|lock_num=360 thread_name=Thread-3|wait lcok|lock_num=360 thread_name=Thread-6|wait lcok|lock_num=360 thread_name=Thread-2|attend lcok|lock_num=360 thread_name=Thread-4|wait lcok|lock_num=360 thread_name=Thread-8|wait lcok|lock_num=360 ======獲得鎖后進行相應的操作====== thread_name=Thread-2|release lcok|lock_num=360 ============================= thread_name=Thread-8|attend lcok|lock_num=361 thread_name=Thread-4|wait lcok|lock_num=361 thread_name=Thread-6|wait lcok|lock_num=361 ..............
一部分的打印日志
參考:http://blog.csdn.net/java2000_wl/article/details/8694270
=====================================================================
改進后的分布式鎖實現
下面是改進后的分布式鎖實現,和之前的實現方式唯一不同之處在于,這里設計成每個鎖競爭者,只需要關注”_locknode_”節點下序號比自己小的那個節點是否存在即可。實現如下:
-
客戶端調用create()方法創建名為“_locknode_/guid-lock-”的節點,需要注意的是,這里節點的創建類型需要設置為EPHEMERAL_SEQUENTIAL。
-
客戶端調用getChildren(“_locknode_”)方法來獲取所有已經創建的子節點,注意,這里不注冊任何Watcher。
-
客戶端獲取到所有子節點path之后,如果發現自己在步驟1中創建的節點序號最小,那么就認為這個客戶端獲得了鎖。
-
如果在步驟3中發現自己并非所有子節點中最小的,說明自己還沒有獲取到鎖。此時客戶端需要找到比自己小的那個節點,然后對其調用exist()方法,同時注冊事件監聽。
-
之后當這個被關注的節點被移除了,客戶端會收到相應的通知。這個時候客戶端需要再次調用getChildren(“_locknode_”)方法來獲取所有已經創建的子節點,確保自己確實是最小的節點了,然后進入步驟3。
=================================END=================================