Flume-ng的原理和使用

jopen 9年前發布 | 61K 次閱讀 Flume 分布式/云計算/大數據

1. 介紹

Flume 是 Cloudera 提供的日志收集系統,具有分布式、高可靠、高可用性等特點,對海量日志采集、聚合和傳輸,Flume 支持在日志系統中定制各類數據發送方,同時,Flume提供對數據進行簡單處理,并寫到各種數據接受方的能力。

Flume 使用 java 編寫,其需要運行在 Java1.6 或更高版本之上。

2. 架構

2.1 數據流

Flume 的核心是把數據從數據源收集過來,再送到目的地。為了保證輸送一定成功,在送到目的地之前,會先緩存數據,待數據真正到達目的地后,刪除自己緩存的數據。

Flume 傳輸的數據的基本單位是 Event,如果是文本文件,通常是一行記錄,這也是事務的基本單位。Event 從 Source,流向 Channel,再到 Sink,本身為一個 byte 數組,并可攜帶 headers 信息。Event 代表著一個數據流的最小完整單元,從外部數據源來,向外部的目的地去。

Flume 運行的核心是 Agent。它是一個完整的數據收集工具,含有三個核心組件,分別是 source、channel、sink。通過這些組件,Event 可以從一個地方流向另一個地方,如下圖所示。

  • source 可以接收外部源發送過來的數據。不同的 source,可以接受不同的數據格式。比如有目錄池(spooling directory)數據源,可以監控指定文件夾中的新文件變化,如果目錄中有文件產生,就會立刻讀取其內容。
  • channel 是一個存儲地,接收 source 的輸出,直到有 sink 消費掉 channel 中的數據。channel 中的數據直到進入到下一個channel中或者進入終端才會被刪除。當 sink 寫入失敗后,可以自動重啟,不會造成數據丟失,因此很可靠。
  • sink 會消費 channel 中的數據,然后送給外部源或者其他 source。如數據可以寫入到 HDFS 或者 HBase 中。

flume 允許多個 agent 連在一起,形成前后相連的多級跳。

2.2 核心組件

2.2.1 source

Client端操作消費數據的來源,Flume 支持 Avro,log4j,syslog 和 http post(body為json格式)。可以讓應用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也可以 寫一個 Source,以 IPC 或 RPC 的方式接入自己的應用,Avro和 Thrift 都可以(分別有 NettyAvroRpcClient 和 ThriftRpcClient 實現了 RpcClient接口),其中 Avro 是默認的 RPC 協議。具體代碼級別的 Client 端數據接入,可以參考官方手冊。

對現有程序改動最小的使用方式是使用是直接讀取程序原來記錄的日志文件,基本可以實現無縫接入,不需要對現有程序進行任何改動。
對于直接讀取文件 Source,有兩種方式:

  • ExecSource: 以運行 Linux 命令的方式,持續的輸出最新的數據,如tail -F 文件名指令,在這種方式下,取的文件名必須是指定的。 ExecSource 可以實現對日志的實時收集,但是存在Flume不運行或者指令執行出錯時,將無法收集到日志數據,無法保證日志數據的完整性。
  • SpoolSource: 監測配置的目錄下新增的文件,并將文件中的數據讀取出來。需要注意兩點:拷貝到 spool 目錄下的文件不可以再打開編輯;spool 目錄下不可包含相應的子目錄。

SpoolSource 雖然無法實現實時的收集數據,但是可以使用以分鐘的方式分割文件,趨近于實時。

如果應用無法實現以分鐘切割日志文件的話, 可以兩種收集方式結合使用。 在實際使用的過程中,可以結合 log4j 使用,使用 log4j的時候,將 log4j 的文件分割機制設為1分鐘一次,將文件拷貝到spool的監控目錄。

log4j 有一個 TimeRolling 的插件,可以把 log4j 分割文件到 spool 目錄。基本實現了實時的監控。Flume 在傳完文件之后,將會修改文件的后綴,變為 .COMPLETED(后綴也可以在配置文件中靈活指定)

2.2.2 Channel

當前有幾個 channel 可供選擇,分別是 Memory Channel, JDBC Channel , File Channel,Psuedo Transaction Channel。比較常見的是前三種 channel。

  • MemoryChannel 可以實現高速的吞吐,但是無法保證數據的完整性。
  • MemoryRecoverChannel 在官方文檔的建議上已經建義使用FileChannel來替換。
  • FileChannel保證數據的完整性與一致性。在具體配置FileChannel時,建議FileChannel設置的目錄和程序日志文件保存的目錄設成不同的磁盤,以便提高效率。

