Kafka實戰-實時日志統計流程
1.概述
在《Kafka實戰-簡單示例》一文中給大家介紹來Kafka的簡單示例,演示了如何編寫Kafka的代碼去生產數據和消費數據,今天給大家介紹如何去整 合一個完整的項目,本篇博客我打算為大家介紹Flume+Kafka+Storm的實時日志統計,由于涉及的內容較多,這里先給大家梳理一個項目的運用這 些技術的流程。下面是今天的內容目錄:
- 項目流程
- Flume
- Kafka
- Storm
下面開始今天的內容分享。
2.項目流程
在整合這套方案的時候,項目組也是經過一番討論,在討論中,觀點很多,有人認為直接使用Storm進行實時處理,去掉Kafka環節;也有認為直接使用Kafka的API去消費,去掉Storm的消費環節等等,但是最終組內還是一致決定使用這套方案,原因有如下幾點:
- 業務模塊化
- 功能組件化
我們認為,Kafka在整個環節中充當的職責應該單一,這項目的整個環節她就是一個中間件,下面用一個圖來說明這個原因,如下圖所示:
整個項目流程如上圖所示,這樣劃分使得各個業務模塊化,功能更加的清晰明了。
- Data Collection
負責從各個節點上實時收集用戶上報的日志數據,我們選用的是Apache的Flume NG來實現。
- Data Access
由于收集的數據的速度和數據處理的速度不一定是一致的,因此,這里添加了一個中間件來做處理,所使用的是Apache的Kafka,關于Kafka集群部署,大家可以參考我寫的《 Kafka實戰-Kafka Cluster 》。另外,有一部分數據是流向HDFS分布式文件系統了的,方便于為離線統計業務提供數據源。
- Stream Computing
在收集到數據后,我們需要對這些數據做實時處理,所選用的是Apache的Storm。關于Storm的集群搭建部署博客后面補上,較為簡單。
- Data Output
在使用Storm對數據做處理后,我們需要將處理后的結果做持久化,由于對相應速度要求較高,這里采用Redis+MySQL來做持久化。整個項目的流程架構圖,如下圖所示:
3.Flume
Flume是一個分布式的、高可用的海量日志收集、聚合和傳輸日志收集系統,支持在日志系統中定制各類數據發送方(如:Kafka,HDFS 等),便于收集數據。Flume提供了豐富的日志源收集類型,有:Console、RPC、Text、Tail、Syslog、Exec等數據源的收集, 在我們的日志系統中目前我們所使用的是spooldir方式進行日志文件采集,配置內容信息如下所示:
producer.sources.s.type = spooldir producer.sources.s.spoolDir = /home/hadoop/dir/logdfs
當然,Flume的數據發送方類型也是多種類型的,有:Console、Text、HDFS、RPC等,這里我們系統所使用的是Kafka中間件來接收,配置內容如下所示:
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink producer.sinks.r.metadata.broker.list=dn1:9092,dn2:9092,dn3:9092 producer.sinks.r.partition.key=0 producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition producer.sinks.r.serializer.class=kafka.serializer.StringEncoder producer.sinks.r.request.required.acks=0 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=sync producer.sinks.r.custom.encoding=UTF-8 producer.sinks.r.custom.topic.name=test
關于,Flume的詳細搭建部署,大家可以參考我寫的《 高可用Hadoop平臺-Flume NG實戰圖解篇 》。這里就不多做贅述了。
4.Kafka
Kafka是一種提供高吞吐量的分布式發布訂閱消息系統,她的特性如下所示:
- 通過磁盤數據結構提供消息的持久化,這種結構對于即使數據達到TB+級別的消息,存儲也能夠保持長時間的穩定。
- 搞吞吐特性使得Kafka即使使用普通的機器硬件,也可以支持每秒數10W的消息。
- 能夠通過Kafka Cluster和Consumer Cluster來Partition消息。
Kafka的目的是提供一個發布訂閱解決方案,他可以處理Consumer網站中的所有流動數據,在網頁瀏覽,搜索以及用戶的一些行為,這些動作 是較為關鍵的因素。這些數據通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。對于Hadoop這樣的日志數據和離線計算系統,這樣的方案是一個解 決實時處理較好的一種方案。
關于Kafka集群的搭建部署和使用,大家可以參考我寫的:《 Kafka實戰-Kafka Cluster 》,這里就不多做贅述了。
5.Storm
推ter將Storm開源了,這是一個分布式的、容錯的實時計算系統,已被貢獻到Apache基金會,下載地址如下所示:
http://storm.apache.org/downloads.html
Storm的主要特點如下:
- 簡單的編程模型。類似于MapReduce降低了并行批處理復雜性,Storm降低了進行實時處理的復雜性。
- 可以使用各種編程語言。你可以在Storm之上使用各種編程語言。默認支持Clojure、Java、Ruby和Python。要增加對其他語言的支持,只需實現一個簡單的Storm通信協議即可。
- 容錯性。Storm會管理工作進程和節點的故障。
- 水平擴展。計算是在多個線程、進程和服務器之間并行進行的。
- 可靠的消息處理。Storm保證每個消息至少能得到一次完整處理。任務失敗時,它會負責從消息源重試消息。
- 快速。系統的設計保證了消息能得到快速的處理,使用?MQ作為其底層消息隊列。
- 本地模式。Storm有一個本地模式,可以在處理過程中完全模擬Storm集群。這讓你可以快速進行開發和單元測試。
Storm集群由一個主節點和多個工作節點組成。主節點運行了一個名為“Nimbus”的守護進程,用于分配代碼、布置任務及故障檢測。每個工作 節 點都運行了一個名為“Supervisor”的守護進程,用于監聽工作,開始并終止工作進程。Nimbus和Supervisor都能快速失敗,而且是無 狀態的,這樣一來它們就變得十分健壯,兩者的協調工作是由Apache的ZooKeeper來完成的。
Storm的術語包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被處理的數據。Spout是數據源。Bolt處理數據。Task是運行于Spout或Bolt中的 線程。Worker是運行這些線程的進程。Stream Grouping規定了Bolt接收什么東西作為輸入數據。數據可以隨機分配(術語為Shuffle),或者根據字段值分配(術語為Fields),或者 廣播(術語為All),或者總是發給一個Task(術語為Global),也可以不關心該數據(術語為None),或者由自定義邏輯來決定(術語為 Direct)。Topology是由Stream Grouping連接起來的Spout和Bolt節點網絡。在Storm Concepts頁面里對這些術語有更詳細的描述。
關于Storm集群的搭建部署,博客在下一篇中更新,到時候會將更新地址附在這里,這里就先不對Storm集群的搭建部署做過多的贅述了。
6.總結
這里就是為大家介紹的Flume+Kafka+Storm的整體流程,后續會給大家用一個項目案例來實踐演示這個流程,包括具體的各個模塊的編碼實踐。今天大家可以先熟悉下實時計算項目的流程開發。