zookeeper 分布式鎖的實現

jopen 9年前發布 | 37K 次閱讀 分布式/云計算/大數據 ZooKeeper

zookeeper 分布式鎖的實現

臨時順序節點,這種類型的節點有幾下幾個特性:

  • 節點的生命周期和客戶端會話綁定,即創建節點的客戶端會話一旦失效,那么這個節點也會被清除。

  • 每個父節點都會負責維護其子節點創建的先后順序,并且如果創建的是順序節點(SEQUENTIAL)的話,父節點會自動為這個節點分配一個整形數值,以后綴的形式自動追加到節點名中,作為這個節點最終的節點名。

利用上面這兩個特性,我們來看下獲取實現分布式鎖的基本邏輯:

  1. 客戶端調用create()方法創建名為“_locknode_/guid-lock-”的節點,需要注意的是,這里節點的創建類型需要設置為EPHEMERAL_SEQUENTIAL

  2. 客戶端調用getChildren(“_locknode_”)方法來獲取所有已經創建的子節點,同時在這個節點上注冊上子節點變更通知的Watcher。

  3. 客戶端獲取到所有子節點path之后,如果發現自己在步驟1中創建的節點是所有節點中序號最小的,那么就認為這個客戶端獲得了鎖。

  4. 如果在步驟3中發現自己并非是所有子節點中最小的,說明自己還沒有獲取到鎖,就開始等待,直到下次子節點變更通知的時候,再進行子節點的獲取,判斷是否獲取鎖。

釋放鎖的過程相對比較簡單,就是刪除自己創建的那個子節點即可。

以上信息來自:http://jm-blog.aliapp.com/?p=2554

根據這個思路,來實現基于zookeeper的分布式鎖。

直接貼代碼,如下,如果有不合適的或需要改進的地方,請指教。

package com.usfot;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Shared意味著鎖是全局可見的,客戶端都可以請求鎖。
 * DistributedSharedLock 應該是線程安全的。有待驗證
 * Created by liyanxin on 2015/3/18.
 */
public class DistributedSharedLock implements Watcher {

    private static final String ADDR = "127.0.0.1:2181";
    private static final String LOCK_NODE = "guid-lock-";
    private String rootLockNode; //鎖目錄
    private ZooKeeper zk = null;
    private Integer mutex;
    private Integer currentLock;

