深入淘寶Diamond之客戶端架構解析

uvhq1981 8年前發布 | 10K 次閱讀 數據庫

說明:本文不介紹如何使用Diamond,只介紹Diamond的實現原理

一、什么是Diamond

diamond是淘寶內部使用的一個管理持久配置的系統,它的特點是簡單、可靠、易用,目前淘寶內部絕大多數系統的配置,由diamond來進行統一管理。

diamond為應用系統提供了獲取配置的服務,應用不僅可以在啟動時從diamond獲取相關的配置,而且可以在運行中對配置數據的變化進行感知并獲取變化后的配置數據。

持久配置是指配置數據會持久化到磁盤和數據庫中。

二、Diamond的特點

  • 簡單:整體結構非常簡單,從而減少了出錯的可能性。
  • 可靠:應用方在任何情況下都可以啟動,在承載淘寶核心系統并正常運行一年多以來,沒有出現過任何重大故障。
  • 易用:客戶端使用只需要兩行代碼,暴露的接口都非常簡單,易于理解。

三、Diamond的持久機制

Paste_Image.png

訂閱方獲取配置數據時,直接讀取服務端本地磁盤文件,盡量減少對數據庫壓力。 這種架構用短暫的延時換取最大的性能和一致性,一些配置不能接受延時的情況下,通過API可以獲取數據庫中的最新配置

四、Diamond的容災機制

Diamond作為一個分布式環境下的持久配置系統,有一套完備的容災機制,數據被存儲在:數據庫,服務端磁盤,客戶端緩存目錄,以及可以手工干預的容災目錄。 客戶端通過API獲取配置數據按照固定的順序去不同的數據源獲取數據:容災目錄,服務端磁盤,客戶端緩存。

? 數據庫主庫不可用,可以切換到備庫,Diamond繼續提供服務

? 數據庫主備庫全部不可用,Diamond通過本地緩存可以繼續提供讀服務

? 數據庫主備庫全部不可用,Diamond服務端全部不可用,Diamond客戶端使用緩存目錄繼續運行,支持離線啟動

? 數據庫主備庫全部不可用,Diamond服務端全部不可用,Diamond客戶端緩存數據被刪,可以通過拷貝備份的緩存目錄到容災目錄下繼續使用

五、Diamond的架構圖

 

六、Diamond訂閱端(客戶端)分析

先看一個簡單的客戶端訂閱代碼實現:

public class DiamondTestClient {
    public static DiamondManager manager;
    public static void main(String[] str) {
        initDiamondManager();
    }
    private static void initDiamondManager() {
        manager = new DefaultDiamondManager("group_test", "dataId_test", new ManagerListener() {
            public void receiveConfigInfo(String configInfo) {
                System.out.println("configInfo="+ configInfo);
            }   
        });   
    }  
}

參數的說明:

DefaultDiamondManager有三個參數分別是:groupId,dataId和listener。

group和dataId為String類型,二者結合為diamond-server端保存數據的惟一key

ManagerListener 是客戶端注冊的數據監聽器, 它的作用是在運行中接受變化的配置數據,然后回調receiveConfigInfo()方法,執行客戶端處理數據的邏輯。如果要在運行中對變化的配置數據進行處理,就一定要注冊ManagerListener

我們來看一下DefaultDiamondManager的類圖

Paste_Image.png

DefaultDiamondManager的構造方法代碼如下:

public DefaultDiamondManager(String group, String dataId, ManagerListener managerListener) {
        this.dataId = dataId;
        this.group = group;

        diamondSubscriber = DiamondClientFactory.getSingletonDiamondSubscriber();

        this.managerListeners.add(managerListener);
        ((DefaultSubscriberListener) diamondSubscriber.getSubscriberListener()).addManagerListeners(this.dataId,
            this.group, this.managerListeners);
        diamondSubscriber.addDataId(this.dataId, this.group);
        diamondSubscriber.start();

    }

說明

1、利用工廠類DiamondClientFactory創建單例訂閱者類。

2、將客戶端創建的偵聽器類添加到偵聽器管理list中并注入到新創建的訂閱者類中。

3、為訂閱者設置dataId和groupId。

4、啟動訂閱者線程,開始輪詢消息。

DiamondSubscriber的類圖如下:

Paste_Image.png

執行diamondSubScriber.start()方法直接進入DefaultDiamondSubscriber子類中,先看如下代碼:

