Apache Curator入門實戰

jopen 9年前發布 | 31K 次閱讀 分布式/云計算/大數據 Apache Curator

Apache Curator入門實戰

Curator是Netflix公司開源的一個Zookeeper客戶端,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。

1.Zookeeper安裝部署

Zookeeper的部署很簡單,如果已經有Java運行環境的話,下載tarball解壓后即可運行。

[root@vm Temp]$ wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
[root@vm Temp]$ tar zxvf zookeeper-3.4.6.tar.gz
[root@vm Temp]$ cd zookeeper-3.4.6

[root@vm zookeeper-3.4.6]$ cp conf/zoo_sample.cfg conf/zoo.cfg [root@vm zookeeper-3.4.6]$ export ZOOKEEPER_HOME=/usr/local/src/zookeeper-3.4.5 [root@vm zookeeper-3.4.6]$ export PATH=$ZOOKEEPER_HOME/bin:$PATH

[root@vm zookeeper-3.4.6]$ bin/zkServer.sh start [root@vm zookeeper-3.4.6]$ bin/zkCli.sh -server 127.0.0.1:2181</pre>

2.客戶端常用操作

用zkCli.sh連接上Zookeeper服務后,用help能列出所有命令:

[root@BC-VM-edce4ac67d304079868c0bb265337bd4 zookeeper-3.4.6]# bin/zkCli.sh -127.0.0.1:2181
Connecting to localhost:2181
2015-06-11 10:55:14,387 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
    ...

[zk: localhost:2181(CONNECTED) 5] help ZooKeeper -server host:port cmd args connect host:port get path [watch] ls path [watch] set path data [version] rmr path delquota [-n|-b] path quit printwatches on|off create [-s] [-e] path data acl stat path [watch] close ls2 path [watch] history listquota path setAcl path acl getAcl path sync path redo cmdno addauth scheme auth delete path [version] setquota -n|-b val path</pre>

