記錄一下互聯網日志實時收集和實時計算的簡單方案

jopen 9年前發布 | 51K 次閱讀 日志 日志處理

作為互聯網公司,網站監測日志當然是數據的最大來源。我們目前的規模也不大,每天的日志量大約1TB。后續90%以上的業務都是需要基于日志來完 成,之前,業務中對實時的要求并不高,最多也就是準實時(延遲半小時以上),因此,我們使用Flume將數據收集到HDFS,然后進行清洗和分析。

后來,根據業務需要,我們有了兩個Hadoop集群,并且部署在不同的地方(北京和西安),而所有的日志收集服務器在北京,因此需要將日志數據通過外網傳輸到西安,于是有了這樣的部署:

很快,通過Flume流到西安Hadoop集群的數據就遇到了問題,比原始數據多或者少一些,造成這個問題的主要原因是在網絡不穩定的情況下,北京 Flume Agent發送到西安Flume Collector的過程中,會發送失敗,或者響應失敗。另外,之前的數據準實時也不能滿足業務的需求。

為了解決數據實時跨外網傳輸以及實時業務的問題,于是有了現在的架構:

  1. 引入Kafka,并且和日志收集服務器部署在北京同機房;
  2. 每臺日志收集服務器上的Flume Agent,通過內網將數據發送至Kafka;
  3. Kafka的第一個消費者,北京網關機上的Flume,負責從Kafka中消費數據,然后流到北京Hadoop集群;
  4. Kafka的第二個消費者,西安網關機上的Flume,負責從Kafka中消費數據,然后流到西安Hadoop集群;這里是西安的Flume通過 外網連接北京Kafka,主動拉取數據,如果網絡不穩定,那么當前批次拉取失敗,最多重新拉一次,數據不會進Flume channel,更不會流到HDFS上,因此,這種方式在網絡不穩定的情況下,不會造成數據缺失或重復;
  5. Kafka的第三個消費者,北京網關機上的實時計算模塊,后面再說;
  6. Kafka的第N個消費者,其他;

Kafka中的數據分區及副本

這種架構下,Kafka成為了統一的日志數據提供者,至關重要。我們目前有4臺Broker節點,每個Topic在創建時候都指定了4個分區,副本數為2;

數據在進入Kafka分區的時候,使用了Flume的攔截器,從日志中提取用戶ID,然后通過HASH取模,將數據流到Kafka相應的分區中。這 種方式,一方面,完成了簡單的負載均衡,另一方面,確保相同的用戶數據都處于同一個分區中,為后面實時計算模塊的統計提供了極大的便利。

Flume攔截器的使用

在整個流程中,有兩個地方用到了同一個Flume攔截器(Regex Extractor Interceptor),就是在Flume Source中從消息中提取數據,并加入到Header,供Sink使用;

  • 一處是在LogServer上部署的Flume Source,它從原始日志中提取出用戶ID,然后加入到Header中,Flume Sink(Kafka Sink)再入Kafka之前,從Header中拿出該用戶ID,然后通過應用分區規則,將該條消息寫入Kafka對應的分區中;
  • 另外一處是部署在西安的Flume Source,它從Kafka中讀取消息之后,從消息中抽取出時間字段,并加入到Header中,后面的Flume Sink(HDFS Sink)通過讀取Header中時間,根據消息中的時間,將數據寫入HDFS相應的目錄和文件中。如果在HDFS Sink中僅僅使用當前時間來確定HDFS目錄和文件名稱,這樣會造成一小部分數據沒有寫入到正確的目錄和文件中,比如:日志中8點59分59秒的數據可 能會被寫進HDFS上9點的目錄和文件中,因為原始數據經過Kafka,通過外網傳輸到西安的Flume,有個幾秒的延時,那是很正常的。

Flume消費者的負載均衡和容錯

在北京部署的Flume,使用Kafka Source從Kafka中讀取數據流向北京Hadoop集群,西安的也一樣,在消費同一Topic的消息時候,我們都是在兩臺機器上啟動了兩個 Flume Agent,并且設置的統一消費組(group.id),根據Kafka相同的Topic,一條消息只能被同一消費組內的一個消費者消費,因 此,Kafka中的一條消息,只會被這兩個Flume Agent其中的一個消費掉,如果一個Flume Agent掛掉,那么另外一個將會消費所有消息;

這種方式,也是在流向HDFS的消費者端做了負載均衡和容錯。

實時計算模塊

目前我們實時計算的業務比較簡單,就是類似于根據不同維度統計PV和UV。比如:實時統計一個網站當天累計PV、UV、IP數等,目前我們直接開發的JAVA程序,使用streamlib統計這些指標,UV和IP數這種需要去重的指標有2%以內的誤差,業務可以接受。

記錄一下互聯網日志實時收集和實時計算的簡單方案

實時計算模塊使用Kafka low-level API,針對每一個Topic,都使用和分區數相等的線程去處理,每個線程消費一個分區的數據,由于數據在進入Kafka分區的時候,都是經過相應規則的分區,因此相同用戶的數據會在同一個分區中;

另外,每個線程會在Redis中維護自己當前的Offsets,比如:在實時計算當天累計指標的業務場景中,每天0天在Redis中記錄當前的 Offsets,這樣,如果實時計算程序掛掉,下次啟動時候,從Redis中讀取當天的Offsets,重新讀取和計算當天的所有消息。

由于我們的需求是實時統計當天累計的指標,而且能接受一定的誤差,因此采用這種方式。如果需要精確統計累計去重指標,那么可能需要采用其它方式,比如:精確統計當天實時累計用戶數,一種簡單的辦法是在HBase中使用計數器來配合完成。

其它實時數據消費者

如果需要實時統計一小段時間(比如十分鐘、一小時)之內的PV、UV等指標,那么可以使用SparkStreaming來完成,比較簡單。如果單獨使用Spark Streaming來完成一天內海量數據的累計去重統計,我還不太清楚有什么好的解決辦法。

另外,實時OLAP也可能作為Kafka的實時消費者應用,比如:Druid。

相關閱讀

Kafka分區機制介紹與示例
Kafka架構和原理深度剖析
利用Flume攔截器(interceptors)實現Kafka Sink的自定義規則多分區寫入
Java使用極小的內存完成對超大數據的去重計數,用于實時計算中統計UV
Druid.io實時OLAP數據分析存儲系統介紹

就這么多吧,很多東西都是初次嘗試,肯定有很多不足之處,慢慢探索吧。

 

轉載請注明:lxw的大數據田地 ? 記錄一下互聯網日志實時收集和實時計算的簡單方案

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!