zookeeper應用場景練習(數據發布/訂閱)
來自: http://blog.csdn.net/luckyzhoustar/article/details/50573744
前面幾篇博客大致講解了一下有關zookeeper的概念知識,下面結合前面的幾篇博客來講解一下zookeeper的使用場景。
數據發布/訂閱
所謂的配置中心,就是發布者把數據發送到zookeeper的一個或者一系列的節點上,供訂閱者進行訂閱。從而達到動態獲取數據的目的,能夠實現配置信息的集中式管理和數據的動態更新。
一般的類似于發布/訂閱的模式有推和拉的兩種方式,而在zookeeper中,是把這兩種方式進行結合了。客戶端詳服務端注冊自己需要關注的節點,一旦該節點的數據發生變更,那么服務端就會向相應的客戶端發送watcher事件的通知,客戶端接受到這個消息通知后,需要主動的到服務端獲取最新的數據。
案例模擬
下面通過一個案例來模擬一下zookeeper的這個場景的使用。
在平常的開發中,會遇到這樣的需求,系統中需要使用一些通用的配置信息,例如機器的列表信息,運行時開發配置,數據配置信息等。這些全局配置信息通常具備下面這些特性
1.數據量比較小
2.數據內容在運行時會發生變化
3.集群中各個機器共享,配置一致
對于上面中的這些配置,我們一般采取的操作是存取到本地或者內存中,無論采取哪種配置都可以實現相應的操作。但是一旦遇到集群規模比較大的情況的話,兩種方式就不再可取。而我們還需要能夠快速的做到全部配置信息的變更,同時希望變更成本足夠小,因此我們需要一種更為分布式的解決方案。
比如我們把數據庫的相關的信息,供全局使用的信息來管理起來,這時候我們就可以在zookeeper上選取一個數據節點來配置存儲。例如/app1/database_config
/** * @FileName: PublishTest.java * @Package:com.zookeeperTest * @Description: TODO * @author: LUCKY * @date:2016年1月24日 下午2:06:08 * @version V1.0 */ package com.zookeeperTest; import java.util.concurrent.CountDownLatch; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.EventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @ClassName: PublishTest * @Description: 發布與訂閱練習 * @author: LUCKY * @date:2016年1月24日 下午2:06:08 */ public class PublishTest { private static Logger logger = LoggerFactory.getLogger(PublishTest.class); static CuratorFramework client = null; static final String PATH = "/app1/database_config"; static final String zkAddress = "100.66.154.82:2181"; static final int timeout = 10000; static CountDownLatch countDownLatch = new CountDownLatch(1); // 客戶端的監聽配置 static ConnectionStateListener clientListener = new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState == ConnectionState.CONNECTED) { logger.info("connected established"); countDownLatch.countDown(); } else if (newState == ConnectionState.LOST) { logger.info("connection lost,waiting for reconection"); try { logger.info("reinit---"); reinit(); logger.info("inited---"); } catch (Exception e) { logger.error("re-inited failed"); } } } }; public static void main(String[] args) throws Exception { init(); watcherPath(PATH, pathWatcher); Thread.sleep(Integer.MAX_VALUE); } public static void init() throws Exception { client = CuratorFrameworkFactory.builder().connectString(zkAddress) .sessionTimeoutMs(timeout) .retryPolicy(new RetryNTimes(5, 5000)).build(); // 客戶端注冊監聽,進行連接配置 client.getConnectionStateListenable().addListener(clientListener); client.start(); // 連接成功后,才進行下一步的操作 countDownLatch.await(); } public static void reinit() { try { unregister(); init(); } catch (Exception e) { // TODO: handle exception } } public static void unregister() { try { if (client != null) { client.close(); client = null; } } catch (Exception e) { logger.info("unregister failed"); } } // 對path進行監聽配置 public static String watcherPath(String path, CuratorWatcher watcher) throws Exception { byte[] buffer=client.getData().usingWatcher(watcher).forPath(path); System.out.println(new String(buffer)); return new String(buffer); } public static String readPath(String path) throws Exception { byte[] buffer = client.getData().forPath(path); return new String(buffer); } static CuratorWatcher pathWatcher = new CuratorWatcher() { public void process(WatchedEvent event) throws Exception { // 當數據變化后,重新獲取數據信息 if (event.getType() == EventType.NodeDataChanged) { //獲取更改后的數據,進行相應的業務處理 String value=readPath(event.getPath()); System.out.println(value); } } }; }
本文由用戶 vr4733 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!