大數據(七) - Flume

jopen 9年前發布 | 14K 次閱讀 分布式/云計算/大數據

flume[flu:m]:日志采集、聚合和傳輸的系統, java語言實現


flume是干什么的?

收集日志的

flume如何搜集日志?

我們把flume比作情報人員

(1)搜集信息

(2)獲取記憶信息

(3)傳遞報告間諜信息

flume是怎么完成上面三件事情的,三個組件:

source: 搜集信息

channel:傳遞信息

sink:存儲信息
</div>



</div> </div>

flume OG( original generation初始版本 )和 NG( next generation,cdh4以及之后的版本

Flume OG 代碼工程臃腫、核心組件設計不合理、核心配置不標準等

Flume OG 有三種角色節點:agent、collector、master節點。

Flume NG 只有一種角色的節點:代理節點(agent),去掉了 collector、master 節點,這是核心組件最核心的變化。 

agent 節點的組成也發生了變化,由 source、sink、channel 組成。


</div>

NG要求jdk1.6以上,而且只有linux上的啟動腳本

     OG版本已經不更新了

    NG的核心組件:

        source:完成對 日志數據的收集,分成transition和event打入到channel中。

            source有多種實現包括AvroSource(監控端口)、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、Http Source、HDFS Source、Spooling Directory Source(對目錄下新增文件的監控,并讀取文件數據)、Exec Source(以運行linux命令的方式,持續輸出最新數據,如tail -F)等

            flume可以和log4j配合使用

        sink:取出channel中的數據,輸出到存儲文件系統,數據庫,或遠程服務器

            多種實現方式如Avro sink、HDFS Sink、HBase Sink、Logger Sink(測試用,后臺打印)

        小數據可以存儲在文件或數據庫中,海量數據(每天GB、TB級別的數據)存儲到hadoop中

        Channel:管道,提供一個隊列的功能,對source提供的數據進行簡單緩存

            實現由Memory/File/jdbc channel,Memory無法保證數據完成性,官方建議使用File Channel,保證數據完整性和一致性
</div>
</div>

Flow Pipeline

1、多個Agent順序連接

2、多個Agent的數據匯聚到同一個Agent


3、多路(一個agent上有多個channel)(Multiplexing)Agent


這種模式,有兩種方式,一種是用來復制(Replication),另一種是用來分流(Multiplexing)。Replication方式,可以將最前端的數據源復制多份,分別傳遞到多個channel中,每個channel接收到的數據都是相同的;Multiplexing方式,selector可以根據header的值來確定數據傳遞到哪一個channel

4、實現load balance功能


5、實現failover功能


flume source

    Avro Source:接收外部avro客戶端的事件

    Thrift Source:接收外部thrift客戶端的事件

    Exec Source:接收來自一個給定的Unix命令的標準輸出上的數據

    Jms Source:接收來自消息隊列的事件

    NetCat Source:netcat在一端偵聽,每一行文字變成一個事件源

    Spooling Directory Source:以目錄中文件內容為事件源

    SequenceGenerator Source:一個簡單地序列生成器,主要用于測試

    Syslog Source:讀取syslog數據

        Syslog  UDP Source

        Syslog  TCP Source

        Multiport Syslog TCP Source

    Http Source:接收http post,get事件,get只用于試驗

    Custom Source:自定義source


flume sink

    HDFS Sink將事件寫入到hadoop分布式文件系統HDFS

    Logger sink 通常用于調試、測試

    Avro sink 可以批量傳送,可以配置批量大小

    Thrift sink

    IRC sink 從通道中取得信息到irc server

    File Roll sink存儲文件到本地文件系統中

    Null sink丟棄從通道接收的所有事件

    HBase sink將數據寫入到hbase中

    AsyncHbase sink異步方式將數據寫入到hbase中

    Custom sink 自定義sink

flume channel

    Memory channel 時間存儲在一個可配置的最大尺寸的內存中的隊列;速度快,吞吐量大,但是代理出現故障時數據丟失

    JDBC channel 時間存儲在數據庫中

    File channel 不同的file channel應該寫到不同的磁盤上,避免單磁盤io過大

    Pseudo Thansaction channel 用于測試

    Custom channel 自定義channel

flume channel selector

    Replicating channel selector (default) 復制,相同的數據發送到多個channel

    Multiplexing channel selector 復用,以header區分一個event發送到哪個channel

    Custom channel selector 自定義channel selector


</div>


數據通信系統或計算機網絡系統中,傳輸媒體的帶寬或容量往往會大于傳輸單一信號的需求,為了有效地利用通信線路,希望一個信道同時傳輸多路信號,這就是所謂的 多路復用技術(Multiplexing)。采用多路復用技術能把多個信號組合起來在一條物理信道上進行傳輸,在遠距離傳輸時可大大節省電纜的安裝和維護費用。



Flume sink processor

    Default sink processor

    Failover sink processor 故障轉移(主備)

    Load balancing sink processor 負載均衡:輪詢round_robin或隨機random


flume interceptor 

    攔截器主要是對事件的header信息信息操作,要么直接忽略他,要么修改他的數據

    一、Event Serializers

  file_roll sink 和hdfs sink 都支持EventSerializer接口

    Body TextSerializer,別名:text。這個攔截器將把事件的body部分寫入到輸出流中而不需要任何轉換或者修改。事件的header將直接被忽略。

    Avro Event Serializer別名:avro_event。這個攔截器將把事件序列化到一個Avro容器文件中。使用的模式和RPC Avro機制使用到的處理flume事件的機制一樣。這個序列化器繼承自AbstractAvroEventSerializer類。

    二、Timestamp Interceptor

    Flume 可以在事件傳輸過程中對它進行修改與刪除,而這個都是通過Interceptor進行實現的,實際都是往事件的header里插數據。而Timestamp Interceptor攔截器就是可以往event的header中插入關鍵詞為timestamp的時間戳。

    三、Host Interceptor

  該攔截器可以往event的header中插入關鍵詞默認為host主機名或者ip地址(注意是agent運行的機器的主機名或者ip地址)

    四、Static Interceptor

  Static Interceptor攔截器允許用戶增加一個static的header并為所有的事件賦值。范圍是所有事件。

    五、Regex FilteringInterceptor

  Regex Filtering Interceptor攔截器用于過濾事件,篩選出與配置的正則表達式相匹配的事件。可以用于包含事件和排除事件(include 或者是exclude)。常用于數據清洗,通過正則表達式把數據過濾出來。
</div> </div>


flume開發

    1、RPC

        flume雖然包含一些內部機制來采集數據,但是有時候用戶希望能將應用程序和flume直接相通。flume client是一個庫,允許應用程序鏈接flume和通過rpc往flume發送數據。

        avro是flume默認的rpc協議

    2、 Transaction    

        Flume 的核心是把數據從數據源收集過來,再送到目的地。為了保證輸送一定成功,在送到目的地之前,會先緩存數據,待數據真正到達目的地后,刪除自己緩存的數據。

        Flume 使用事務性的方式保證傳送Event整個過程的可靠性。Sink 必須在 Event 被存入 Channel 后,或者,已經被傳達到下一站agent里,又或者,已經被存入外部數據目的地之后,才能把 Event 從 Channel 中 remove 掉。這樣數據流里的 event 無論是在一個 agent 里還是多個 agent 之間流轉,都能保證可靠,因為以上的事務保證了 event 會被成功存儲起來。而 Channel 的多種實現在可恢復性上有不同的保證。也保證了 event 不同程度的可靠性。比如 Flume 支持在本地保存一份文件 channel 作為備份,而memory channel 將 event 存在內存 queue 里,速度快,但丟失的話無法恢復。

    3、Source

    4、Sink
</div>


最佳實踐

參考基于Flume的美團日志收集系統(一)架構和設計,列出一些最佳實踐:

  • 模塊命名規則:所有的 Source 以 src 開頭,所有的 Channel 以 ch 開頭,所有的 Sink 以 sink 開頭
  • 模塊之間內部通信統一使用 Avro 接口
  • 日志采集系統系統分為三層:Agent 層,Collector 層和 Store 層,其中 Agent 層每個機器部署一個進程,負責對單機的日志收集工作;Collector 層部署在中心服務器上,負責接收Agent層發送的日志,并且將日志根據路由規則寫到相應的 Store 層中;Store 層負責提供永久或者臨時的日志存儲服務,或者將日志流導向其它服務器。
  • 擴展 MemoryChannel 和 FileChannel ,提供 DualChannel 的實現,以提供高吞吐和大緩存
  • 監控 collector HdfsSink寫數據到 hdfs 的速度、FileChannel 中擁堵的 events 數量,以及寫 hdfs 狀態(查看是否有 .tmp 文件生成)

美團對 flume 的改進代碼見 github:https://github.com/javachen/mt-flume

來自: http://blog.csdn.net//matthewei6/article/details/50534563

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!