Kafka實戰-數據持久化

f627 9年前發布 | 36K 次閱讀 Kafka 消息系統

原文: http://www.cnblogs.com/smartloli/p/4648249.html

 

1.概述

經過前面Kafka實戰系列的學習,我們通過學習《Kafka實戰-入門》了解Kafka的應用場景和基本原理,《 Kafka實戰-Kafka Cluster 》一文給大家分享了Kafka集群的搭建部署,讓大家掌握了集群的搭建步驟,《 Kafka實戰-實時日志統計流程 》一文給大家講解一個項目(或者說是系統)的整體流程,《 Kafka實戰-Flume到Kafka 》一文給大家介紹了Kafka的數據生產過程,《 Kafka實戰-Kafka到Storm 》一文給大家介紹了Kafka的數據消費,通過Storm來實時計算處理。今天進入Kafka實戰的最后一個環節,那就是Kafka實戰的結果的數據持久化。下面是今天要分享的內容目錄:

  • 結果持久化
  • 實現過程
  • 結果預覽

下面開始今天的分享內容。

2.結果持久化

一般,我們在進行實時計算,將結果統計處理后,需要將結果進行輸出,供前端工程師去展示我們統計的結果(所說的報表)。結果的存儲,這里我們選擇的是Redis+MySQL進行存儲,下面用一張圖來展示這個持久化的流程,如下圖所示:

Kafka實戰-數據持久化

從途中可以看出,實時計算的部分由Storm集群去完成,然后將計算的結果輸出到Redis和MySQL庫中進行持久化,給前端展示提供數據源。接下來,我給大家介紹如何實現這部分流程。

3.實現過程

首先,我們去實現Storm的計算結果輸出到Redis庫中,代碼如下所示:

package cn.hadoop.hdfs.storm;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import redis.clients.jedis.Jedis;
import cn.hadoop.hdfs.util.JedisFactory;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
/**
 * @Date Jun 10, 2015
 *
 * @Author dengjie
 *
 * @Note Calc WordsCount eg.
 */
public class WordsCounterBlots implements IRichBolt {
  /**
   * 
   */
  private static final long serialVersionUID = -619395076356762569L;
  OutputCollector collector;
  Map<String, Integer> counter;
  @SuppressWarnings("rawtypes")
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    this.counter = new HashMap<String, Integer>();
  }
  public void execute(Tuple input) {
    String word = input.getString(0);
    Integer integer = this.counter.get(word);
    if (integer != null) {
      integer += 1;
      this.counter.put(word, integer);
    } else {
      this.counter.put(word, 1);
    }
    for (Entry<String, Integer> entry : this.counter.entrySet()) {
       // write result to redis
      Jedis jedis = JedisFactory.getJedisInstance("real-time");
      jedis.set(entry.getKey(), entry.getValue().toString());
      // write result to mysql
      // ...
    }
this.collector.ack(input);
  }
  public void cleanup() {
    // TODO Auto-generated method stub

  }
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // TODO Auto-generated method stub

  }
  public Map<String, Object> getComponentConfiguration() {
    // TODO Auto-generated method stub
    return null;
  }
}

注:這里關于輸出到MySQL就不贅述了,大家可以按需處理即可。

4.結果預覽

在實現持久化到Redis的代碼實現后,接下來,我們通過提交Storm作業,來觀察是否將計算后的結果持久化到了Redis集群中。結果如下圖所示:

Kafka實戰-數據持久化

通過Redis的Client來瀏覽存儲的Key值,可以觀察統計的結果持久化到來Redis中。

5.總結

我們在提交作業到Storm集群的時候需要觀察作業運行狀況,有可能會出現異常,我們可以通過Storm UI界面來觀察,會有提示異常信息的詳細描述。若是出錯,大家可以通過Storm UI的錯誤信息和Log日志打印的錯誤信息來定位出原因,從而找到對應的解決辦法。

 本文由用戶 f627 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!