File Channel 是一個持久化的隧道(channel),它持久化所有的事件,并將其存儲到磁盤中。因此,即使 Java 虛擬機當掉,或者操作系統崩潰或重啟,再或者事件沒有在管道中成功地傳遞到下一個代理(agent),這一切都不會造成數據丟失。Memory Channel 是一個不穩定的隧道,其原因是由于它在內存中存儲所有事件。如果 java 進程死掉,任何存儲在內存的事件將會丟失。另外,內存的空間收到 RAM大小的限制,而 File Channel 這方面是它的優勢,只要磁盤空間足夠,它就可以將所有事件數據存儲到磁盤上。

2.2.3 sink

Sink在設置存儲數據時,可以向文件系統、數據庫、hadoop存數據,在日志數據較少時,可以將數據存儲在文件系中,并且設定一定的時間間隔保存數據。在日志數據較多時,可以將相應的日志數據存儲到Hadoop中,便于日后進行相應的數據分析.

更多sink的內容可以參考官方手冊

2.3 可靠性

Flume 的核心是把數據從數據源收集過來,再送到目的地。為了保證輸送一定成功,在送到目的地之前,會先緩存數據,待數據真正到達目的地后,刪除自己緩存的數據。

Flume 使用事務性的方式保證傳送Event整個過程的可靠性。Sink 必須在 Event 被存入 Channel 后,或者,已經被傳達到下一站agent里,又或者,已經被存入外部數據目的地之后,才能把 Event 從 Channel 中 remove 掉。這樣數據流里的 event 無論是在一個 agent 里還是多個 agent 之間流轉,都能保證可靠,因為以上的事務保證了 event 會被成功存儲起來。而 Channel 的多種實現在可恢復性上有不同的保證。也保證了 event 不同程度的可靠性。比如 Flume 支持在本地保存一份文件 channel 作為備份,而memory channel 將 event 存在內存 queue 里,速度快,但丟失的話無法恢復。

2.4 可恢復性

3. 安裝和使用

Flume 的 rpm 安裝方式很簡單,這里不做說明。

示例1: avro 數據源

安裝成功之后,在 /etc/flume/conf 目錄創建f1.conf 文件,內容如下:

agent-1.channels.ch-1.type = memory

agent-1.sources.avro-source1.channels = ch-1
agent-1.sources.avro-source1.type = avro
agent-1.sources.avro-source1.bind = 0.0.0.0
agent-1.sources.avro-source1.port = 41414
agent-1.sources.avro-source1.threads = 5

agent-1.sinks.log-sink1.channel = ch-1
agent-1.sinks.log-sink1.type = logger

agent-1.channels = ch-1
agent-1.sources = avro-source1
agent-1.sinks = log-sink1


關于 avro-source 配置說明,請參考 avro-source

接下來啟動 agent:

$ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f1.conf -Dflume.root.logger=DEBUG,console -n agent-1


參數說明:

  • -n指定agent名稱
  • -c指定配置文件目錄
  • -f指定配置文件
  • -Dflume.root.logger=DEBUG,console設置日志等級

下面可以啟動一個 avro-client 客戶端生產數據:

$ flume-ng avro-client -c /etc/flume-ng/conf -H localhost -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console


示例2:spooldir 數據源

在 /etc/flume/conf 目錄創建 f2.conf 文件,內容如下:

agent-1.channels = ch-1
agent-1.sources = src-1

agent-1.channels.ch-1.type = memory

agent-1.sources.src-1.type = spooldir
agent-1.sources.src-1.channels = ch-1
agent-1.sources.src-1.spoolDir = /root/log
agent-1.sources.src-1.fileHeader = true

agent-1.sinks.log-sink1.channel = ch-1
agent-1.sinks.log-sink1.type = logger

agent-1.sinks = log-sink1


關于 Spooling Directory Source 配置說明,請參考 Spooling Directory Source

接下來啟動 agent:

$ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f2.conf -Dflume.root.logger=DEBUG,console -n agent-1


然后,手動拷貝一個文件到 /root/log 目錄,觀察日志輸出以及/root/log 目錄下的變化。

示例3:spooldir 數據源,寫入 hdfs

在 /etc/flume/conf 目錄創建 f3.conf 文件,內容如下:

