zookeeper應用場景練習(分布式鎖)
來自: http://blog.csdn.net/luckyzhoustar/article/details/50628419
在平常的高并發的程序中,為了保證數據的一致性,因此都會用到鎖,來對當前的線程進行鎖定。在單機操作中,很好做到,比如可以采用Synchronized、Lock或者其他的讀寫多來鎖定當前的線程。但是在分布式的系統中,就很難做到這一點。因此可以采用zookeeper中節點的特性來滿足這一點。大致實現的思路如下。
1.每個客戶端都去zookeeper上創建臨時的順序節點
2.客戶端判斷當前自己創建的節點是不是最小的
3.如果是的話,就獲得了執行當前任務的鎖
4.如果不是的話,就找到比自己小的節點,然后進行監聽,如果被刪除的話,就可以獲得鎖
上面就是大致的實現思路,下面我們來通過代碼來實現一下。
package com.test; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DistributedLock { private String lockName; private final int timeOut = 3000; private final String root = "/locks"; private String myZnode;// 代表當前節點信息 private String waitZnode; private static Logger logger = LoggerFactory .getLogger(DistributedLock.class); private CuratorFramework client; private CountDownLatch latch = new CountDownLatch(1); public DistributedLock(String connectString, String lockName) { this.lockName = lockName; client = CuratorFrameworkFactory.builder().connectionTimeoutMs(timeOut) .connectString(connectString) .retryPolicy(new RetryNTimes(3, 3000)).build(); ConnectionStateListener listener = new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState == ConnectionState.CONNECTED) { logger.info("連接成功了"); latch.countDown(); } } }; client.getConnectionStateListenable().addListener(listener); client.start(); try { latch.await(); createRoot(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @Title: 創建根節點root * @Description: TODO * @param * @return void * @throws */ private void createRoot() { try { Stat stat = client.checkExists().forPath(root); if (stat != null) { logger.info("root has already exists"); } else { // 創建跟節點 client.create().creatingParentsIfNeeded().forPath(root); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void getLocks() { try { myZnode = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(root + "/" + lockName); logger.info(myZnode + "has created"); // 取出所有的子節點,然后找出比自己小的節點,進行監聽的設置 List<String> subNodes = client.getChildren().forPath(root); // 取出所有帶有lockname的節點信息 List<String> lockObjNodes = new ArrayList<String>(); for (String node : subNodes) { if (node.contains(lockName)) { lockObjNodes.add(node); } } // 對當前節點進行排序 Collections.sort(lockObjNodes); // 判斷當前的節點是不是最小的節點 if (myZnode.equals(root + "/" + lockObjNodes.get(0))) { doAction(); } else { // 找到比自己節點大一的節點進行監聽 String subMyZone = myZnode .substring(myZnode.lastIndexOf("/") + 1); waitZnode = lockObjNodes.get(Collections.binarySearch( lockObjNodes, subMyZone) - 1); // 對節點進行監聽 Stat stat = client.checkExists() .usingWatcher(deleteNodeWatcher).forPath("/"+waitZnode); if (stat != null) { System.out.println(Thread.currentThread().getName() + "處于等待狀態"); } else { doAction(); } } } catch (Exception e) { logger.error(e.getMessage()); } } // 刪除節點的事件監聽 CuratorWatcher deleteNodeWatcher = new CuratorWatcher() { public void process(WatchedEvent event) throws Exception { if (event.getType() == EventType.NodeDeleted) { doAction(); } } }; private void doAction() { System.out.println(Thread.currentThread().getName() + "開始執行"); client.close(); } }
下面來測試一下
/** * @FileName: TestCurrentZk.java * @Package:com.test * @Description: TODO * @author: LUCKY * @date:2016年2月2日 下午11:36:04 * @version V1.0 */ package com.test; /** * @ClassName: TestCurrentZk * @Description: TODO * @author: LUCKY * @date:2016年2月2日 下午11:36:04 */ public class TestCurrentZk { public static void main(String[] args) throws Exception { Thread threads[] = new Thread[10]; for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(new Runnable() { public void run() { ClientTest clientTest = new ClientTest( "100.66.162.36:2181", "locknametest"); clientTest.getLocks(); } }); threads[i].start(); } Thread.sleep(Integer.MAX_VALUE); } }
本文由用戶 casonstart 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!