zookeeper應用場景練習(數據發布/訂閱)

vr4733 8年前發布 | 34K 次閱讀 分布式/云計算/大數據 ZooKeeper

來自: http://blog.csdn.net/luckyzhoustar/article/details/50573744


 

 前面幾篇博客大致講解了一下有關zookeeper的概念知識,下面結合前面的幾篇博客來講解一下zookeeper的使用場景。


 數據發布/訂閱

 所謂的配置中心,就是發布者把數據發送到zookeeper的一個或者一系列的節點上,供訂閱者進行訂閱。從而達到動態獲取數據的目的,能夠實現配置信息的集中式管理和數據的動態更新。

一般的類似于發布/訂閱的模式有推和拉的兩種方式,而在zookeeper中,是把這兩種方式進行結合了。客戶端詳服務端注冊自己需要關注的節點,一旦該節點的數據發生變更,那么服務端就會向相應的客戶端發送watcher事件的通知,客戶端接受到這個消息通知后,需要主動的到服務端獲取最新的數據。


 案例模擬

 下面通過一個案例來模擬一下zookeeper的這個場景的使用。

 在平常的開發中,會遇到這樣的需求,系統中需要使用一些通用的配置信息,例如機器的列表信息,運行時開發配置,數據配置信息等。這些全局配置信息通常具備下面這些特性

 1.數據量比較小

 2.數據內容在運行時會發生變化

 3.集群中各個機器共享,配置一致


 對于上面中的這些配置,我們一般采取的操作是存取到本地或者內存中,無論采取哪種配置都可以實現相應的操作。但是一旦遇到集群規模比較大的情況的話,兩種方式就不再可取。而我們還需要能夠快速的做到全部配置信息的變更,同時希望變更成本足夠小,因此我們需要一種更為分布式的解決方案。

 比如我們把數據庫的相關的信息,供全局使用的信息來管理起來,這時候我們就可以在zookeeper上選取一個數據節點來配置存儲。例如/app1/database_config

/**     
 * @FileName: PublishTest.java   
 * @Package:com.zookeeperTest   
 * @Description: TODO  
 * @author: LUCKY    
 * @date:2016年1月24日 下午2:06:08   
 * @version V1.0     
 */
package com.zookeeperTest;

import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @ClassName: PublishTest
 * @Description: 發布與訂閱練習
 * @author: LUCKY
 * @date:2016年1月24日 下午2:06:08
 */
public class PublishTest {

    private static Logger logger = LoggerFactory.getLogger(PublishTest.class);
    static CuratorFramework client = null;
    static final String PATH = "/app1/database_config";
    static final String zkAddress = "100.66.154.82:2181";
    static final int timeout = 10000;
    static CountDownLatch countDownLatch = new CountDownLatch(1);
    // 客戶端的監聽配置
static  ConnectionStateListener clientListener = new ConnectionStateListener() {

        public void stateChanged(CuratorFramework client,
                ConnectionState newState) {
            if (newState == ConnectionState.CONNECTED) {
                logger.info("connected established");
                countDownLatch.countDown();
            } else if (newState == ConnectionState.LOST) {
                logger.info("connection lost,waiting for reconection");
                try {
                    logger.info("reinit---");
                    reinit();
                    logger.info("inited---");
                } catch (Exception e) {
                    logger.error("re-inited failed");
                }
            }

        }
    };

    public static void main(String[] args) throws Exception {
        init();
        watcherPath(PATH, pathWatcher);
        Thread.sleep(Integer.MAX_VALUE);
    }

    public static void init() throws Exception {
        client = CuratorFrameworkFactory.builder().connectString(zkAddress)
                .sessionTimeoutMs(timeout)
                .retryPolicy(new RetryNTimes(5, 5000)).build();
        // 客戶端注冊監聽,進行連接配置
        client.getConnectionStateListenable().addListener(clientListener);
        client.start();
        // 連接成功后,才進行下一步的操作
        countDownLatch.await();
    }

    public static void reinit() {
        try {
            unregister();
            init();
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

    public static void unregister() {
        try {
            if (client != null) {
                client.close();
                client = null;
            }
        } catch (Exception e) {
            logger.info("unregister failed");
        }
    }

    // 對path進行監聽配置
    public static String  watcherPath(String path, CuratorWatcher watcher)
            throws Exception {
        byte[] buffer=client.getData().usingWatcher(watcher).forPath(path);
        System.out.println(new String(buffer));
        return new String(buffer);
    }

    public static String readPath(String path) throws Exception {
        byte[] buffer = client.getData().forPath(path);
        return new String(buffer);

    }

    static CuratorWatcher pathWatcher = new CuratorWatcher() {

        public void process(WatchedEvent event) throws Exception {
            // 當數據變化后,重新獲取數據信息
            if (event.getType() == EventType.NodeDataChanged) {
                //獲取更改后的數據,進行相應的業務處理
                String value=readPath(event.getPath());
                System.out.println(value);
            }

        }
    };
}



 上面的代碼就是一個簡單的發布/訂閱的實現。集群中每臺機器在啟動階段,都會到該節點上獲取數據庫的配置信息,同時客戶端還需要在在節點注冊一個數據變更的watcher監聽,一旦該數據節點發生變更,就會受到通知信息。


 本文由用戶 vr4733 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!