agent-1.channels.ch-1.type = file
agent-1.channels.ch-1.checkpointDir= /root/checkpoint
agent-1.channels.ch-1.dataDirs= /root/data

agent-1.sources.src-1.type = spooldir
agent-1.sources.src-1.channels = ch-1
agent-1.sources.src-1.spoolDir = /root/log
agent-1.sources.src-1.deletePolicy= never
agent-1.sources.src-1.fileHeader = true

agent-1.sources.src-1.interceptors =i1
agent-1.sources.src-1.interceptors.i1.type = timestamp

agent-1.sinks.sink_hdfs.channel = ch-1
agent-1.sinks.sink_hdfs.type = hdfs
agent-1.sinks.sink_hdfs.hdfs.path = hdfs://cdh1:8020/user/root/events/%Y-%m-%d
agent-1.sinks.sink_hdfs.hdfs.filePrefix = logs
agent-1.sinks.sink_hdfs.hdfs.inUsePrefix = .
agent-1.sinks.sink_hdfs.hdfs.rollInterval = 30
agent-1.sinks.sink_hdfs.hdfs.rollSize = 0
agent-1.sinks.sink_hdfs.hdfs.rollCount = 0
agent-1.sinks.sink_hdfs.hdfs.batchSize = 1000
agent-1.sinks.sink_hdfs.hdfs.writeFormat = text
agent-1.sinks.sink_hdfs.hdfs.fileType = DataStream
#agent-1.sinks.sink_hdfs.hdfs.fileType = CompressedStream
#agent-1.sinks.sink_hdfs.hdfs.codeC = lzop

agent-1.channels = ch-1
agent-1.sources = src-1
agent-1.sinks = sink_hdfs


關于 HDFS Sink配置說明,請參考 HDFS Sink

說明:

  1. 通過 interceptors 往 header 里添加 timestamp,這樣做,可以在 hdfs.path 引用系統內部的時間變量或者主機的 hostname。
  2. 通過設置hdfs.inUsePrefix,例如設置為.時,hdfs 會把該文件當做隱藏文件,以避免在 mr 過程中讀到這些臨時文件,引起一些錯誤
  3. 如果使用 lzo 壓縮,則需要手動創建 lzo 索引,可以通過修改 HdfsSink 的代碼,通過代碼創建索引
  4. FileChannel 的目錄最好是和 spooldir 的數據目錄處于不同磁盤。

示例4:spooldir 數據源,寫入 HBase

關于 HBase Sink 配置說明,請參考 HBase Sink

4. 開發相關

4.1 編譯源代碼

從 github 下載源代碼并編譯:

$ git clone git@github.com:cloudera/flume-ng.git -b cdh4-1.4.0_4.7.0
$ cd flume-ng
$ mvn install -DskipTests -Phadoop-2


如果提示找不到 hadoop-test 的 jar 包,則修改 pom.xml 中的版本,如改為2.0.0-mr1-cdh4.7.0,具體版本視你使用的分支版本而定,我這里是 cdh4.7.0。

如果提示找不到 uanodeset-parser 的 jarb,則在 pom.xml 中添加下面倉庫:

<repository>
  <id>tempo-db</id>
  <url>http://maven.tempo-db.com/artifactory/list/推ter/
  </url>
  <snapshots>
    <enabled>false</enabled>
  </snapshots>
</repository>


5. 最佳實踐

參考基于Flume的美團日志收集系統(一)架構和設計,列出一些最佳實踐:

  • 模塊命名規則:所有的 Source 以 src 開頭,所有的 Channel 以 ch 開頭,所有的 Sink 以 sink 開頭;
  • 模塊之間內部通信統一使用 Avro 接口;
  • 將日志采集系統系統分為三層:Agent 層,Collector 層和 Store 層,其中 Agent 層每個機器部署一個進程,負責對單機的日志收集工作;Collector 層部署在中心服務器上,負責接收Agent層發送的日志,并且將日志根據路由規則寫到相應的 Store 層中;Store 層負責提供永久或者臨時的日志存儲服務,或者將日志流導向其它服務器。
  • 擴展 MemoryChannel 和 FileChannel ,提供 DualChannel 的實現,以提供高吞吐和大緩存
  • 監控 collector HdfsSink寫數據到 hdfs 的速度、FileChannel 中擁堵的 events 數量,以及寫 hdfs 狀態(查看是否有 .tmp 文件生成)

美團對 flume 的改進代碼見 github:https://github.com/dashengju/mt-flume

 


 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!