Flume+Kafka收集Docker容器內分布式日志應用實踐

NadineOzc 8年前發布 | 46K 次閱讀 Flume Kafka Docker

 

1 背景和問題

隨著云計算、PaaS平臺的普及,虛擬化、容器化等技術的應用,例如Docker等技術,越來越多的服務會部署在云端。通常,我們需要需要獲取日志,來進行監控、分析、預測、統計等工作,但是云端的服務不是物理的固定資源,日志獲取的難度增加了,以往可以SSH登陸的或者FTP獲取的,現在可不那么容易獲得,但這又是工程師迫切需要的,最典型的場景便是:上線過程中,一切都在GUI化的PaaS平臺點點鼠標完成,但是我們需要結合tail -F、grep等命令來觀察日志,判斷是否上線成功。當然這是一種情況,完善的PaaS平臺會為我們完成這個工作,但是還有非常多的ad-hoc的需求,PaaS平臺無法滿足我們,我們需要日志。本文就給出了在分布式環境下,容器化的服務中的分散日志,如何集中收集的一種方法。

2 設計約束和需求描述

做任何設計之前,都需要明確應用場景、功能需求和非功能需求。

2.1 應用場景

分布式環境下可承載百臺服務器產生的日志,單條數據日志小于1k,最大不超過50k,日志總大小每天小于500G。

2.2 功能需求

1)集中收集所有服務日志。

2)可區分來源,按服務、模塊和天粒度切分。

2.3 非功能需求

1)不侵入服務進程,收集日志功能需獨立部署,占用系統資源可控。

2)實時性,低延遲,從產生日志到集中存儲延遲小于4s。

3)持久化,保留最近N天。

4)盡量遞送日志即可,不要求不丟不重,但比例應該不超過一個閾值(例如萬分之一)。

4)可以容忍不嚴格有序。

5)收集服務屬于線下離線功能,可用性要求不高,全年滿足3個9即可。

3 實現架構

一種方案實現的架構如下圖所示:

3.1 Producer層分析

PaaS平臺內的服務假設部署在Docker容器內,那么為了滿足非功能需求#1,獨立另外一個進程負責收集日志,因此不侵入服務框架和進程。采用 Flume NG 來進行日志的收集,這個開源的組件非常強大,可以看做一種監控、生產增量,并且可以發布、消費的模型,Source就是源,是增量源,Channel是緩沖通道,這里使用內存隊列緩沖區,Sink就是槽,是個消費的地方。容器內的Source就是執行tail -F這個命令的去利用linux的標準輸出讀取增量日志,Sink是一個Kafka的實現,用于推送消息到分布式消息中間件。

3.2 Broker層分析

PaaS平臺內的多個容器,會存在多個Flume NG的客戶端去推送消息到Kafka消息中間件。Kafka是一個吞吐量、性能非常高的消息中間件,采用單個分區按照順序的寫入的方式工作,并且支持按照offset偏移量隨機讀取的特性,因此非常適合做topic發布訂閱模型的實現。這里圖中有多個Kafka,是因為支持集群特性,容器內的Flume NG客戶端可以連接若干個Kafka的broker發布日志,也可以理解為連接若干個topic下的分區,這樣可以實現高吞吐,一來可以在Flume NG內部做打包批量發送來減輕QPS壓力,二來可以分散到多個分區寫入,同時Kafka還會指定replica備份個數,保證寫入某個master后還需要寫入N個備份,這里設置為2,沒有采用常用的分布式系統的3,是因為盡量保證高并發特性,滿足非功能需求中的#4。

3.3 Consumer層分析

消費Kafka增量的也是一個Flume NG,可以看出它的強大之處,在于可以接入任意的數據源,都是可插拔的實現,通過少量配置即可。這里使用Kafka Source訂閱topic,收集過來的日志同樣先入內存緩沖區,之后使用一個File Sink寫入文件,為了滿足功能需求#2,可區分來源,按服務、模塊和天粒度切分,我自己實現了一個Sink,叫做RollingByTypeAndDayFileSink,源代碼放到了 github 上,可以從這個 頁面下載 jar,直接放到flume的lib目錄即可。

4 實踐方法

4.1 容器內配置

Dockerfile

Dockerfile是容器內程序的運行腳本,里面會含有不少docker自帶的命令,下面是要典型的Dockerfile,BASE_IMAGE是一個包含了運行程序以及flume bin的鏡像,比較重要的就是ENTRYPOINT,主要利用supervisord來保證容器內進程的高可用。

