Kafka實戰:從RDBMS到Hadoop,七步實現實時傳輸

本文是關于Flume成功應用Kafka的研究案例,深入剖析它是如何將RDBMS實時數據流導入到HDFS的Hive表中。

對于那些想要把數據快速攝取到Hadoop中的企業來講,Kafka是一個很好的選擇。Kafka是什么?Kafka是一個分布式、可伸縮、可信賴的消息傳遞系統,利用發布-訂閱模型來集成應用程序/數據流。同時,Kafka還是Hadoop技術堆棧中的關鍵組件,能夠很好地支持實時數據分析或者貨幣化的物聯網數據。

本文服務于技術人群。下面就圖解Kafka是如何把數據流從RDBMS(關系數據庫管理系統)導入Hive,同時借助一個實時分析用例加以說明。作為參考,本文中使用的組件版本分別為Hive 1.2.1,Flume 1.6 以及 Kafka 0.9。

Kafka所在位置:解決方案的整體結構

下圖顯示了解決方案的整體結構: Kafka 和  Flume 的結合,再加上Hive的交易功能,RDBMS的交易數據被成功傳遞到目標  Hive 表中。

七步實現Hadoop實時數據導入

現在讓我們深入方案細節,并展示如何在幾個步驟內將數據流導入Hadoop。

1.從RDBMS中提取數據

所有關系型數據庫都有一個日志文件,用來記錄最新的交易。解決方案的第一步就是獲取這些交易數據,同時要確保這些數據格式是可以被Hadoop所接受的。

2.設置Kafka生產商

發布Kafka話題消息的過程稱為“生產商”。“話題”里有各種Kafka所需要維護的信息類別,RDBMS數據也會被轉換成Kafka話題。對于這個示例,要求設置一個服務于整個銷售團隊的數據庫,且該數據庫中的交易數據均以Kafka話題形式發布。以下步驟都需要設置Kafka 生產商:

$cd /usr/hdp/2.4.0.0-169/kafka
$bin/kafka-topics.sh --create --zookeeper sandbox.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic SalesDBTransactions
Created topic "SalesDBTransactions".
$bin/kafka-topics.sh --list --zookeeper sandbox.hortonworks.com:2181
SalesDBTransactions

3.設置Hive

接下來將創建一個Hive表,準備接收銷售團隊的數據庫交易數據。這個例子中,我們將創建一個用戶數據表:

[bedrock@sandbox ~]$ beeline -u jdbc:hive2:// -n hive -p hive
0: jdbc:hive2://> use raj; create table customers (id string, name string, email string, street_address string, company string) partitioned by (time string) clustered by (id) into 5 buckets stored as orc location '/user/bedrock/salescust' TBLPROPERTIES ('transactional'='true');

為了確保Hive能夠有效處理交易數據,以下設置要求在Hive配置中進行:

hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager

4.為Kafka到Hive的數據流設置Flume代理

現在來看下如何創建一個Flume代理,用于收集Kafka話題資料并向Hive表發送數據。

在啟用Flume代理前,要通過這幾個步驟設置運行環境:

$ pwd
/home/bedrock/streamingdemo
$ mkdir flume/checkpoint
$ mkdir flume/data
$ chmod 777 -R flume
$ export HIVE_HOME=/usr/hdp/current/hive-server2
$ export HCAT_HOME=/usr/hdp/current/hive-webhcat

$ pwd
/home/bedrock/streamingdemo/flume
$ mkdir logs

再如下所示創建一個log4j屬性文件:

[bedrock@sandbox conf]$ vi log4j.properties
flume.root.logger=INFO,LOGFILE
flume.log.dir=/home/bedrock/streamingdemo/flume/logs
flume.log.file=flume.log

然后為Flume代理配置以下文件:

$ vi flumetohive.conf
flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink
# Define / Configure source
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = sandbox.hortonworks.com:2181
flumeagent1.sources.source_from_kafka.topic = SalesDBTransactions
flumeagent1.sources.source_from_kafka.groupID = flume
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000

# Hive Sink flumeagent1.sinks.hive_sink.type = hive flumeagent1.sinks.hive_sink.hive.metastore = thrift://sandbox.hortonworks.com:9083 flumeagent1.sinks.hive_sink.hive.database = raj flumeagent1.sinks.hive_sink.hive.table = customers flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2 flumeagent1.sinks.hive_sink.hive.partition = %y-%m-%d-%H-%M flumeagent1.sinks.hive_sink.batchSize = 10 flumeagent1.sinks.hive_sink.serializer = DELIMITED flumeagent1.sinks.hive_sink.serializer.delimiter = , flumeagent1.sinks.hive_sink.serializer.fieldnames = id,name,email,street_address,company # Use a channel which buffers events in memory flumeagent1.channels.mem_channel.type = memory flumeagent1.channels.mem_channel.capacity = 10000 flumeagent1.channels.mem_channel.transactionCapacity = 100 # Bind the source and sink to the channel flumeagent1.sources.source_from_kafka.channels = mem_channel flumeagent1.sinks.hive_sink.channel = mem_channel

5.啟用Flume代理

通過以下指令啟用Flume代理:

$ /usr/hdp/apache-flume-1.6.0/bin/flume-ng agent -n flumeagent1 -f ~/streamingdemo/flume/conf/flumetohive.conf

6.啟用Kafka流

作為示例下面是一個模擬交易的消息集,這在實際系統中需要通過源數據庫才能生成。例如,以下可能來自Oracle流,在回放被提交到數據庫的SQL交易數據,也可能來自GoldenGate。

$ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic SalesDBTransactions
1,"Nero Morris","porttitor.interdum@Sedcongue.edu","P.O. Box 871, 5313 Quis Ave","Sodales Company"
2,"Cody Bond","ante.lectus.convallis@antebibendumullamcorper.ca","232-513 Molestie Road","Aenean Eget Magna Incorporated"
3,"Holmes Cannon","a@metusAliquam.edu","P.O. Box 726, 7682 Bibendum Rd.","Velit Cras LLP"
4,"Alexander Lewis","risus@urna.edu","Ap #375-9675 Lacus Av.","Ut Aliquam Iaculis Inc."
5,"Gavin Ortiz","sit.amet@aliquameu.net","Ap #453-1440 Urna. St.","Libero Nec Ltd"
6,"Ralph Fleming","sociis.natoque.penatibus@quismassaMauris.edu","363-6976 Lacus. St.","Quisque Fringilla PC"
7,"Merrill Norton","at.sem@elementum.net","P.O. Box 452, 6951 Egestas. St.","Nec Metus Institute"
8,"Nathaniel Carrillo","eget@massa.co.uk","Ap #438-604 Tellus St.","Blandit Viverra Corporation"
9,"Warren Valenzuela","tempus.scelerisque.lorem@ornare.co.uk","Ap #590-320 Nulla Av.","Ligula Aliquam Erat Incorporated"
10,"Donovan Hill","facilisi@augue.org","979-6729 Donec Road","Turpis In Condimentum Associates"
11,"Kamal Matthews","augue.ut@necleoMorbi.org","Ap #530-8214 Convallis, St.","Tristique Senectus Et Foundation"

7.接收Hive數據

如果上面所有的步驟都完成了,那么現在就可以從Kafka發送數據,可以看到數據流在幾秒鐘內就會被發送到Hive表。

 

來自:http://www.thebigdata.cn/Hadoop/31071.html

 

 本文由用戶 LynHolcomb 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!