分布式日志收集系統: 非死book Scribe之結構及源碼分析

scribe結構及源碼詳細分析

1.    整體類關系圖

分布式日志收集系統: 非死book Scribe之結構及源碼分析

 

2.    客戶端寫日志序列圖

分布式日志收集系統: 非死book Scribe之結構及源碼分析

 

3.    活動及狀態圖

分布式日志收集系統: 非死book Scribe之結構及源碼分析

 

Scribe活動圖

分布式日志收集系統: 非死book Scribe之結構及源碼分析

4.    啟動代碼詳解

分布式日志收集系統: 非死book Scribe之結構及源碼分析

啟動過程流程圖

(1)       調用setrlimit函數設置能夠打開的最大文件數為65535;

(2)       調用getopt_long函數解析運行scribe所帶參數信息,如-p port指定運行端口號;

(3)       調用srand、time和getpid產生唯一的隨機種子(不知道有什么作用);

(4)       根據端口號和配置文件new一個scribe服務器全局控制器對象g_Handler:scribeHandler類型;

(5)       調用initialize()函數初始化scribe服務器---->設置scribe運行狀態信息---->調用StoreConf的 parseConfig解析配置文件信息(解析過程后面單獨詳解)---->根據解析的配置文件信息載入全局配置信息到程序---->根據解 析的配置文件信息調用configureStore函數配置模型信息---->獲取存儲分類名稱并保存---->根據分類名稱調用 configureStoreCategory建立存儲隊列(包含有存儲類型,具體的消息存入是由相應store完成的);

(6)       調用scribe::startServer()啟動scribe服務器。

說明:上面第五點的箭頭代表調用其他函數實現功能,就是函數一直嵌套下去。啟動代碼的文件是scribe_server.cpp,入口函數是main。

5.    配置文件解析源碼詳解

(1)       配置文件解析入口是在啟動代碼被調用的parseConfig函數,唯一的參數就是配置文件的名稱;

(2)       調用readConfFile函數讀入配置文件到一個字符串隊列中,每一行數據為隊列中的一個值,通過ifstream打開文件流,并getline一行一行的讀入數據,并壓入隊列;

(3)       調用parseStore函數來解析存儲的配置信息,參數是剛才讀入的字符串隊列和this指針(這個參數的作用是把解析的信息存入這個對象中,這個參數 本身意義不大,但是在內部遞歸調用的時候需要新建一個StoreConf的對象存放下一級的配置信息時,就必須傳入這個參數,所以統一考慮這個函數就設計 成兩個參數,第一調用就把this作為參數就可以了);

(4)       在parseStore函數中一行一行的取出,然后去掉注釋和空白。然后判斷這次讀入的行是不是store開始行( )或結束行 ( )。如果是開始行就繼續遞歸parseStore函數解析下一行數據;如果是結束行就解析完畢;如果都不是代表是一個配置 項參數設置(名稱=值),就分別提取出參數名稱和值,并按鍵值對存放入map中。

(5)       配置文件解析完畢,解析的結果就按鍵值對存放在StoreConf的對象中,以后哪一個需要使用參數時直接在里面查找就可以了。

6.    存儲配置詳解

(1)       在啟動代碼詳解中說明了存儲信息的配置是通過configureStore和configureStoreCategory著兩個函數實現的;

(2)       在configureStore函數中根據傳遞進來的StoreConf對象存放的配置信息,解析出此store存放哪個(參數名稱category:單 個分類)或哪幾個(參數名稱categories:多個分類mutil)分類的消息,并將其保持到分類向量中,然后針對單個和多個分類分別創建 StoreQueue對象來執行消息的分發處理;

(3)       單個分類:直接調用configureStoreCategory創建StoreQueue對象;

(4)       多個分類:先調用針對分類列表的調用創建一個StoreQueue對象副本,后然根據分類的數量依次拷貝這個副本創建StoreQueue對象;

(5)       每創建一個StoreQueue對象就對這個對象計數的變量numstores加1操作;

(6)       在configureStoreCategory函數中首先確實是否是一個前綴分類,然后根據model是否為null來決定是拷貝一個 StoreQueue對象還是新建一個StoreQueue對象。如果是拷貝,判斷是否為每一個分類都創建一個線程并且不是默認的分類和前綴分類,如果是 就調用StoreQueue的拷貝構造函數生成一個StoreQueue對象;如果不滿足條件就直接賦值表示已經存在分類了。如果是新建就根據各種條件生 成新建需要的各個參數值調用StoreQueue構造函數生成新對象。接著如果是拷貝的就直接打開StoreQueue(調用StoreQueue的 open),否則需要配置在打開(調用StoreQueue的configureAndOpen)。最后將相應的分類或前綴分類存放入對象的map中,把 新建StoreQueue對象也存放入StoreQueue向量中。

7.    StoreQueue功能詳解

