Kafka in Action: Real-Time Transfer from RDBMS to Hadoop in Seven Steps
This article is a case study of using Kafka together with Flume, taking an in-depth look at how real-time data from an RDBMS is streamed into a Hive table on HDFS.
For enterprises that want to ingest data into Hadoop quickly, Kafka is an excellent choice. What is Kafka? Kafka is a distributed, scalable, reliable messaging system that integrates applications and data streams using a publish-subscribe model. It is also a key component of the Hadoop technology stack, with strong support for real-time data analytics and for monetizing IoT data.
This article is aimed at a technical audience. Below we walk through how Kafka moves a data stream from an RDBMS (relational database management system) into Hive, illustrated with a real-time analytics use case. For reference, the component versions used in this article are Hive 1.2.1, Flume 1.6, and Kafka 0.9.
Where Kafka Fits: The Overall Solution Architecture
The figure below shows the overall architecture of the solution: Kafka combined with Flume, together with Hive's transaction support, delivers the RDBMS transaction data into the target Hive table.
Seven Steps to Real-Time Data Ingestion into Hadoop
Now let's dive into the details of the solution and show how to stream data into Hadoop in a few steps.
1. Extract data from the RDBMS
All relational databases have a log file that records the most recent transactions. The first step of the solution is to obtain these transactions in a format that Hadoop can accept.
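How this extraction is done depends on the source system; change-data-capture tools such as Oracle GoldenGate are the usual choice. Purely as an illustrative sketch, assuming a MySQL source database named salesdb with a customers table and a last_updated column (both hypothetical), recently changed rows could be pulled as comma-delimited lines matching the field order used later in this article:

# Pull rows changed in the last 5 minutes and write them as comma-delimited lines
$ mysql --batch --skip-column-names -h salesdb-host -u etl -p salesdb \
    -e "SELECT id, name, email, street_address, company FROM customers WHERE last_updated > NOW() - INTERVAL 5 MINUTE" \
  | tr '\t' ',' > /tmp/sales_transactions.csv

In production this data would come from the database's transaction log rather than ad-hoc queries, but the output format is the same: one comma-delimited record per line, ready for the Kafka producer in the next step.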
2. Set up the Kafka producer
The process that publishes messages to a Kafka topic is called a producer. A topic is the category of messages that Kafka maintains, and the RDBMS data will be published to a Kafka topic. In this example, the database serving the sales team is the source, and its transactions are published as a Kafka topic. The following commands set up the topic that the producer will publish to:
$ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-topics.sh --create --zookeeper sandbox.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic SalesDBTransactions
Created topic "SalesDBTransactions".
$ bin/kafka-topics.sh --list --zookeeper sandbox.hortonworks.com:2181
SalesDBTransactions
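With the topic in place, the transactions extracted in step 1 can be published to it. As a minimal sketch, the console producer that ships with Kafka can send each line of the (hypothetical) /tmp/sales_transactions.csv file from step 1 as one message:

# Publish one message per line of the extracted file
$ bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic SalesDBTransactions < /tmp/sales_transactions.csv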
3. Set up Hive
Next, create a Hive table to receive the sales team's database transactions. In this example we create a customers table:
[bedrock@sandbox ~]$ beeline -u jdbc:hive2:// -n hive -p hive

0: jdbc:hive2://> use raj;

create table customers (
    id string,
    name string,
    email string,
    street_address string,
    company string
)
partitioned by (time string)
clustered by (id) into 5 buckets
stored as orc
location '/user/bedrock/salescust'
TBLPROPERTIES ('transactional'='true');
To make sure Hive can handle transactional data, the following setting is required in the Hive configuration:
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
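Depending on the Hive version and distribution, ACID tables usually need a few more settings enabled as well; the values below are the ones commonly documented for Hive 1.x and should be verified against your own cluster:

hive.support.concurrency = true
hive.enforce.bucketing = true
hive.exec.dynamic.partition.mode = nonstrict
hive.compactor.initiator.on = true
hive.compactor.worker.threads = 1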
4. Set up a Flume agent for the Kafka-to-Hive data flow
Now let's look at how to create a Flume agent that collects data from the Kafka topic and sends it to the Hive table.
Before starting the Flume agent, set up the runtime environment with the following steps:
$ pwd
/home/bedrock/streamingdemo
$ mkdir flume/checkpoint
$ mkdir flume/data
$ chmod 777 -R flume
$ export HIVE_HOME=/usr/hdp/current/hive-server2
$ export HCAT_HOME=/usr/hdp/current/hive-webhcat

$ pwd
/home/bedrock/streamingdemo/flume
$ mkdir logs
Then create a log4j properties file as shown below:
[bedrock@sandbox conf]$ vi log4j.properties

flume.root.logger=INFO,LOGFILE
flume.log.dir=/home/bedrock/streamingdemo/flume/logs
flume.log.file=flume.log
Then create the following configuration file for the Flume agent:
$ vi flumetohive.conf

flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink

# Define / Configure source
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = sandbox.hortonworks.com:2181
flumeagent1.sources.source_from_kafka.topic = SalesDBTransactions
flumeagent1.sources.source_from_kafka.groupID = flume
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000

# Hive Sink
flumeagent1.sinks.hive_sink.type = hive
flumeagent1.sinks.hive_sink.hive.metastore = thrift://sandbox.hortonworks.com:9083
flumeagent1.sinks.hive_sink.hive.database = raj
flumeagent1.sinks.hive_sink.hive.table = customers
flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2
flumeagent1.sinks.hive_sink.hive.partition = %y-%m-%d-%H-%M
flumeagent1.sinks.hive_sink.batchSize = 10
flumeagent1.sinks.hive_sink.serializer = DELIMITED
flumeagent1.sinks.hive_sink.serializer.delimiter = ,
flumeagent1.sinks.hive_sink.serializer.fieldnames = id,name,email,street_address,company

# Use a channel which buffers events in memory
flumeagent1.channels.mem_channel.type = memory
flumeagent1.channels.mem_channel.capacity = 10000
flumeagent1.channels.mem_channel.transactionCapacity = 100

# Bind the source and sink to the channel
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sinks.hive_sink.channel = mem_channel
5. Start the Flume agent
Start the Flume agent with the following command:
$ /usr/hdp/apache-flume-1.6.0/bin/flume-ng agent -n flumeagent1 -f ~/streamingdemo/flume/conf/flumetohive.conf
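Optionally, the agent can also be pointed at the configuration directory with -c so that it picks up the log4j.properties created earlier (assuming that file sits in ~/streamingdemo/flume/conf):

$ /usr/hdp/apache-flume-1.6.0/bin/flume-ng agent -c ~/streamingdemo/flume/conf -n flumeagent1 -f ~/streamingdemo/flume/conf/flumetohive.conf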
6. Start the Kafka stream
As an example, below is a set of messages simulating transactions; in a real system these would be generated by the source database. For instance, they could come from an Oracle stream replaying the SQL transactions committed to the database, or from GoldenGate.
$ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic SalesDBTransactions
1,"Nero Morris","porttitor.interdum@Sedcongue.edu","P.O. Box 871, 5313 Quis Ave","Sodales Company"
2,"Cody Bond","ante.lectus.convallis@antebibendumullamcorper.ca","232-513 Molestie Road","Aenean Eget Magna Incorporated"
3,"Holmes Cannon","a@metusAliquam.edu","P.O. Box 726, 7682 Bibendum Rd.","Velit Cras LLP"
4,"Alexander Lewis","risus@urna.edu","Ap #375-9675 Lacus Av.","Ut Aliquam Iaculis Inc."
5,"Gavin Ortiz","sit.amet@aliquameu.net","Ap #453-1440 Urna. St.","Libero Nec Ltd"
6,"Ralph Fleming","sociis.natoque.penatibus@quismassaMauris.edu","363-6976 Lacus. St.","Quisque Fringilla PC"
7,"Merrill Norton","at.sem@elementum.net","P.O. Box 452, 6951 Egestas. St.","Nec Metus Institute"
8,"Nathaniel Carrillo","eget@massa.co.uk","Ap #438-604 Tellus St.","Blandit Viverra Corporation"
9,"Warren Valenzuela","tempus.scelerisque.lorem@ornare.co.uk","Ap #590-320 Nulla Av.","Ligula Aliquam Erat Incorporated"
10,"Donovan Hill","facilisi@augue.org","979-6729 Donec Road","Turpis In Condimentum Associates"
11,"Kamal Matthews","augue.ut@necleoMorbi.org","Ap #530-8214 Convallis, St.","Tristique Senectus Et Foundation"
7. Receive the data in Hive
With all of the above steps complete, you can now send data from Kafka and watch the stream land in the Hive table within a few seconds.
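As a quick check, assuming the database and table created in step 3, you can query the target table from beeline and watch the row count grow as messages arrive:

0: jdbc:hive2://> use raj;
0: jdbc:hive2://> select count(*) from customers;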
Source: http://www.thebigdata.cn/Hadoop/31071.html