    /**
     * 構造函數實現
     * 連接zk服務器
     * 創建zk鎖目錄
     *
     * @param rootLockNode
     */
    public DistributedSharedLock(String rootLockNode) {
        this.rootLockNode = rootLockNode;
        try {
            //連接zk服務器
            zk = new ZooKeeper(ADDR, 10 * 10000, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
        mutex = new Integer(-1);
        // Create ZK node name
        if (zk != null) {
            try {
                //建立根目錄節點
                Stat s = zk.exists(rootLockNode, false);
                if (s == null) {
                    zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                System.out.println("Keeper exception when instantiating queue: " + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
            }
        }
    }

    /**
     * 請求zk服務器,獲得鎖
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void acquire() throws KeeperException, InterruptedException {
        ByteBuffer b = ByteBuffer.allocate(4);
        byte[] value;
        // Add child with value i
        b.putInt(ThreadLocalRandom.current().nextInt(10));
        value = b.array();

        // 創建鎖節點
        String lockName = zk.create(rootLockNode + "/" + LOCK_NODE, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);

        synchronized (mutex) {
            while (true) {
                // 獲得當前鎖節點的number,和所有的鎖節點比較
                Integer acquireLock = new Integer(lockName.substring(lockName.lastIndexOf('-') + 1));
                List<String> childLockNode = zk.getChildren(rootLockNode, true);

                SortedSet<Integer> sortedLock = new TreeSet<Integer>();
                for (String temp : childLockNode) {
                    Integer tempLockNumber = new Integer(temp.substring(temp.lastIndexOf('-') + 1));
                    sortedLock.add(tempLockNumber);
                }

                currentLock = sortedLock.first();

                //如果當前創建的鎖的序號是最小的那么認為這個客戶端獲得了鎖
                if (currentLock >= acquireLock) {
                    System.err.println("thread_name=" + Thread.currentThread().getName() + "|attend lcok|lock_num=" + currentLock);
                    return;
                } else {
                    //沒有獲得鎖則等待下次事件的發生
                    System.err.println("thread_name=" + Thread.currentThread().getName() + "|wait lcok|lock_num=" + currentLock);
                    mutex.wait();
                }
            }
        }
    }


    /**
     * 釋放鎖
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void release() throws KeeperException, InterruptedException {
        String lockName = String.format("%010d", currentLock);
        zk.delete(rootLockNode + "/" + LOCK_NODE + lockName, -1);
        System.err.println("thread_name=" + Thread.currentThread().getName() + "|release lcok|lock_num=" + currentLock);
    }

    @Override
    public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }
}

測試代碼如下,

package com.usfot;

import org.apache.zookeeper.KeeperException;

/**
 * 啟動10個線程,都獲得一個鎖對象,每個線程都有一個鎖對象。
 * 鎖對象請求zk服務器獲得鎖。如果不能獲得鎖,則等待。
 * 當一個線程獲得鎖時,其他線程將等待鎖的釋放。
 * Created by liyanxin on 2015/3/18.
 */
public class DistributedSharedLockTest {

    public static void main(String args[]) {
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    DistributedSharedLock lock = new DistributedSharedLock("/_locknode_");
                    try {
                        lock.acquire();
                        Thread.sleep(1000); //獲得鎖之后可以進行相應的處理
                        System.out.println("======獲得鎖后進行相應的操作======");
                        lock.release();
                        System.err.println("=============================");
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
        }
    }
}

看一下測試情況,

thread_name=Thread-5|wait lcok|lock_num=300
thread_name=Thread-1|attend lcok|lock_num=300
thread_name=Thread-9|wait lcok|lock_num=300
thread_name=Thread-3|wait lcok|lock_num=300
thread_name=Thread-7|wait lcok|lock_num=300
thread_name=Thread-6|wait lcok|lock_num=300
thread_name=Thread-2|wait lcok|lock_num=300
thread_name=Thread-0|wait lcok|lock_num=300
thread_name=Thread-8|wait lcok|lock_num=300
thread_name=Thread-4|wait lcok|lock_num=300
======獲得鎖后進行相應的操作======
thread_name=Thread-2|wait lcok|lock_num=301
thread_name=Thread-6|wait lcok|lock_num=301
thread_name=Thread-1|release lcok|lock_num=300
=============================
.................................

這是一部分的打印日志。

在換一種方式測試一下鎖對象的線程安全性。如下測試代碼,

package com.usfot;

import org.apache.zookeeper.KeeperException;

/**
 * Created by liyanxin on 2015/3/18.
 */
public class DistributedSharedLockTest2 {


    public static void main(String args[]) {
        final DistributedSharedLock lock = new DistributedSharedLock("/_locknode_");

        /**
         * 所有的線程都共享一個鎖對象,驗證鎖對象的線程安全性
         * 鎖是阻塞的
         */
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock.acquire();
                        Thread.sleep(1000); //獲得鎖之后可以進行相應的處理
                        System.out.println("======獲得鎖后進行相應的操作======");
                        lock.release();
                        System.err.println("=============================");
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
        }
    }
}

經過測試,發現了死鎖deadlock的問題,這個問題如何導致的呢?是因為多個線程都會在mutex對象的內置鎖上發生競爭。當線程A獲得mutex對象的內置鎖時,會進入到同步代碼塊,進行獲取zk服務器的分布式鎖的操作,當獲得分布式鎖后,退出同步代碼塊,mutex的內置鎖也就被線程A釋放。大量的線程都在競爭mutex對象的內置鎖。這時,線程B獲得mutex的內置鎖,進入同步代碼塊,由于沒有獲得分布式鎖,線程B等待。然后這時線程A釋放分布式鎖,刪除zk服務器鎖節點,此時觸發watcher事件,喚醒mutex對象內置鎖上等待的線程,

注意使用notify喚醒。notify大家應該知道,只能喚醒所有等待線程的其中一個,或許剛好此時喚醒的不是線程B,那么deadlock就來了。

怎么解決?我把notify換成notifyAll試了下,程序能順利執行,沒有死鎖的現象。

重新運行代碼,如下日志,

thread_name=Thread-9|wait lcok|lock_num=360
thread_name=Thread-0|wait lcok|lock_num=360
thread_name=Thread-1|wait lcok|lock_num=360
thread_name=Thread-5|wait lcok|lock_num=360
thread_name=Thread-7|wait lcok|lock_num=360
thread_name=Thread-3|wait lcok|lock_num=360
thread_name=Thread-6|wait lcok|lock_num=360
thread_name=Thread-2|attend lcok|lock_num=360
thread_name=Thread-4|wait lcok|lock_num=360
thread_name=Thread-8|wait lcok|lock_num=360
======獲得鎖后進行相應的操作======
thread_name=Thread-2|release lcok|lock_num=360
=============================
thread_name=Thread-8|attend lcok|lock_num=361
thread_name=Thread-4|wait lcok|lock_num=361
thread_name=Thread-6|wait lcok|lock_num=361
..............

一部分的打印日志

參考:http://blog.csdn.net/java2000_wl/article/details/8694270

=====================================================================

改進后的分布式鎖實現

下面是改進后的分布式鎖實現,和之前的實現方式唯一不同之處在于,這里設計成每個鎖競爭者,只需要關注”_locknode_”節點下序號比自己小的那個節點是否存在即可。實現如下:

  1. 客戶端調用create()方法創建名為“_locknode_/guid-lock-”的節點,需要注意的是,這里節點的創建類型需要設置為EPHEMERAL_SEQUENTIAL。

  2. 客戶端調用getChildren(“_locknode_”)方法來獲取所有已經創建的子節點,注意,這里不注冊任何Watcher。

  3. 客戶端獲取到所有子節點path之后,如果發現自己在步驟1中創建的節點序號最小,那么就認為這個客戶端獲得了鎖。

  4. 如果在步驟3中發現自己并非所有子節點中最小的,說明自己還沒有獲取到鎖。此時客戶端需要找到比自己小的那個節點,然后對其調用exist()方法,同時注冊事件監聽。

  5. 之后當這個被關注的節點被移除了,客戶端會收到相應的通知。這個時候客戶端需要再次調用getChildren(“_locknode_”)方法來獲取所有已經創建的子節點,確保自己確實是最小的節點了,然后進入步驟3。

=================================END=================================

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!