(1)       在4中的(6)中介紹了在configureStoreCategory函數中分別用了構造函數和拷貝構造函數創建StoreQueue對象;

(2)       在StoreQueue構造函數中用初始化列表初始化了各個配置變量,然后調用Store的全局createStore函數創建一個Store對象(后面 詳解Store模塊功能),最后調用storeInitCommon函數初始化用于多線程的互斥和條件變量并創建啟動線程(model為true不創 建);拷貝構造實現同樣功能,只是很多配置變量的初始化直接拷貝;

(3)       一個全局的線程入口函數threadStatic,參數為一個StoreQueue對象,啟動這個線程以后,每個StoreQueue對象調用自己的線程成員函數;

(4)       線程成員函數threadMember開始執行,初始化最后一次檢查存儲的時間為0和最后一次處理消息為當前時間,然后開始處理命令 (StoreCommand描述,這里處理三種CMD_OPEN、CMD_CLOSE和CMD_CONFIGURE),如果是CMD_CONFIGURE 命令就會啟動在線配置(調用函數configureInline實現),在線配置會針對具體的存儲類型配置相應的存儲類型(例如是file存儲就會配置 file存儲相應的參數),調用Store的confige實現(動態綁定到具體的實現類)。接著根據設置的檢查存儲的時間間隔看是否超過,超過就開始執 行存儲檢查(實現函數是Store的periodicCheck,同樣利用多態動態綁定)。下面繼續執行處理消息的任務,兩種情況下都需要處理消息:一是 超過了設置的最大寫入時間間隔;二是消息長度超過了設置的目標長度(緩存功能),如果有失敗的消息沒有處理就先處理失敗的消息。處理消息是調用Store 的handleMessages函數,如果處理失敗調用StoreQueue的processFailedMessages函數將處理失敗的消息保存起 來,以便下次繼續處理,防止消息(或數據)丟失。最后沒有需要處理的消息或命令時讓本線程掛起等待,并根據設置的存儲檢查時間為等待設置超時,以便能夠定 期檢查存儲。

(5)       線程函數在沒有收到CMD_STOP命令會一直執行下去。

8.      Store以及各個繼承子類代碼詳解

(1)       store類:函數createStore根據存儲類型創建相應的子類對象,其他的實現的方法都很簡單,一句話的事,一看就明白,具體處理消息的方法在相應的子類中實現。

(2)       FileStoreBase類:

a)        這個是文件存儲共同的基類,不同的文件格式寫入具體的子類實現;

b)        它的構造函數用函數初始化列表初始化了所有的文件存儲的配置參數,config函數對默認的參數進行重新配置,copyCommon函數復制已有對象的配置信息參數;

c)        Open函數調用子類具體openInternal函數,具體實現子類中介紹;

d)        periodicCheck函數檢查是否符合滾動文件,如果滿足調用滾動文件函數rotateFile;

e)        rotateFile函數調用printStatus函數根據配置是否記錄滾動狀態的信息來決定是否創建并寫入狀態信息到狀態文件,然后調用子類openInternal函數滾動文件創建;

f)         其他一些基本函數實現功能:根據時間配置信息制作完全文件名,制作基本文件名,找最新和最舊文件,制作符號鏈接的完全文件名和基本文件名,找到文件后綴,對齊到塊大小,設置主機子目錄。

(3)       FileStore類:

a)        此類繼承FileStoreBase類,構造函數調用基類構造函數初始化基本配置信息,然后初始化列表初始化此類單獨用的配置參數信息,config函數重新配置默認的參數信息

b)        openInternal函數根據滾動類型(rollPeriod)配置和當前的時間新建存儲的文件名,并根據需要創建相應目錄、符號鏈接文件和緩存文 件;根據創建過程返回信息設置狀態信息等;文件和目錄的創建都是通過FileInterface類提供的接口完成了,具體創建哪種類型的文件(目前只支持 STD和hdfs)由子類實現;

c)        處理消息函數handleMessages是重點功能,首先它確保文件打開,然后調用writeMessages函數將消息寫入文件;

d)        writeMessages函數執行具體的寫入過程,根據配置組合需要寫入消息的字符串通過FileInterface類的write方法寫入文件;

e)        其他函數功能:刪除、替換和讀最老文件,判斷一個時間點的文件是否為空等。

(4)       ThriftFileStore類:和FileStore類的功能基本相同。

(5)       BufferStore類:

a)        構造函數和config參數配置函數和其他存儲類都是同樣的功能,只是初始化和配置的參數都是各自存儲需要的,本類的配置涉及到主從存儲的配置,配置好以 后就調用createStore創建對于的存儲類型,然后根據主從配置采用的存儲類型在調用相應的config配置函數;copy函數復制本類以創建好的 一個對象及它的配置信息;