FROM ${BASE_IMAGE}
MAINTAINER ${MAINTAINER}
ENV REFRESH_AT ${REFRESH_AT}
RUN mkdir -p /opt/${MODULE_NAME}
ADD ${PACKAGE_NAME} /opt/${MODULE_NAME}/
COPY service.supervisord.conf /etc/supervisord.conf.d/service.supervisord.conf
COPY supervisor-msoa-wrapper.sh /opt/${MODULE_NAME}/supervisor-msoa-wrapper.sh
RUN chmod +x /opt/${MODULE_NAME}/supervisor-msoa-wrapper.sh
RUN chmod +x /opt/${MODULE_NAME}/*.sh
EXPOSE
ENTRYPOINT ["/usr/bin/supervisord", "-c", "/etc/supervisord.conf"]

下面是supervisord的配置文件,執行supervisor-msoa-wrapper.sh腳本。

[program:${MODULE_NAME}]
command=/opt/${MODULE_NAME}/supervisor-msoa-wrapper.sh

下面是supervisor-msoa-wrapper.sh,這個腳本內的start.sh或者stop.sh就是應用程序的啟動和停止腳本,這里的背景是我們的啟停的腳本都是在后臺運行的,因此不會阻塞當前進程,因此直接退出了,Docker就會認為程序結束,因此應用生命周期也結束,這里使用wait命令來進行一個阻塞,這樣就可以保證即使后臺運行的進程,我們可以看似是前臺跑的。

這里加入了flume的運行命令,–conf后面的參數標示會去這個文件夾下面尋找flume-env.sh,里面可以定義JAVA_HOME和JAVA_OPTS。–conf-file指定flume實際的source、channel、sink等的配置。

#! /bin/bash
function shutdown()
{
    date
    echo "Shutting down Service"
    unset SERVICE_PID # Necessary in some cases
    cd /opt/${MODULE_NAME}
    source stop.sh
}
## 停止進程
cd /opt/${MODULE_NAME}
echo "Stopping Service"
source stop.sh
## 啟動進程
echo "Starting Service"
source start.sh
export SERVICE_PID=$!
## 啟動Flume NG agent,等待4s日志由start.sh生成
sleep 4 
nohup /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --conf /opt/apache-flume-1.6.0-bin/conf --conf-file /opt/apache-flume-1.6.0-bin/conf/logback-to-kafka.conf --name a1 -Dflume.root.logger=INFO,console &
# Allow any signal which would kill a process to stop Service
trap shutdown HUP INT QUIT ABRT KILL ALRM TERM TSTP
echo "Waiting for $SERVICE_PID"
wait $SERVICE_PID

Flume配置

source本應該采用 exec source ,執行tailf -F日志文件即可。但是這里使用了一個自行開發的 StaticLinePrefixExecSource ,源代碼可以在 github 上找到。之所以采用自定義的,是因為需要將一些固定的信息傳遞下去,例如服務/模塊的名稱以及分布式服務所在容器的hostname,便于收集方根據這個標記來區分日志。如果這里你發現為什么不用flume的攔截器interceptor來做這個工作,加入header中一些KV不就OK了嗎?這是個小坑,我后續會解釋一下。

例如原來日志的一行為:

[INFO]  2016-03-18 12:59:31,080 [main]  fountain.runner.CustomConsumerFactoryPostProcessor      (CustomConsumerFactoryPostProcessor.java:91)    -Start to init IoC container by loading XML bean definitions from classpath:fountain-consumer-stdout.xml

按照如下配置,那么實際傳遞給Channel的日志為:

service1##$$##m1-ocean-1004.cp  [INFO]  2016-03-18 12:59:31,080 [main]  fountain.runner.CustomConsumerFactoryPostProcessor      (CustomConsumerFactoryPostProcessor.java:91)    -Start to init IoC container by loading XML bean definitions from classpath:fountain-consumer-stdout.xml

channel使用內存緩沖隊列,大小標識可容乃的日志條數(event size),事務可以控制一次性從source以及一次性給sink的批量日志條數,實際內部有個timeout超時,可通過keepAlive參數設置,超時后仍然會推送過去,默認為3s。

sink采用 Kafka sink ,配置broker的list列表以及topic的名稱,需要ACK與否,以及一次性批量發送的日志大小,默認5條一個包,如果并發很大可以把這個值擴大,加大吞吐。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = com.baidu.unbiz.flume.sink.StaticLinePrefixExecSource
a1.sources.r1.command = tail -F /opt/MODULE_NAME/log/logback.log
a1.sources.r1.channels = c1
a1.sources.r1.prefix=service1
a1.sources.r1.separator=##$$##
a1.sources.r1.suffix=m1-ocean-1004.cp
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = keplerlog
a1.sinks.k1.brokerList = gzns-cm-201508c02n01.gzns:9092,gzns-cm-201508c02n02.gzn
s:9092
a1.sinks.k1.requiredAcks = 0
a1.sinks.k1.batchSize = 5
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4.2 Broker配置

參考 Kafka官方的教程 ,這里新建一個名稱叫做keplerlog的topic,備份數量為2,分區為4。

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic keplerlog

制造一些增量信息,例如如下腳本,在終端內可以隨便輸入一些字符串:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic keplerlog

打開另外一個終端,訂閱topic,確認可以看到producer的輸入的字符串即可,即表示聯通了。

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic keplerlog --from-beginning

4.3 集中接收日志配置

Flume配置

首先source采用flume官方提供的 KafkaSource ,配置好zookeeper的地址,會去找可用的broker list進行日志的訂閱接收。channel采用內存緩存隊列。sink由于我們的需求是按照服務名稱和日期切分日志,而官方提供的默認 file roll sink ,只能按照時間戳,和時間interval來切分。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.zookeeperConnect = localhost:2181
a1.sources.r1.topic = keplerlog
a1.sources.r1.batchSize = 5
a1.sources.r1.groupId = flume-collector
a1.sources.r1.kafka.consumer.timeout.ms = 800
# Describe the sink
a1.sinks.k1.type = com.baidu.unbiz.flume.sink.RollingByTypeAndDayFileSink
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /home/work/data/kepler-log
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

定制版RollingByTypeAndDayFileSink

源代碼見 github 。RollingByTypeAndDayFileSink使用有兩個條件:

1)Event header中必須有timestamp,否則會忽略事件,并且會拋出{@link InputNotSpecifiedException}

2)Event body如果是按照##$$##分隔的,那么把分隔之前的字符串當做模塊名稱(module name)來處理;如果沒有則默認為default文件名。

輸出到本地文件,首先要設置一個跟目錄,通過sink.directory設置。其次根據條件#2中提取出來的module name作為文件名稱前綴,timestamp日志作為文件名稱后綴,例如文件名為portal.20150606或者default.20150703。

規整完的一個文件目錄形式如下,可以看出匯集了眾多服務的日志,并且按照服務名稱、時間進行了區分:

~/data/kepler-log$ ls
authorization.20160512  
default.20160513  
default.20160505 
portal.20160512       
portal.20160505   
portal.20160514

不得不提的兩個坑

坑1

回到前兩節提到的自定義了一個StaticLinePrefixExecSource來進行添加一些前綴的工作。由于要區分來源的服務/模塊名稱,并且按照時間來切分,根據官方flume文檔,完全可以采用如下的Source攔截器配置。例如i1表示時間戳,i2表示默認的靜態變量KV,key=module,value=portal。

a1.sources.r1.interceptors = i2 i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = module
a1.sources.r1.interceptors.i2.value = portal

但是flume官方默認的 KafkaSource(v1.6.0) 的實現:

 while (eventList.size() < batchUpperLimit &&
               System.currentTimeMillis() < batchEndTime) {
         iterStatus = hasNext();
         if (iterStatus) {
           // get next message
          MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
          kafkaMessage = messageAndMetadata.message();
          kafkaKey = messageAndMetadata.key();

          // Add headers to event (topic, timestamp, and key)
          headers = new HashMap<String, String>();
          headers.put(KafkaSourceConstants.TIMESTAMP,
                  String.valueOf(System.currentTimeMillis()));
          headers.put(KafkaSourceConstants.TOPIC, topic);
          if (kafkaKey != null) {
            headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
          }
          if (log.isDebugEnabled()) {
            log.debug("Message: {}", new String(kafkaMessage));
          }
          event = EventBuilder.withBody(kafkaMessage, headers);
          eventList.add(event);
        }

可以看出自己重寫了Event header中的KV,丟棄了發送過來的header,因為這個坑的存在因此,tailf -F在event body中在前面指定模塊/服務名稱,然后RollingByTypeAndDayFileSink會按照分隔符切分。否則下游無法能達到KV。

坑2

exec source需要執行tail -F命令來通過標準輸出和標準錯誤一行一行的讀取,但是如果把tail -F封裝在一個腳本中,腳本中再執行一些管道命令,例如tail -F logback.log | awk ‘{print "portal##$$##"$0}’,那么exec source總是會把最近的輸出丟棄掉,導致追加到文件末尾的日志有一些無法總是“姍姍來遲”,除非有新的日志追加,他們才會被“擠”出來。這個問題比較詭異。暫時沒有細致研究。以示后人不要采坑。

5 結語

從這個分布式服務分散日志的集中收集方法,可以看出利用一些開源組件,可以非常方便的解決我們日常工作中所發現的問題,而這個發現問題和解決問題的能力才是工程師的基本素質要求。對于其不滿足需求的,需要具備有鉆研精神,知其然還要知其所以然的去做一些ad-hoc工作,才可以更加好的leverage這些組件。

另外,日志的收集只是起點,利用寶貴的數據,后面的使用場景和想象空間都會非常大,例如

1)利用Spark streaming在一個時間窗口內計算日志,做流量控制和訪問限制。

2)使用awk腳本、scala語言的高級函數做單機的訪問統計分析,或者Hadoop、Spark做大數據的統計分析。

3)除了端口存活和語義監控,利用實時計算處理日志,做ERROR、異常等信息的過濾,實現服務真正的健康保障和預警監控。

4)收集的日志可以通過logstash導入Elastic Search,使用ELK方式做日志查詢使用。

via: http://outofmemory.cn/java/flume-kafka-docker-log-collect-practise

 本文由用戶 NadineOzc 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!