下面就試驗一下常用的命令:

  • create:創建路徑結點。
  • ls:查看路徑下的所有結點。
  • get:獲得結點上的值。
  • set:修改結點上的值。
  • delete:刪除結點。
  • </ul>

    [zk: localhost:2181(CONNECTED) 6] create /zktest mydata
    Created /zktest
    [zk: localhost:2181(CONNECTED) 12] ls /
    [zktest, zookeeper]
    [zk: localhost:2181(CONNECTED) 7] ls /zktest
    []
    [zk: localhost:2181(CONNECTED) 13] get /zktest
    mydata
    cZxid = 0x1c
    ctime = Thu Jun 11 10:58:06 CST 2015
    mZxid = 0x1c
    mtime = Thu Jun 11 10:58:06 CST 2015
    pZxid = 0x1c
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 6
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 14] set /zktest junk
    cZxid = 0x1c
    ctime = Thu Jun 11 10:58:06 CST 2015
    mZxid = 0x1f
    mtime = Thu Jun 11 10:59:08 CST 2015
    pZxid = 0x1c
    cversion = 0
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 4
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 15] delete /zktest
    [zk: localhost:2181(CONNECTED) 16] ls /
    [zookeeper]

    3.用Curator管理Zookeeper

    Curator的Maven依賴如下,一般直接使用curator-recipes就行了,如果需要自己封裝一些底層些的功能的話,例如增加連接管理重試機制等,則可以引入curator-framework包。

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.7.0</version>
        </dependency>

    3.1 Client操作

    利用Curator提供的客戶端API,可以完全實現上面原生客戶端的功能。值得注意的是,Curator采用流式風格API。

    package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;

    import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes;

    /**

    • Curator framework's client test.
    • Output:
    • $ create /zktest hello
    • $ ls /
    • [zktest, zookeeper]
    • $ get /zktest
    • hello
    • $ set /zktest world
    • $ get /zktest
    • world
    • $ delete /zktest
    • $ ls /
    • [zookeeper] */ public class CuratorClientTest {

      /* Zookeeper info / private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest";

      public static void main(String[] args) throws Exception {

      // 1.Connect to zk
      CuratorFramework client = CuratorFrameworkFactory.newClient(
              ZK_ADDRESS,
              new RetryNTimes(10, 5000)
      );
      client.start();
      System.out.println("zk client start successfully!");
      
      // 2.Client API test
      // 2.1 Create node
      String data1 = "hello";
      print("create", ZK_PATH, data1);
      client.create().
              creatingParentsIfNeeded().
              forPath(ZK_PATH, data1.getBytes());
      
      // 2.2 Get node and data
      print("ls", "/");
      print(client.getChildren().forPath("/"));
      print("get", ZK_PATH);
      print(client.getData().forPath(ZK_PATH));
      
      // 2.3 Modify data
      String data2 = "world";
      print("set", ZK_PATH, data2);
      client.setData().forPath(ZK_PATH, data2.getBytes());
      print("get", ZK_PATH);
      print(client.getData().forPath(ZK_PATH));
      
      // 2.4 Remove node
      print("delete", ZK_PATH);
      client.delete().forPath(ZK_PATH);
      print("ls", "/");
      print(client.getChildren().forPath("/"));
      

      }

      private static void print(String... cmds) {

      StringBuilder text = new StringBuilder("$ ");
      for (String cmd : cmds) {
          text.append(cmd).append(" ");
      }
      System.out.println(text.toString());
      

      }

      private static void print(Object result) {

      System.out.println(
              result instanceof byte[]
                  ? new String((byte[]) result)
                      : result);
      

      }

    }</pre>

    3.2 監聽器

    Curator提供了三種Watcher(Cache)來監聽結點的變化:

    • Path Cache:監視一個路徑下1)孩子結點的創建、2)刪除,3)以及結點數據的更新。產生的事件會傳遞給注冊的PathChildrenCacheListener。
    • Node Cache:監視一個結點的創建、更新、刪除,并將結點的數據緩存在本地。
    • Tree Cache:Path Cache和Node Cache的“合體”,監視路徑下的創建、更新、刪除事件,并緩存路徑下所有孩子結點的數據。
    • </ul>

      下面就測試一下最簡單的Path Watcher:

      package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;

      import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.retry.RetryNTimes;

      /**

      • Curator framework watch test. */ public class CuratorWatcherTest {

        /* Zookeeper info / private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest";

        public static void main(String[] args) throws Exception {

         // 1.Connect to zk
         CuratorFramework client = CuratorFrameworkFactory.newClient(
                 ZK_ADDRESS,
                 new RetryNTimes(10, 5000)
         );
         client.start();
         System.out.println("zk client start successfully!");
        
         // 2.Register watcher
         PathChildrenCache watcher = new PathChildrenCache(
                 client,
                 ZK_PATH,
                 true    // if cache data
         );
         watcher.getListenable().addListener((client1, event) -> {
             ChildData data = event.getData();
             if (data == null) {
                 System.out.println("No data in event[" + event + "]");
             } else {
                 System.out.println("Receive event: "
                         + "type=[" + event.getType() + "]"
                         + ", path=[" + data.getPath() + "]"
                         + ", data=[" + new String(data.getData()) + "]"
                         + ", stat=[" + data.getStat() + "]");
             }
         });
         watcher.start(StartMode.BUILD_INITIAL_CACHE);
         System.out.println("Register zk watcher successfully!");
        
         Thread.sleep(Integer.MAX_VALUE);
        

        }

      }</pre>
      下面是在zkCli.sh中操作時Java程序的輸出:

      Java: zk client start successfully!
      Java: Register zk watcher successfully!

      zkCli: [zk: localhost:2181(CONNECTED) 11] create /zktest/hello mydata Java: Receive event: type=[CHILD_ADDED], path=[/zktest/hello], data=[mydata], stat=[121,121,1434001221097,1434001221097,0,0,0,0,6,0,121]

      zkCli: [zk: localhost:2181(CONNECTED) 12] set /zktest/hello otherdata Java: Receive event: type=[CHILD_UPDATED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]

      zkCli: [zk: localhost:2181(CONNECTED) 13] delete /zktest/hello Java: Receive event: type=[CHILD_REMOVED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]</pre>

      4.Curator“菜譜”

      既然Maven包叫做curator-recipes,那說明Curator有它獨特的“菜譜”

      • :包括共享鎖、共享可重入鎖、讀寫鎖等。
      • 選舉:Leader選舉算法。
      • Barrier:阻止分布式計算直至某個條件被滿足的“柵欄”,可以看做JDK Concurrent包中Barrier的分布式實現。
      • 緩存:前面提到過的三種Cache及監聽機制。
      • 持久化結點:連接或Session終止后仍然在Zookeeper中存在的結點。
      • 隊列:分布式隊列、分布式優先級隊列等。
      • </ul>

        4.1 分布式鎖

        分布式編程時,比如最容易碰到的情況就是應用程序在線上多機部署,于是當多個應用同時訪問某一資源時,就需要某種機制去協調它們。例如,現在一臺應用正在rebuild緩存內容,要臨時鎖住某個區域暫時不讓訪問;又比如調度程序每次只想一個任務被一臺應用執行等等。

        下面的程序會啟動兩個線程t1和t2去爭奪鎖,拿到鎖的線程會占用5秒。運行多次可以觀察到,有時是t1先拿到鎖而t2等待,有時又會反過來。 Curator會用我們提供的lock路徑的結點作為全局鎖,這個結點的數據類似這種格式:[_c_64e0811f-9475-44ca-aa36- c1db65ae5350-lock-0000000005],每次獲得鎖時會生成這種串,釋放鎖時清空數據。

        package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;

        import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes;

        import java.util.concurrent.TimeUnit;

        /**

        • Curator framework's distributed lock test. */ public class CuratorDistrLockTest {

          /* Zookeeper info / private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_LOCK_PATH = "/zktest";

          public static void main(String[] args) throws InterruptedException {

           // 1.Connect to zk
           CuratorFramework client = CuratorFrameworkFactory.newClient(
                   ZK_ADDRESS,
                   new RetryNTimes(10, 5000)
           );
           client.start();
           System.out.println("zk client start successfully!");
          
           Thread t1 = new Thread(() -> {
               doWithLock(client);
           }, "t1");
           Thread t2 = new Thread(() -> {
               doWithLock(client);
           }, "t2");
          
           t1.start();
           t2.start();
          

          }

          private static void doWithLock(CuratorFramework client) {

           InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
           try {
               if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
                   System.out.println(Thread.currentThread().getName() + " hold lock");
                   Thread.sleep(5000L);
                   System.out.println(Thread.currentThread().getName() + " release lock");
               }
           } catch (Exception e) {
               e.printStackTrace();
           } finally {
               try {
                   lock.release();
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
          
          

          }

        }</pre>

        4.2 Leader選舉

        當集群里的某個服務down機時,我們可能要從slave結點里選出一個作為新的master,這時就需要一套能在分布式環境中自動協調的 Leader選舉方法。Curator提供了LeaderSelector監聽器實現Leader選舉功能。同一時刻,只有一個Listener會進入 takeLeadership()方法,說明它是當前的Leader。注意:當Listener從takeLeadership()退出時就說明它放棄了“Leader身份”,這時Curator會利用Zookeeper再從剩余的Listener中選出一個新的Leader。autoRequeue()方法使放棄 Leadership的Listener有機會重新獲得Leadership,如果不設置的話放棄了的Listener是不會再變成Leader的。

        package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;

        import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.EnsurePath;

        /**

        • Curator framework's leader election test.
        • Output:
        • LeaderSelector-2 take leadership!
        • LeaderSelector-2 relinquish leadership!
        • LeaderSelector-1 take leadership!
        • LeaderSelector-1 relinquish leadership!
        • LeaderSelector-0 take leadership!
        • LeaderSelector-0 relinquish leadership!
        • ... */ public class CuratorLeaderTest {

          /* Zookeeper info / private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest";

          public static void main(String[] args) throws InterruptedException { LeaderSelectorListener listener = new LeaderSelectorListener() {

          @Override
          public void takeLeadership(CuratorFramework client) throws Exception {
              System.out.println(Thread.currentThread().getName() + " take leadership!");
          
              // takeLeadership() method should only return when leadership is being relinquished.
              Thread.sleep(5000L);
          
              System.out.println(Thread.currentThread().getName() + " relinquish leadership!");
          }
          
          @Override
          public void stateChanged(CuratorFramework client, ConnectionState state) {
          }
          

          };

          new Thread(() -> {

          registerListener(listener);
          

          }).start();

          new Thread(() -> {

          registerListener(listener);
          

          }).start();

          new Thread(() -> {

          registerListener(listener);
          

          }).start();

          Thread.sleep(Integer.MAX_VALUE); }

          private static void registerListener(LeaderSelectorListener listener) { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient(

              ZK_ADDRESS,
              new RetryNTimes(10, 5000)
          

          ); client.start();

          // 2.Ensure path try {

          new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());
          

          } catch (Exception e) {

          e.printStackTrace();
          

          }

          // 3.Register listener LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener); selector.autoRequeue(); selector.start(); }

        }</pre>
        來自:http://blog.csdn.net/dc_726/article/details/46475633

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!