zookeeper應用場景練習(數據發布/訂閱)
來自: http://blog.csdn.net/luckyzhoustar/article/details/50573744
前面幾篇博客大致講解了一下有關zookeeper的概念知識,下面結合前面的幾篇博客來講解一下zookeeper的使用場景。
數據發布/訂閱
所謂的配置中心,就是發布者把數據發送到zookeeper的一個或者一系列的節點上,供訂閱者進行訂閱。從而達到動態獲取數據的目的,能夠實現配置信息的集中式管理和數據的動態更新。
一般的類似于發布/訂閱的模式有推和拉的兩種方式,而在zookeeper中,是把這兩種方式進行結合了。客戶端詳服務端注冊自己需要關注的節點,一旦該節點的數據發生變更,那么服務端就會向相應的客戶端發送watcher事件的通知,客戶端接受到這個消息通知后,需要主動的到服務端獲取最新的數據。
案例模擬
下面通過一個案例來模擬一下zookeeper的這個場景的使用。
在平常的開發中,會遇到這樣的需求,系統中需要使用一些通用的配置信息,例如機器的列表信息,運行時開發配置,數據配置信息等。這些全局配置信息通常具備下面這些特性
1.數據量比較小
2.數據內容在運行時會發生變化
3.集群中各個機器共享,配置一致
對于上面中的這些配置,我們一般采取的操作是存取到本地或者內存中,無論采取哪種配置都可以實現相應的操作。但是一旦遇到集群規模比較大的情況的話,兩種方式就不再可取。而我們還需要能夠快速的做到全部配置信息的變更,同時希望變更成本足夠小,因此我們需要一種更為分布式的解決方案。
比如我們把數據庫的相關的信息,供全局使用的信息來管理起來,這時候我們就可以在zookeeper上選取一個數據節點來配置存儲。例如/app1/database_config
/**
* @FileName: PublishTest.java
* @Package:com.zookeeperTest
* @Description: TODO
* @author: LUCKY
* @date:2016年1月24日 下午2:06:08
* @version V1.0
*/
package com.zookeeperTest;
import java.util.concurrent.CountDownLatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @ClassName: PublishTest
* @Description: 發布與訂閱練習
* @author: LUCKY
* @date:2016年1月24日 下午2:06:08
*/
public class PublishTest {
private static Logger logger = LoggerFactory.getLogger(PublishTest.class);
static CuratorFramework client = null;
static final String PATH = "/app1/database_config";
static final String zkAddress = "100.66.154.82:2181";
static final int timeout = 10000;
static CountDownLatch countDownLatch = new CountDownLatch(1);
// 客戶端的監聽配置
static ConnectionStateListener clientListener = new ConnectionStateListener() {
public void stateChanged(CuratorFramework client,
ConnectionState newState) {
if (newState == ConnectionState.CONNECTED) {
logger.info("connected established");
countDownLatch.countDown();
} else if (newState == ConnectionState.LOST) {
logger.info("connection lost,waiting for reconection");
try {
logger.info("reinit---");
reinit();
logger.info("inited---");
} catch (Exception e) {
logger.error("re-inited failed");
}
}
}
};
public static void main(String[] args) throws Exception {
init();
watcherPath(PATH, pathWatcher);
Thread.sleep(Integer.MAX_VALUE);
}
public static void init() throws Exception {
client = CuratorFrameworkFactory.builder().connectString(zkAddress)
.sessionTimeoutMs(timeout)
.retryPolicy(new RetryNTimes(5, 5000)).build();
// 客戶端注冊監聽,進行連接配置
client.getConnectionStateListenable().addListener(clientListener);
client.start();
// 連接成功后,才進行下一步的操作
countDownLatch.await();
}
public static void reinit() {
try {
unregister();
init();
} catch (Exception e) {
// TODO: handle exception
}
}
public static void unregister() {
try {
if (client != null) {
client.close();
client = null;
}
} catch (Exception e) {
logger.info("unregister failed");
}
}
// 對path進行監聽配置
public static String watcherPath(String path, CuratorWatcher watcher)
throws Exception {
byte[] buffer=client.getData().usingWatcher(watcher).forPath(path);
System.out.println(new String(buffer));
return new String(buffer);
}
public static String readPath(String path) throws Exception {
byte[] buffer = client.getData().forPath(path);
return new String(buffer);
}
static CuratorWatcher pathWatcher = new CuratorWatcher() {
public void process(WatchedEvent event) throws Exception {
// 當數據變化后,重新獲取數據信息
if (event.getType() == EventType.NodeDataChanged) {
//獲取更改后的數據,進行相應的業務處理
String value=readPath(event.getPath());
System.out.println(value);
}
}
};
}
本文由用戶 vr4733 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!