zookeeper 分布式鎖的實現
zookeeper 分布式鎖的實現
臨時順序節點,這種類型的節點有幾下幾個特性:
-
節點的生命周期和客戶端會話綁定,即創建節點的客戶端會話一旦失效,那么這個節點也會被清除。
-
每個父節點都會負責維護其子節點創建的先后順序,并且如果創建的是順序節點(SEQUENTIAL)的話,父節點會自動為這個節點分配一個整形數值,以后綴的形式自動追加到節點名中,作為這個節點最終的節點名。
利用上面這兩個特性,我們來看下獲取實現分布式鎖的基本邏輯:
-
客戶端調用create()方法創建名為“_locknode_/guid-lock-”的節點,需要注意的是,這里節點的創建類型需要設置為EPHEMERAL_SEQUENTIAL。
-
客戶端調用getChildren(“_locknode_”)方法來獲取所有已經創建的子節點,同時在這個節點上注冊上子節點變更通知的Watcher。
-
客戶端獲取到所有子節點path之后,如果發現自己在步驟1中創建的節點是所有節點中序號最小的,那么就認為這個客戶端獲得了鎖。
-
如果在步驟3中發現自己并非是所有子節點中最小的,說明自己還沒有獲取到鎖,就開始等待,直到下次子節點變更通知的時候,再進行子節點的獲取,判斷是否獲取鎖。
釋放鎖的過程相對比較簡單,就是刪除自己創建的那個子節點即可。
以上信息來自:http://jm-blog.aliapp.com/?p=2554
根據這個思路,來實現基于zookeeper的分布式鎖。
直接貼代碼,如下,如果有不合適的或需要改進的地方,請指教。
package com.usfot;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
/**
* Shared意味著鎖是全局可見的,客戶端都可以請求鎖。
* DistributedSharedLock 應該是線程安全的。有待驗證
* Created by liyanxin on 2015/3/18.
*/
public class DistributedSharedLock implements Watcher {
private static final String ADDR = "127.0.0.1:2181";
private static final String LOCK_NODE = "guid-lock-";
private String rootLockNode; //鎖目錄
private ZooKeeper zk = null;
private Integer mutex;
private Integer currentLock;
/**
* 構造函數實現
* 連接zk服務器
* 創建zk鎖目錄
*
* @param rootLockNode
*/
public DistributedSharedLock(String rootLockNode) {
this.rootLockNode = rootLockNode;
try {
//連接zk服務器
zk = new ZooKeeper(ADDR, 10 * 10000, this);
} catch (IOException e) {
e.printStackTrace();
}
mutex = new Integer(-1);
// Create ZK node name
if (zk != null) {
try {
//建立根目錄節點
Stat s = zk.exists(rootLockNode, false);
if (s == null) {
zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out.println("Keeper exception when instantiating queue: " + e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
}
/**
* 請求zk服務器,獲得鎖
*
* @throws KeeperException
* @throws InterruptedException
*/
public void acquire() throws KeeperException, InterruptedException {
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
// Add child with value i
b.putInt(ThreadLocalRandom.current().nextInt(10));
value = b.array();
// 創建鎖節點
String lockName = zk.create(rootLockNode + "/" + LOCK_NODE, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
synchronized (mutex) {
while (true) {
// 獲得當前鎖節點的number,和所有的鎖節點比較
Integer acquireLock = new Integer(lockName.substring(lockName.lastIndexOf('-') + 1));
List<String> childLockNode = zk.getChildren(rootLockNode, true);
SortedSet<Integer> sortedLock = new TreeSet<Integer>();
for (String temp : childLockNode) {
Integer tempLockNumber = new Integer(temp.substring(temp.lastIndexOf('-') + 1));
sortedLock.add(tempLockNumber);
}
currentLock = sortedLock.first();
//如果當前創建的鎖的序號是最小的那么認為這個客戶端獲得了鎖
if (currentLock >= acquireLock) {
System.err.println("thread_name=" + Thread.currentThread().getName() + "|attend lcok|lock_num=" + currentLock);
return;
} else {
//沒有獲得鎖則等待下次事件的發生
System.err.println("thread_name=" + Thread.currentThread().getName() + "|wait lcok|lock_num=" + currentLock);
mutex.wait();
}
}
}
}
/**
* 釋放鎖
*
* @throws KeeperException
* @throws InterruptedException
*/
public void release() throws KeeperException, InterruptedException {
String lockName = String.format("%010d", currentLock);
zk.delete(rootLockNode + "/" + LOCK_NODE + lockName, -1);
System.err.println("thread_name=" + Thread.currentThread().getName() + "|release lcok|lock_num=" + currentLock);
}
@Override
public void process(WatchedEvent event) {
synchronized (mutex) {
mutex.notify();
}
}
}
測試代碼如下,
package com.usfot;
import org.apache.zookeeper.KeeperException;
/**
* 啟動10個線程,都獲得一個鎖對象,每個線程都有一個鎖對象。
* 鎖對象請求zk服務器獲得鎖。如果不能獲得鎖,則等待。
* 當一個線程獲得鎖時,其他線程將等待鎖的釋放。
* Created by liyanxin on 2015/3/18.
*/
public class DistributedSharedLockTest {
public static void main(String args[]) {
for (int i = 0; i < 10; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
DistributedSharedLock lock = new DistributedSharedLock("/_locknode_");
try {
lock.acquire();
Thread.sleep(1000); //獲得鎖之后可以進行相應的處理
System.out.println("======獲得鎖后進行相應的操作======");
lock.release();
System.err.println("=============================");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
}
}
看一下測試情況,
thread_name=Thread-5|wait lcok|lock_num=300 thread_name=Thread-1|attend lcok|lock_num=300 thread_name=Thread-9|wait lcok|lock_num=300 thread_name=Thread-3|wait lcok|lock_num=300 thread_name=Thread-7|wait lcok|lock_num=300 thread_name=Thread-6|wait lcok|lock_num=300 thread_name=Thread-2|wait lcok|lock_num=300 thread_name=Thread-0|wait lcok|lock_num=300 thread_name=Thread-8|wait lcok|lock_num=300 thread_name=Thread-4|wait lcok|lock_num=300 ======獲得鎖后進行相應的操作====== thread_name=Thread-2|wait lcok|lock_num=301 thread_name=Thread-6|wait lcok|lock_num=301 thread_name=Thread-1|release lcok|lock_num=300 ============================= .................................
這是一部分的打印日志。
在換一種方式測試一下鎖對象的線程安全性。如下測試代碼,
package com.usfot;
import org.apache.zookeeper.KeeperException;
/**
* Created by liyanxin on 2015/3/18.
*/
public class DistributedSharedLockTest2 {
public static void main(String args[]) {
final DistributedSharedLock lock = new DistributedSharedLock("/_locknode_");
/**
* 所有的線程都共享一個鎖對象,驗證鎖對象的線程安全性
* 鎖是阻塞的
*/
for (int i = 0; i < 10; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
lock.acquire();
Thread.sleep(1000); //獲得鎖之后可以進行相應的處理
System.out.println("======獲得鎖后進行相應的操作======");
lock.release();
System.err.println("=============================");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
}
}
經過測試,發現了死鎖deadlock的問題,這個問題如何導致的呢?是因為多個線程都會在mutex對象的內置鎖上發生競爭。當線程A獲得mutex對象的內置鎖時,會進入到同步代碼塊,進行獲取zk服務器的分布式鎖的操作,當獲得分布式鎖后,退出同步代碼塊,mutex的內置鎖也就被線程A釋放。大量的線程都在競爭mutex對象的內置鎖。這時,線程B獲得mutex的內置鎖,進入同步代碼塊,由于沒有獲得分布式鎖,線程B等待。然后這時線程A釋放分布式鎖,刪除zk服務器鎖節點,此時觸發watcher事件,喚醒mutex對象內置鎖上等待的線程,
注意使用notify喚醒。notify大家應該知道,只能喚醒所有等待線程的其中一個,或許剛好此時喚醒的不是線程B,那么deadlock就來了。
怎么解決?我把notify換成notifyAll試了下,程序能順利執行,沒有死鎖的現象。
重新運行代碼,如下日志,
thread_name=Thread-9|wait lcok|lock_num=360 thread_name=Thread-0|wait lcok|lock_num=360 thread_name=Thread-1|wait lcok|lock_num=360 thread_name=Thread-5|wait lcok|lock_num=360 thread_name=Thread-7|wait lcok|lock_num=360 thread_name=Thread-3|wait lcok|lock_num=360 thread_name=Thread-6|wait lcok|lock_num=360 thread_name=Thread-2|attend lcok|lock_num=360 thread_name=Thread-4|wait lcok|lock_num=360 thread_name=Thread-8|wait lcok|lock_num=360 ======獲得鎖后進行相應的操作====== thread_name=Thread-2|release lcok|lock_num=360 ============================= thread_name=Thread-8|attend lcok|lock_num=361 thread_name=Thread-4|wait lcok|lock_num=361 thread_name=Thread-6|wait lcok|lock_num=361 ..............
一部分的打印日志
參考:http://blog.csdn.net/java2000_wl/article/details/8694270
=====================================================================
改進后的分布式鎖實現
下面是改進后的分布式鎖實現,和之前的實現方式唯一不同之處在于,這里設計成每個鎖競爭者,只需要關注”_locknode_”節點下序號比自己小的那個節點是否存在即可。實現如下:
-
客戶端調用create()方法創建名為“_locknode_/guid-lock-”的節點,需要注意的是,這里節點的創建類型需要設置為EPHEMERAL_SEQUENTIAL。
-
客戶端調用getChildren(“_locknode_”)方法來獲取所有已經創建的子節點,注意,這里不注冊任何Watcher。
-
客戶端獲取到所有子節點path之后,如果發現自己在步驟1中創建的節點序號最小,那么就認為這個客戶端獲得了鎖。
-
如果在步驟3中發現自己并非所有子節點中最小的,說明自己還沒有獲取到鎖。此時客戶端需要找到比自己小的那個節點,然后對其調用exist()方法,同時注冊事件監聽。
-
之后當這個被關注的節點被移除了,客戶端會收到相應的通知。這個時候客戶端需要再次調用getChildren(“_locknode_”)方法來獲取所有已經創建的子節點,確保自己確實是最小的節點了,然后進入步驟3。
=================================END=================================