zookeeper應用場景練習(分布式鎖)

casonstart 8年前發布 | 14K 次閱讀 分布式/云計算/大數據

來自: http://blog.csdn.net/luckyzhoustar/article/details/50628419


 

 在平常的高并發的程序中,為了保證數據的一致性,因此都會用到鎖,來對當前的線程進行鎖定。在單機操作中,很好做到,比如可以采用Synchronized、Lock或者其他的讀寫多來鎖定當前的線程。但是在分布式的系統中,就很難做到這一點。因此可以采用zookeeper中節點的特性來滿足這一點。大致實現的思路如下。

 1.每個客戶端都去zookeeper上創建臨時的順序節點

 2.客戶端判斷當前自己創建的節點是不是最小的

 3.如果是的話,就獲得了執行當前任務的鎖

 4.如果不是的話,就找到比自己小的節點,然后進行監聽,如果被刪除的話,就可以獲得鎖


 上面就是大致的實現思路,下面我們來通過代碼來實現一下。

 

package com.test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedLock {

    private String lockName;
    private final int timeOut = 3000;
    private final String root = "/locks";
    private String myZnode;// 代表當前節點信息
    private String waitZnode;
    private static Logger logger = LoggerFactory
            .getLogger(DistributedLock.class);
    private CuratorFramework client;
    private CountDownLatch latch = new CountDownLatch(1);

    public DistributedLock(String connectString, String lockName) {
        this.lockName = lockName;
        client = CuratorFrameworkFactory.builder().connectionTimeoutMs(timeOut)
                .connectString(connectString)
                .retryPolicy(new RetryNTimes(3, 3000)).build();
        ConnectionStateListener listener = new ConnectionStateListener() {

            public void stateChanged(CuratorFramework client,
                    ConnectionState newState) {
                if (newState == ConnectionState.CONNECTED) {
                    logger.info("連接成功了");
                    latch.countDown();
                }
            }
        };

        client.getConnectionStateListenable().addListener(listener);
        client.start();
        try {
            latch.await();
            createRoot();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    /**
     * @Title: 創建根節點root
     * @Description: TODO
     * @param
     * @return void
     * @throws
     */
    private void createRoot() {
        try {
            Stat stat = client.checkExists().forPath(root);
            if (stat != null) {
                logger.info("root has already exists");
            } else {
                // 創建跟節點
                client.create().creatingParentsIfNeeded().forPath(root);

            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void getLocks() {

        try {
            myZnode = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(root + "/" + lockName);
            logger.info(myZnode + "has created");
            // 取出所有的子節點,然后找出比自己小的節點,進行監聽的設置
            List<String> subNodes = client.getChildren().forPath(root);
            // 取出所有帶有lockname的節點信息
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                if (node.contains(lockName)) {
                    lockObjNodes.add(node);
                }
            }
            // 對當前節點進行排序
            Collections.sort(lockObjNodes);
            // 判斷當前的節點是不是最小的節點
            if (myZnode.equals(root + "/" + lockObjNodes.get(0))) {
                doAction();
            } else {
                // 找到比自己節點大一的節點進行監聽
                String subMyZone = myZnode
                        .substring(myZnode.lastIndexOf("/") + 1);
                waitZnode = lockObjNodes.get(Collections.binarySearch(
                        lockObjNodes, subMyZone) - 1);
                // 對節點進行監聽
                Stat stat = client.checkExists()
                        .usingWatcher(deleteNodeWatcher).forPath("/"+waitZnode);
                if (stat != null) {
                    System.out.println(Thread.currentThread().getName()
                            + "處于等待狀態");
                } else {
                    doAction();
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }

    // 刪除節點的事件監聽
    CuratorWatcher deleteNodeWatcher = new CuratorWatcher() {

        public void process(WatchedEvent event) throws Exception {

            if (event.getType() == EventType.NodeDeleted) {
                doAction();
            }
        }
    };

    private void doAction() {
        System.out.println(Thread.currentThread().getName() + "開始執行");
        client.close();
    }
}


 下面來測試一下

 

/**     
 * @FileName: TestCurrentZk.java   
 * @Package:com.test   
 * @Description: TODO  
 * @author: LUCKY    
 * @date:2016年2月2日 下午11:36:04   
 * @version V1.0     
 */
package com.test;

/**
 * @ClassName: TestCurrentZk
 * @Description: TODO
 * @author: LUCKY
 * @date:2016年2月2日 下午11:36:04
 */
public class TestCurrentZk {

    public static void main(String[] args) throws Exception {
        Thread threads[] = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    ClientTest clientTest = new ClientTest(
                            "100.66.162.36:2181", "locknametest");
                    clientTest.getLocks();
                }
            });

            threads[i].start();

        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}


 本文由用戶 casonstart 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!