/** * 啟動DiamondSubscriber:<br> * 1.阻塞主動獲取所有的DataId配置信息<br> * 2.啟動定時線程定時獲取所有的DataId配置信息<br> */
    public synchronized void start() {
        if (isRun) {
            return;
        }

        if (null == scheduledExecutor || scheduledExecutor.isTerminated()) {
            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        }

        localConfigInfoProcessor.start(this.diamondConfigure.getFilePath() + "/" + DATA_DIR);
        serverAddressProcessor = new ServerAddressProcessor(this.diamondConfigure, this.scheduledExecutor);
        serverAddressProcessor.start();

        this.snapshotConfigInfoProcessor =
                new SnapshotConfigInfoProcessor(this.diamondConfigure.getFilePath() + "/" + SNAPSHOT_DIR);
        // 設置domainNamePos值
        randomDomainNamePos();
        initHttpClient();

        // 初始化完畢
        isRun = true;

        if (log.isInfoEnabled()) {
            log.info("當前使用的域名有:" + this.diamondConfigure.getDomainNameList());
        }

        if (MockServer.isTestMode()) {
            bFirstCheck = false;
        }
        else {
            // 設置輪詢間隔時間
            this.diamondConfigure.setPollingIntervalTime(Constants.POLLING_INTERVAL_TIME);
        }
        // 輪詢
        rotateCheckConfigInfo();

        addShutdownHook();
    }

說明:

1、ServerAddressProcessor類從服務端獲取提供服務的地址列表(可能會多個)。

2、randomDomainNamePos這個方法是隨機從服務地址列表中選取一個地址。

3、初始化httpClient客戶端,使用initHttpClient方法。

4、設置讀取配置文件的輪詢時間默認為15秒。

5、rotateCheckConfigInfo這個方法是真正與服務端交互的輪詢方法。

rotateCheckConfigInfo方法的代碼如下:

/** * 循環探測配置信息是否變化,如果變化,則再次向DiamondServer請求獲取對應的配置信息 */
    private void rotateCheckConfigInfo() {
        scheduledExecutor.schedule(new Runnable() {
            public void run() {
                if (!isRun) {
                    log.warn("DiamondSubscriber不在運行狀態中,退出查詢循環");
                    return;
                }
                try {
                    checkLocalConfigInfo();
                    checkDiamondServerConfigInfo();
                    checkSnapshot();
                }
                catch (Exception e) {
                    e.printStackTrace();
                    log.error("循環探測發生異常", e);
                }
                finally {
                    rotateCheckConfigInfo();
                }
            }

        }, bFirstCheck ? 60 : diamondConfigure.getPollingIntervalTime(), TimeUnit.SECONDS);
        bFirstCheck = false;
    }

說明

1、方法內部啟動一個定時線程,默認每隔60秒執行一次。

2、方法內部實際上三個主方法分別是:

  • checkLocalConfigInfo:主要是檢查本地數據是否有更新,如果沒有則返回,有則返回最新數據,并通知客戶端配置的listener。
  • checkDiamondServerConfigInfo:遠程調用服務端,獲取最新修改的配置數據并通知客戶端listener。
  • checkSnapshot:主要是持久化數據信息用的方法。

6.1 checkLocalConfigInfo代碼分析

