日志系統之Flume采集加morphline解析
概述
這段時間花了部分時間在處理 消息總線 跟日志的對接上。這里分享一下在日志采集和日志解析中遇到的一些問題和處理方案。
日志采集-flume
logstash VS flume
首先談談我們在日志采集器上的選型。由于我們選擇采用ElasticSearch作為日志的存儲與搜索引擎。而基于 ELK(ElasticSearch,Logstash,Kibana)的技術棧在日志系統方向又是如此流行,所以把Logstash列入考察對象也是順 理成章,Logstash在幾大主流的日志收集器里算是后起之秀,被Elastic收購之后更加成熟,社區也比較活躍。
Logstash的設計:input,filter,output。flume的設計source,channel,sink,當然flume也有interceptor。具體的設計就不多廢話,大致上都是 拆分 , 解耦 , pipeline(管道) 的思想。同時,它們都支持分布式擴展,比如Logstash既可以作為shipper也可作為indexer,flume可以多個agent組成分布式事件流。
我對flume的接觸早于Logstash。最近調研Logstash的時候,對它強大的filter印象深刻,特別是grok。而之前flume陣營強調最多的是它的source,sink,channel對各種開源組件的擴展支持非常強大。
Logstash固然是一個不錯的,但它采用JRuby語言(一種形似Ruby語法的JVM平臺的語言)實現使得它的 定制性不夠靈活 ,這是我放棄Logstash的主要原因。因為生態的原因,我確實需要Java技術棧提供的擴展性(這里主要目標是將 消息總線 作為日志采集的緩存隊列),而這正是flume的強項。但flume里很少有提及對日志的解析支持,即便有支持正則的interceptor,也只是很有限的查找、替換之類的。經過一番調研發現其實flume提供了這樣一個interceptor——morphline。它可以完成對日志的解析。
日志解析-morphline
morphline簡介
morphline是由flume的母公司cloudera開源的一個ETL框架。它用于構建、改變基于Hadoop進行 ETL(extract、transfer、load)的流式處理程序。(值得一提的是flume是由cloudera捐獻給Apache的,后來經過重 構成了flume-ng)。morphline使得你在構建ETL Job不需要編碼并且不需要大量的MapReduce技巧。
morphline是一個富配置文件可以很簡單得定義一個轉化鏈,用于從任何數據源消費任何類型的數據,處理數據然后加載結果到Hadoop組件中。它用簡單的配置步驟代替了Java編程。
morphline是一個類庫,可以嵌入任何java程序中。morphline是一個內存容器可以存儲轉化命令。這些命令以插件的形式被加載到 morphline中以執行任務,比如加載、解析、轉化或者處理單條記錄。一個記錄是在內存中的名稱-值對的數據結構。而且morphline是可擴展 的,可以集成已存在的功能和第三方系統。
這篇文章不是morphline的軟文,所以更多介紹請移步 cloudera的CDK官方文檔 。
這里有副圖,形象地展示了morphline大致的處理模型:

這里還有一幅圖,展示了在大數據生態系統中,morphline的架構模型:

