海量數據實時OLAP分析系統-Druid.io安裝配置和體驗
關鍵字:druid、druid.io、實時olap、大數據實時分析
導讀:
一、Druid簡介
二、Druid架構組成及相關依賴
三、Druid集群配置
四、Druid集群啟動
五、Druid查詢
六、后記
一、Druid簡介
Druid是一個為大型冷數據集上實時探索查詢而設計的開源數據分析和存儲系統,提供極具成本效益并且永遠在線的實時數據攝取和任意數據處理。
主要特性:
- 為分析而設計——Druid是為OLAP工作流的探索性分析而構建。它支持各種filter、aggregator和查詢類型,并為添加新功能提供了一個框架。用戶已經利用Druid的基礎設施開發了高級K查詢和直方圖功能。
- 交互式查詢——Druid的低延遲數據攝取架構允許事件在它們創建后毫秒內查詢,因為Druid的查詢延時通過只讀取和掃描有必要的元素被優化。Aggregate和 filter沒有坐等結果。
- 高可用性——Druid是用來支持需要一直在線的SaaS的實現。你的數據在系統更新時依然可用、可查詢。規模的擴大和縮小不會造成數據丟失。
- 可伸縮——現有的Druid部署每天處理數十億事件和TB級數據。Druid被設計成PB級別。
就系統而言,Druid功能位于PowerDrill和Dremel之間。它實現幾乎所有Dremel提供的工具(Dremel處理任意嵌套數據結構,而Druid只允許一個基于數組的嵌套級別)并且從PowerDrill吸收一些有趣的數據格式和壓縮方法。
Druid對于需要實時單一、海量數據流攝取產品非常適合。特別是如果你面向無停機操作時,如果你對查詢查詢的靈活性和原始數據訪問要求,高于對速度和無停機操作,Druid可能不是正確的解決方案。在談到查詢速度時候,很有必要澄清“快速”的意思是:Druid是完全有可能在6TB的數據集上實現秒級查詢。
二、Druid架構組成及其他依賴
2.1 Overlord Node (Indexing Service)
Overlord會形成一個加載批處理和實時數據到系統中的集群,同時會對存儲在系統中的數據變更(也稱為索引服務)做出響應。另外,還包含了Middle Manager和Peons,一個Peon負責執行單個task,而Middle Manager負責管理這些Peons。
2.2 Coordinator Node
監控Historical節點組,以確保數據可用、可復制,并且在一般的“最佳”配置。它們通過從MySQL讀取數據段的元數據信息,來決定哪些數據段應該在集群中被加載,使用Zookeeper來確定哪個Historical節點存在,并且創建Zookeeper條目告訴Historical節點加載和刪除新數據段。
2.3 Historical Node
是對“historical”數據(非實時)進行處理存儲和查詢的地方。Historical節點響應從Broker節點發來的查詢,并將結果返回給broker節點。它們在Zookeeper的管理下提供服務,并使用Zookeeper監視信號加載或刪除新數據段。
2.4 Broker Node
接收來自外部客戶端的查詢,并將這些查詢轉發到Realtime和Historical節點。當Broker節點收到結果,它們將合并這些結果并將它們返回給調用者。由于了解拓撲,Broker節點使用Zookeeper來確定哪些Realtime和Historical節點的存在。
2.5 Real-time Node
實時攝取數據,它們負責監聽輸入數據流并讓其在內部的Druid系統立即獲取,Realtime節點同樣只響應broker節點的查詢請求,返回查詢結果到broker節點。舊數據會被從Realtime節點轉存至Historical節點。
2.6 ZooKeeper
為集群服務發現和維持當前的數據拓撲而服務;
2.7 MySQL
用來維持系統服務所需的數據段的元數據;
2.8 Deep Storage
保存“冷數據”,可以使用HDFS。
三、Druid集群配置
3.1 環境信息
我這里有兩臺機器,node1有32G內存,上面部署了Histotical Node和Coordinator Node;node2有72G內存,上面部署了其他四個服務。
3.2 通用配置(Common Configuration)
##創建MySQL數據庫
CREATE DATABASE `druid` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
grant all on druid.* to druid@’%’ identified by ‘druid1234′ WITH GRANT OPTION;
flush privileges;
##配置文件
cd $DRUID_HOME/config/_common
vi common.runtime.properties(所有節點)
##使用Mysql存儲元數據 druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-eight", "io.druid.extensions:mysql-metadata-storage"] ##zookeeper druid.zk.service.host=zkNode1:2181,zkNode2:2181,zkNode3:2181 ##Mysql配置 druid.metadata.storage.type=mysql druid.metadata.storage.connector.connectURI=jdbc:mysql://node1:3306/druid druid.metadata.storage.connector.user=druid druid.metadata.storage.connector.password=diurd1234 ##配置deep storage到HDFS druid.storage.type=hdfs druid.storage.storageDirectory=hdfs://cdh5/tmp/druid/storage ##配置查詢緩存,暫用本地,可配置memcached druid.cache.type=local druid.cache.sizeInBytes=10737418240 ##配置監控 druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"] ##配置Indexing service的名字 druid.selectors.indexing.serviceName=druid/overlord ## druid.emitter=logging
3.3 Overlord Node(Indexing Service)
在運行Overlord Node節點上:
cd $DRUID_HOME/config/overlord
vi runtime.properties
druid.host=node2 druid.port=8090 druid.service=druid/overlord # Only required if you are autoscaling middle managers druid.indexer.autoscale.doAutoscale=true druid.indexer.autoscale.strategy=ec2 druid.indexer.autoscale.workerIdleTimeout=PT90m druid.indexer.autoscale.terminatePeriod=PT5M druid.indexer.autoscale.workerVersion=0 # Upload all task logs to deep storage druid.indexer.logs.type=hdfs druid.indexer.logs.directory=hdfs://cdh5/tmp/druid/indexlog # Run in remote mode druid.indexer.runner.type=remote druid.indexer.runner.minWorkerVersion=0 # Store all task state in the metadata storage druid.indexer.storage.type=metadata
3.4 MiddleManager Node
在運行MiddleManager Node節點上:
cd $DRUID_HOME/config/middleManager
vi runtime.properties
druid.host=node2 druid.port=8091 druid.service=druid/middlemanager druid.indexer.logs.type=hdfs druid.indexer.logs.directory=hdfs://cdh5/tmp/druid/indexlog # Resources for peons druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps druid.indexer.task.baseTaskDir=/tmp/persistent/task/
3.5 Coordinator Node
在運行Coordinator Node節點上:
cd $DRUID_HOME/config/coordinator
vi runtime.properties
druid.host=node1 druid.port=8081 druid.service=coordinator druid.coordinator.startDelay=PT5M
3.6 Historical Node
在運行Historical Node節點上:
cd $DRUID_HOME/config/historical
vi runtime.properties
druid.host=node1 druid.port=8082 druid.service=druid/historical druid.historical.cache.useCache=true druid.historical.cache.populateCache=true druid.processing.buffer.sizeBytes=1073741824 druid.processing.numThreads=9 druid.server.http.numThreads=9 druid.server.maxSize=300000000000 druid.segmentCache.locations=[{"path": " /tmp/druid/indexCache", "maxSize": 300000000000}] druid.monitoring.monitors=["io.druid.server.metrics.HistoricalMetricsMonitor", "com.metamx.metrics.JvmMonitor"]
3.7 Broker Node
在運行Broker Node節點上:
cd $DRUID_HOME/config/broker
vi runtime.properties
druid.host=node2 druid.port=8092 druid.service=druid/broker druid.broker.http.numConnections=20 druid.broker.http.readTimeout=PT5M druid.processing.buffer.sizeBytes=2147483647 druid.processing.numThreads=11 druid.server.http.numThreads=20
3.8 Real-time Node
在運行Real-time Node節點上:
cd $DRUID_HOME/config/realtime
vi runtime.properties
druid.host=node2 druid.port=8093 druid.service=druid/realtime druid.processing.buffer.sizeBytes=1073741824 druid.processing.numThreads=5 # Override emitter to print logs about events ingested, rejected, etc druid.emitter=logging druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor", "com.metamx.metrics.JvmMonitor"]
四、Druid集群啟動
首次啟動時候,可以遵循下面的啟動順序。
4.1 Broker Node
cd $DRUID_HOME/
cp run_druid_server.sh run_broker.sh
vi run_broker.sh
替換以下內容:
SERVER_TYPE=broker # start process JAVA_ARGS="${JAVA_ARGS} -Xmx10g -Xms5g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=24g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS="${JAVA_ARGS} -Dcom.sun.management.jmxremote.port=17071 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Ddruid.extensions.localRepository=${MAVEN_DIR}"
執行./run_broker.sh啟動Broker Node:
4.2 Historical Node
cd $DRUID_HOME/
cp run_druid_server.sh run_historical.sh
vi run_historical.sh
替換以下內容:
SERVER_TYPE=historical # start process JAVA_ARGS="${JAVA_ARGS} -Xmx10g -Xms10g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=16g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"
執行命令./run_historical.sh啟動Historical Node:
4.3 Coordinator Node
cd $DRUID_HOME/
cp run_druid_server.sh run_coordinator.sh
vi run_coordinator.sh
替換以下內容:
SERVER_TYPE=coordinator # start process JAVA_ARGS="${JAVA_ARGS} -Xmx10g -Xms10g -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"
執行命令./run_coordinator.sh啟動Coordinator Node.
4.4 Middle Manager
cd $DRUID_HOME/
cp run_druid_server.sh run_middleManager.sh
vi run_middleManager.sh
替換以下內容:
SERVER_TYPE=middleManager # start process JAVA_ARGS="${JAVA_ARGS} -Xmx64m -Xms64m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid -Ddruid.extensions.localR epository=${MAVEN_DIR}"
執行命令./run_middleManager.sh啟動MiddleManager Node。
4.5 Overlord Node
cd $DRUID_HOME/
cp run_druid_server.sh run_overlord.sh
vi run_overlord.sh
替換以下內容:
SERVER_TYPE=overlord # start process JAVA_ARGS="${JAVA_ARGS} -Xmx4g -Xms4g -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"
執行命令./run_overlord.sh啟動Overlord Node:
4.6 Real-time Node
cd $DRUID_HOME/
cp run_druid_server.sh run_realtime.sh
vi run_realtime.sh
替換以下內容:
SERVER_TYPE=realtime # start process JAVA_ARGS="${JAVA_ARGS} -Xmx13g -Xms13g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=9g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails - XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError" JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS="${JAVA_ARGS} -Dcom.sun.management.jmxremote.port=17072 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremot e.ssl=false" JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"
##特別需要注意參數:
-Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec
啟動RealTime Node需要指定一個realtime數據源的配置文件,本文中使用example提供的wikipedia_realtime.spec,啟動后,該數據源從irc.wikimedia.org獲取實時數據。
關于RealTime Node的配置,后續文章將會詳細介紹。
執行命令./run_realtime.sh啟動RealTime Node。
五、Druid查詢
第四部分中啟動RealTime Node時候使用了例子中自帶的配置文件wikipedia_realtime.spec,啟動后,該RealTime Node會從irc.wikimedia.org獲取實時數據,本章將以該數據源為例,學習幾種最常見的查詢。
5.1 select查詢
首先編輯查詢配置文件select_query.json
{ "queryType": "select", "dataSource": "wikipedia", "dimensions":[], "metrics":[], "granularity": "all", "intervals": [ "2015-11-01/2015-11-20" ], "pagingSpec":{"pagingIdentifiers": {}, "threshold":10} }
該配置文件的含義是從數據源”wikipedia”進行select查詢所有列,時間區間為2015-11-01/2015-11-20,每10條記錄一個分頁。
執行命令查詢:
curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @select_query.json
瞬間返回結果:
5.2 基于時間序列的查詢Timeseries query
編輯查詢配置文件timeseries.json
{ "queryType": "timeseries", "dataSource": "wikipedia", "intervals": [ "2010-01-01/2020-01-01" ], "granularity": "minute", "aggregations": [ {"type": "longSum", "fieldName": "count", "name": "edit_count"}, {"type": "doubleSum", "fieldName": "added", "name": "chars_added"} ] }
該配置文件的含義是:從數據源” wikipedia”中進行時間序列查詢,區間為2010-01-01/2020-01-01,按分鐘匯總結果,匯總字段為count和added;
執行查詢命令:
curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @timeseries.json
同樣瞬間返回結果:
5.3 TopN查詢
編輯查詢文件topn.json
{ "queryType": "topN", "dataSource": "wikipedia", "granularity": "all", "dimension": "page", "metric": "edit_count", "threshold" : 10, "aggregations": [ {"type": "longSum", "fieldName": "count", "name": "edit_count"} ], "filter": { "type": "selector", "dimension": "country", "value": "United States" }, "intervals": ["2012-10-01T00:00/2020-01-01T00"] }
該文件含義是:從數據源” wikipedia”進行TopN查詢,其中N=10,維度為page,指標為edit_count,也就是,在page維度上將edit_count匯總后取Top 10.
執行查詢命令:
curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @topn.json
結果為:
六、后記
Druid目前已經有很多公司用于實時計算和實時OLAP,而且效果很好。雖然它的配置和查詢都比較復雜和繁瑣,但如果是真正基于海量數據的實時OLAP,它的威力還是很強大的。我將持續學習和分享Druid的相關技術,驗證它在海量數據實時OLAP上的效果,敬請關注我的博客。
參考文章:
http://druid.io
http://www.csdn.net/article/2014-10-30/2822381/2