private void checkLocalConfigInfo() {
        for (Entry<String/* dataId */, ConcurrentHashMap<String/* group */, CacheData>> cacheDatasEntry : cache
            .entrySet()) {
            ConcurrentHashMap<String, CacheData> cacheDatas = cacheDatasEntry.getValue();
            if (null == cacheDatas) {
                continue;
            }
            for (Entry<String, CacheData> cacheDataEntry : cacheDatas.entrySet()) {
                final CacheData cacheData = cacheDataEntry.getValue();
                try {
                    String configInfo = getLocalConfigureInfomation(cacheData);
                    if (null != configInfo) {
                        if (log.isInfoEnabled()) {
                            log.info("本地配置信息被讀取, dataId:" + cacheData.getDataId() + ", group:" + cacheData.getGroup());
                        }
                        popConfigInfo(cacheData, configInfo);
                        continue;
                    }
                    if (cacheData.isUseLocalConfigInfo()) {
                        continue;
                    }
                }
                catch (Exception e) {
                    log.error("向本地索要配置信息的過程拋異常", e);
                }
            }
        }

說明:

1、循環本地緩存數據,比較數據是否更新變化,重點看getLocalConfigureInfomation方法。

2、如果有更新數據則調用popConfigInfo方法通知客戶端listener。

再深入看getLocalConfigureInfomation方法,代碼如下:

// 判斷是否變更,沒有變更,返回null
        if (!filePath.equals(cacheData.getLocalConfigInfoFile())
                || existFiles.get(filePath) != cacheData.getLocalConfigInfoVersion()) {
            String content = FileUtils.getFileContent(filePath);
            cacheData.setLocalConfigInfoFile(filePath);
            cacheData.setLocalConfigInfoVersion(existFiles.get(filePath));
            cacheData.setUseLocalConfigInfo(true);

            if (log.isInfoEnabled()) {
                log.info("本地配置數據發生變化, dataId:" + cacheData.getDataId() + ", group:" + cacheData.getGroup());
            }

            return content;
        }
        else {
            cacheData.setUseLocalConfigInfo(true);

            if (log.isInfoEnabled()) {
                log.debug("本地配置數據沒有發生變化, dataId:" + cacheData.getDataId() + ", group:" + cacheData.getGroup());
            }

            return null;
        }

說明:

這段代碼很關鍵,判斷當前緩存的數據是否持久化的文件數據是否一致,包括版本號,文件路徑等信息,如果服務器端有配置數據更新,客戶端則拿到最新的數據后更新本地文件內容。

popConfigInfo方法的代碼如下:

void popConfigInfo(final CacheData cacheData, final String configInfo) {
        final ConfigureInfomation configureInfomation = new ConfigureInfomation();
        configureInfomation.setConfigureInfomation(configInfo);
        final String dataId = cacheData.getDataId();
        final String group = cacheData.getGroup();
        configureInfomation.setDataId(dataId);
        configureInfomation.setGroup(group);
        cacheData.incrementFetchCountAndGet();
        if (null != this.subscriberListener.getExecutor()) {
            this.subscriberListener.getExecutor().execute(new Runnable() {
                public void run() {
                    try {
                        subscriberListener.receiveConfigInfo(configureInfomation);
                        saveSnapshot(dataId, group, configInfo);
                    }
                    catch (Throwable t) {
                        log.error("配置信息監聽器中有異常,group為:" + group + ", dataId為:" + dataId, t);
                    }
                }
            });
        }
        else {
            try {
                subscriberListener.receiveConfigInfo(configureInfomation);
                saveSnapshot(dataId, group, configInfo);
            }
            catch (Throwable t) {
                log.error("配置信息監聽器中有異常,group為:" + group + ", dataId為:" + dataId, t);
            }
        }
    }

說明:

這段代碼主要是將已經更新的數據通知給客戶端織入的listener程序,使能夠達到最新數據通知給客戶端。

6.2 checkDiamondServerConfigInfo代碼分析

private void checkDiamondServerConfigInfo() {
        Set<String> updateDataIdGroupPairs = checkUpdateDataIds(diamondConfigure.getReceiveWaitTime());
        if (null == updateDataIdGroupPairs || updateDataIdGroupPairs.size() == 0) {
            log.debug("沒有被修改的DataID");
            return;
        }
        // 對于每個發生變化的DataID,都請求一次對應的配置信息
        for (String freshDataIdGroupPair : updateDataIdGroupPairs) {
            int middleIndex = freshDataIdGroupPair.indexOf(WORD_SEPARATOR);
            if (middleIndex == -1)
                continue;
            String freshDataId = freshDataIdGroupPair.substring(0, middleIndex);
            String freshGroup = freshDataIdGroupPair.substring(middleIndex + 1);

            ConcurrentHashMap<String, CacheData> cacheDatas = cache.get(freshDataId);
            if (null == cacheDatas) {
                continue;
            }
            CacheData cacheData = cacheDatas.get(freshGroup);
            if (null == cacheData) {
                continue;
            }
            receiveConfigInfo(cacheData);
        }
    }

說明:

1、通過HttpClient方式從服務端獲取更新過的dataId和groupId集合。

2、根據dataId和groupId再從服務端將相應變化的數據獲取下來。

3、通知客戶端注冊的listener程序。

上面二種方式通知客戶端的listener程序,都是通過allListeners這個屬性獲取的

private final ConcurrentMap<String/* dataId + group */, CopyOnWriteArrayList<ManagerListener>/* listeners */> allListeners =
            new ConcurrentHashMap<String, CopyOnWriteArrayList<ManagerListener>>();

這行代碼就是在最開始的那個客戶端使用的例子中注冊在allListeners中的。

七、Diamond客戶端與服務端交互時序圖

 

來自:http://flychao88.iteye.com/blog/2290561

 

 本文由用戶 uvhq1981 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!