后來morphline的開發主要由Kite主導,它是構建于Hadoop上的一套抽象的數據模型層的API接口。這里有 kiteSDK關于morphline的文檔說明 。
強大的正則提取器——grok
其實我找morphline就是為了找grok,或者找到一種提供grok的切入口。grok利用正則的解析能力從非結構化的日志數據中提取結構化的字 段。因為Logstash已經提供了一大堆的經過驗證的grok規則,這是Logstash的優勢,如果能夠將這些規則直接在flume里使用,那么將能 夠直接集成Logstash的能力(其實,只要有文本是規則的,正則都能提取出來,但已經有成熟的東西就沒必要自己再花費巨大的功夫去驗證)。這里有 grok的說明文檔 ,就不再過多介紹了。
服務端使用morphline
flume在agent里利用morphline。在client端對日志進行ETL的優勢可以利用客戶端PC分散的計算能力以省去服務端解析的 麻煩,但agent的數量非常之多,而且散布在各個生產服務器上,日志的格式也是五花八門。也就是說,在agent做太多的事情將使得我們在應對改變的時 候缺乏靈活性。所以,我們在客戶端只收集不解析。而在服務端利用morphline對日志進行解析。相當于啟動一個解析服務,從日志采集隊列中提取日志, 用morphline進行解析轉換,然后再將解析過的更結構化的日志發送到索引隊列,等到索引服務將其存入ElasticSearch。整個過程大致如下 圖:
這種異步的基于隊列的pipeline其實跟Storm這樣的流處理器的同步pipeline本質上殊途同歸,都是在利用廉價的PC來平攤計算量。
程序示例
為了在你的程序中使用morphline,首先需要添加對morphline的maven依賴:
<dependency> <groupId>org.kitesdk</groupId> <artifactId>kite-morphlines-all</artifactId> <version>${kite.version}</version> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </exclusion> </exclusions> <type>pom</type> <optional>true</optional> </dependency>
版本是 1.0.0 。需要注意的是,這里面有些依賴,需要從推ter的倉庫里去下載,所以你懂的:請自備梯子。
示例程序:
private void process(Message message) { msgBuffer.add(message);if (msgBuffer.size() < MESSAGE_BUFFER_SIZE) return; try { Notifications.notifyBeginTransaction(morphline); for (Message msg : msgBuffer) { Event logEvent = GSON.fromJson(new String(msg.getContent()), Event.class); String originalLog = new String(logEvent.getBody()); logEvent.getHeaders().put(MORPHLINE_GROK_FIELD_NAME, originalLog); logEvent.setBody(null); Record record = new Record(); for (Map.Entry<String, String> entry : logEvent.getHeaders().entrySet()) { record.put(entry.getKey(), entry.getValue()); } byte[] bytes = logEvent.getBody(); if (bytes != null && bytes.length > 0) { logger.info("original : " + new String(bytes)); record.put(Fields.ATTACHMENT_BODY, bytes); } Notifications.notifyStartSession(morphline); boolean success = morphline.process(record); if (!success) { logger.error("failed to process record! from : " + morphlineFileAndId); logger.error("record body : " + new String(logEvent.getBody())); } } //do some ETL jobs List<Record> records = this.extract(); List<Event> events = this.transfer(records); this.load(events); } catch (JsonSyntaxException e) { logger.error(e); Notifications.notifyRollbackTransaction(morphline); } finally { //clear buffer and extractor this.extracter.getRecords().clear(); this.msgBuffer.clear(); Notifications.notifyCommitTransaction(morphline); Notifications.notifyShutdown(morphline); }
}</pre>
這里只是部分代碼,展示morphline的大致用法。主要的邏輯在配置文件中:
morphlines : [ { id : morphline1 importCommands : ["org.kitesdk.**"]commands : [ { grok { dictionaryString : """ """ expressions : { original : """""" } extract : true numRequiredMatches : atLeastOnce # default is atLeastOnce findSubstrings : false addEmptyStrings : false } } { logInfo { format : "output record: {}", args : ["@{}"] } } ] }
]</pre>
如上所述,我們最主要的是想利用grok來解析日志,而logstash已經提供了 大量的grok patterns 供你開箱即用,但對于自定義的日志格式類型,你通常都需要自行解析。這里有個 grok 在線debug工具 。
綜述
其實,業界使用flume都是規模較大的互聯網公司,比如美團。它們通常會使用flume+kafka+storm+hadoop生態系統。利用 storm stream做實時解析,利用mapreduce做離線分析,這種高度定制化的使用場景,幾乎不需要flume的agent在客戶端進行解析的能力,因此 flume的morphline也就很少被提及。
但morphline還是不可多得的文本ETL利器,無論你是在采集的時候直接用morphline 做ETL還是在服務端做,flume+morphline加起來帶來的靈活性也不輸Logstash。