b)        changeState函數改變buffer存儲的當前狀態(三種:STREAMING、DISCONNECTED和SENDING_BUFFER),每種狀態下處理消息是不同的,所以這個狀態也很重要;

c)        handleMessages函數就是處理消息,根據不同的狀態信息做不同的消息處理,分別調用主存儲和從從存儲的消息處理函數;這里面有很重要的一點內 容是:如果我們設置了自適應算法確定的重試時間的參數,就會調用函數setNewRetryInterval來設置具體的重試時間。這個消息處理函數首先 用主存儲來處理消息,如果處理失敗改變狀態,后面狀態改變了就會執行從存儲來處理消息。

d)        setNewRetryInterval函數設置重試時間;

e)        periodicCheck函數:定期檢查存儲函數;首先檢查主從存儲的存儲,不同的存儲類型有不同的檢查功能;如果現在處于DISCONNECTED狀 態并且現在的時間減去最后一次嘗試打開的時間大于重試時間,就嘗試重新打開主存儲(因為當主存儲不可用的情況下才會進入DISCONNECTED狀態), 根據打開結果重新設置現在的狀態;如果是SENDING_BUFFER狀態并且是刷新流,就判斷存儲隊列的大小是否大于設置的最大存儲隊列大小乘以設置的 某個百分比,如果大于直接返回了保持現在的狀態,以便有時間讓消息可以直接發生到主存儲處理,不用在到本地緩存,提高了一定的效率;后面接著讀取本地緩存 中的文件數據并交給主存儲處理,如果處理成功就刪除本地緩存,否則將這些沒有成功處理的消息重新放回文件,以便以后處理,如果放回本地緩存出錯,這些消息 就丟失,報告一個數據丟失的信息;

f)         其他功能函數:打開、關閉和判斷是否打開等。

(6)       NetworkStore類:

a)        配置、構造函數、copy、open、isOpen、close等和其他存儲分類功能相似;

b)        periodicCheck函數唯一功能就是定期檢查服務器的IP和端口是否改變,如果改變先關閉鏈接,然后重新設置IP和端口,最后在重新打開鏈接;

c)        handleMessages函數,如果消息的長度大于設置的瓶頸值就先發送一個空的消息測試;發送根據配置選擇是否使用連接池。

(7)       BucketStore類:

a)        配置、構造函數、copy、open、isOpen、close等和其他存儲分類功能相似;

b)        createBuckets和createBucketsFromBucket函數根據配置參數和規則創建相應的存儲目錄和文件,為每個配置的bucket創建配置的存儲并配置;

c)        periodicCheck函數:先就bucket的數量生成隨機數序列,然后根據這個序列一次調用每個bucket配置的相應存儲類型存儲檢查函數;

d)        handleMessages函數:首先調用bucketize函數(根據不同配置有不同的算法確定)確定寫入哪一個bucket,然后判斷是否需要移除消息里面的key,需要就移除后寫入,不需要就直接寫入;如果寫入失敗把消息保存起來。

(8)       NullStore類:不將消息記錄下來,只是簡單的留下一個被忽略的記錄。

(9)       MultiStore類:

a)        配置、構造函數、copy、open、isOpen、close等和其他存儲分類功能相似;

b)        periodicCheck函數:

c)        handleMessages函數:分別調用每一個存儲相應的消息處理函數,根據配置決定是有一個處理成功就是成功還是所有的處理成功才算成功;

(10)   CategoryStore類:分別調用每一個存儲相應的存儲檢查函數。

(11)   MultiFileStore類:只有框架,還沒有具體實現什么功能!

(12)   ThriftMultiFileStore類:只有框架,還沒有具體實現什么功能!

9.      File相關(FileInterface、StdFile和HdfsFile)

a)        這幾個類主要實現了文件系統的常用操作,比如創建文件、打開和關閉文件、計算文件長度等;

b)        實現文件系統常用功能主要使用的是boost庫里面處理文件系統的部分庫函數(boost::filesystem);

c)        這些類是最終實現消息寫入文件的地方,和我們平時直接讀寫文件類似,前面幾個模塊介紹了怎樣一步一步到達最后這里,前面消息基本上都是在緩存中處理。

10.總結:今天把以前自己分析scribe的源碼的文檔與大家分享了,里面并沒有涉及到具體的源代碼,算不上真正的源代碼分析,主要介紹了一些源 碼實現的功能,有了這些功能說明,你去看源代碼可能會更加快捷一些!粘貼一些源碼本來不是什么費勁的事情,但是我覺得看源代碼最好還是完整的看或者至少是 一個完整的模塊的去看更好,更能體會源碼設計者的思路、思想和編碼技巧。如果你想更深入理解學習scribe的原理并通過源碼去分析上一篇博文提到的各種 配置選項的用作,那么你可以結合本篇更加詳細去分析scribe源代碼!源代碼可以到google上搜索!

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!