讓數據告訴你未來:Spark Streaming+Kudu+Impala構建預測引擎

ususking 8年前發布 | 35K 次閱讀 Spark Impala 分布式/云計算/大數據

這篇文章將介紹基于流式API數據來演示如何預測資源需求變化來調整資源分配。

隨著用戶使用天數的增加,不管你的業務是擴大還是縮減了,為什么你的大數據中心架構保持線性增長的趨勢?很明顯需要一個穩定的基本架構來保障你的業務線。當你的客戶處在休眠期,或者你的業務處在淡季,你增加的計算資源就處在浪費階段;相對應地,當你的業務在旺季期,或者每周一每個人對上周的數據進行查詢分析,有多少次你忒想擁有額外的計算資源。

根據需求水平動態分配資源 VS 固定的資源分配方式,似乎不太好實現。幸運的是,借助于現今強大的開源技術,可以很輕松的實現你所愿。

我們旨在用流式回歸模型預測接下來十分鐘的海量事件數據,并與傳統批處理的方法預測的結果進行對比。這個預測結果可用來動態規劃計算機資源,或者業務優化。傳統的批處理方法預測采用Impala和Spark兩種方法,動態預測使用Spark Streaming。 

任何預測的起點是基于海量歷史數據和實時更新的數據來預測未來的數據業務。流式API提供穩定的流失RSVP數據,用來預測未來一段時間RSVP數據。

1.動態資源分配預測架構圖

這個例子的數據通過流式API進入Kafka,然后使用Spark Streaming從Kafka加載數據到Kudu。Kafka允許數據同時進入兩個獨立的Spark Streaming作業:一個用來進行特征工程;一個用來使用MLlib進行流式預測。預測的結果存儲在Kudu中,我們也可以使用Impala或者Spark SQL進行交互式查詢,見圖1。

你可能急切想知道我的技術選型,下面是一些技術概要:

Kafka:Kafka可抽象數據輸入,支持擴展,并耦合Spark Streaming框架。Kafka擁有每秒處理百萬事件的擴展能力,并能和其他各項技術集成,比如,Spark Streaming。

Spark Streaming:Spark Streaming能夠處理復雜的流式事件,并且采用Scala編程僅需簡單的幾行代碼即可,也支持Java、Python或者R語言。Spark Streaming提供和Kafka、MLlib(Spark的機器學習庫)的集成。 

Apache Kudu:Kudu支持事件的增量插入,它旨在提供一種基于HDFS(HDFS優勢在于大數據存儲下的快速掃描能力)和HBase(HBase優勢是基于主鍵的快速插入/查詢)之間超存儲層。本項目可以采用HBase或者Cassandra,但Kudu為數據分析提供了快速的掃描能力、列式存儲架構。

Impala:使用Impala可很容易的即席查詢。它提供一個查詢引擎直接查詢加載到Kudu上的數據,并能理解生成模型。作為可選的方案可使用Spark SQL,但這里為了比較使用MADlib庫訓練的回歸模型和使用Saprk MLlib訓練的模型,故用Impala。

2.構建實例

現在解釋下架構的選擇,詳細細節如下: 

首先,粗略瀏覽一下流式數據源。通過Kafka來監測文件,tail文件變化發送到Kafka,部分代碼見Github。下面給出RSVP內容樣例:

{"response":"yes","member":{"member_name":"Richard 

Williamson","photo":"http:\/\/photos3.meetupstatic.com\/photos\/member\/d\/a\/4\/0\/thu

mb_231595872.jpeg","member_id":29193652},"visibility":"public","event":

{"time":1424223000000,"event_url":"http:\/\/www.meetup.com\/Big-Data-

Science\/events\/217322312\/","event_id":"fbtgdlytdbbc","event_name":"Big Data Science 

@Strata Conference, 

2015"},"guests":0,"mtime":1424020205391,"rsvp_id":1536654666,"group":{"group_name":"Big 

Data 

Science","group_state":"CA","group_city":"Fremont","group_lat":37.52,"group_urlname":"Big-

Data-Science","group_id":3168962,"group_country":"us","group_topics":

[{"urlkey":"data-visualization","topic_name":"Data Visualization"},{"urlkey":"data-

mining","topic_name":"Data Mining"},{"urlkey":"businessintell","topic_name":"Business 

Intelligence"},{"urlkey":"mapreduce","topic_name":"MapReduce"},

{"urlkey":"hadoop","topic_name":"Hadoop"},{"urlkey":"opensource","topic_name":"Open 

Source"},{"urlkey":"r-project-for-statistical-computing","topic_name":"R Project for Statistical 

Computing"},{"urlkey":"predictive-analytics","topic_name":"Predictive Analytics"},

{"urlkey":"cloud-computing","topic_name":"Cloud Computing"},{"urlkey":"big-

data","topic_name":"Big Data"},{"urlkey":"data-science","topic_name":"Data Science"},

{"urlkey":"data-analytics","topic_name":"Data Analytics"},

{"urlkey":"hbase","topic_name":"HBase"},

{"urlkey":"hive","topic_name":"Hive"}],"group_lon":-121.93},"venue":

{"lon":-121.889122,"venue_name":"San Jose Convention Center, Room 

210AE","venue_id":21805972,"lat":37.330341}}

一旦Kafka運行起來,數據從Kafka經過Spark Streaming進入Kudu,代碼見這里。

流式作業在Kudu上初始化一個表,接著運行Spark Streaming加載數據到數據表。你可以創建一個Impala外部表,并指向Kudu上存儲的數據。

CREATE EXTERNAL TABLE `kudu_meetup_rsvps` (

`event_id` STRING,

`member_id` INT,

`rsvp_id` INT,

`event_name` STRING,

`event_url` STRING,

`TIME` BIGINT,

`guests` INT,

`member_name` STRING,

`非死book_identifier` STRING,

`linkedin_identifier` STRING,

`推ter_identifier` STRING,

`photo` STRING,

`mtime` BIGINT,

`response` STRING,

`lat` DOUBLE,

`lon` DOUBLE,

`venue_id` INT,

`venue_name` STRING,

`visibility` STRING

)

TBLPROPERTIES(

'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',

'kudu.table_name' = 'kudu_meetup_rsvps',

'kudu.master_addresses' = 'quickstart.cloudera:7051',

'kudu.key_columns' = 'event_id, member_id, rsvp_id'

);

緊接著用Impala表查詢獲得小時RSVP數據:

create 

table   rsvps_by_hour as

select  from_unixtime(cast(mtime/1000 as bigint), "yyyy-MM-dd") as mdate 

,cast(from_unixtime(cast(mtime/1000 as bigint), "HH") as int) as mhour 

,count(*) as rsvp_cnt

from    kudu_meetup_rsvps

group 

by      1,2

有了RSVP數據后可以畫隨時間的變化圖,見圖2: 

接著可以進行特征工程,為了后續可以直接用Impala建立預測模型:

create 

table rsvps_by_hr_training as

select

case when mhour=0 then 1 else 0 end as hr0

,case when mhour=1 then 1 else 0 end as hr1

,case when mhour=2 then 1 else 0 end as hr2

,case when mhour=3 then 1 else 0 end as hr3

,case when mhour=4 then 1 else 0 end as hr4

,case when mhour=5 then 1 else 0 end as hr5

,case when mhour=6 then 1 else 0 end as hr6

,case when mhour=7 then 1 else 0 end as hr7

,case when mhour=8 then 1 else 0 end as hr8

,case when mhour=9 then 1 else 0 end as hr9

,case when mhour=10 then 1 else 0 end as hr10

,case when mhour=11 then 1 else 0 end as hr11

,case when mhour=12 then 1 else 0 end as hr12

,case when mhour=13 then 1 else 0 end as hr13

,case when mhour=14 then 1 else 0 end as hr14

,case when mhour=15 then 1 else 0 end as hr15

,case when mhour=16 then 1 else 0 end as hr16

,case when mhour=17 then 1 else 0 end as hr17

,case when mhour=18 then 1 else 0 end as hr18

,case when mhour=19 then 1 else 0 end as hr19

,case when mhour=20 then 1 else 0 end as hr20

,case when mhour=21 then 1 else 0 end as hr21

,case when mhour=22 then 1 else 0 end as hr22

,case when mhour=23 then 1 else 0 end as hr23

,case when mdate in ("2015-02-14","2015-02-15") then 1 else 0 end as weekend_day

,mdate

,mhour

,rsvp_cnt

from  rsvps_by_hour;

在Impala上安裝MADlib,這樣就可以直接在Impala上構建回歸模型。

采用MADlib訓練回歸模型的第一步:

select  printarray(linr(toarray(hr0,hr1,hr2,hr3,hr4,hr5,hr6,hr7,hr8,hr9,hr10,hr11,hr12,hr13,hr14, hr15,hr16,hr17,hr18,hr19,hr20,hr21,hr22,hr23,weekend_day), rsvp_cnt))

from    rsvps_by_hr_training;

下面展示回歸系數。你可看到前面的24個系數顯示了一天的按小時趨勢,在晚上很少的人在線;最后一個系數是周末,如果是周末的話,系數是負值。 

Feature Coefficient 

hr0 8037.43 

hr1 7883.93 

hr2 7007.68 

hr3 6851.91 

hr4 6307.91 

hr5 5468.24 

hr6 4792.58 

hr7 4336.91 

hr8 4330.24 

hr9 4360.91 

hr10 4373.24 

hr11 4711.58 

hr12 5649.91 

hr13 6752.24 

hr14 8056.24 

hr15 9042.58 

hr16 9761.37 

hr17 10205.9 

hr18 10365.6 

hr19 10048.6 

hr20 9946.12 

hr21 9538.87 

hr22 9984.37 

hr23 9115.12 

weekend_day -2323.73

通過上述系數進行預測:

select       mdate,

mhour,

cast(linrpredict(toarray(8037.43, 7883.93, 7007.68, 6851.91, 6307.91, 5468.24, 4792.58, 4336.91, 4330.24, 4360.91, 4373.24, 4711.58, 5649.91, 6752.24, 8056.24, 9042.58, 9761.37, 10205.9, 10365.6, 10048.6, 9946.12, 9538.87, 9984.37, 9115.12, -2323.73), toarray(hr0, hr1, hr2, hr3, hr4, hr5, hr6, hr7, hr8, hr9, hr10, hr11, hr12, hr13, hr14, hr15, hr16, hr17, hr18, hr19, hr20, hr21, hr22, hr23, weekend_day)) as int) as rsvp_cnt_pred,

rsvp_cnt

from         rsvps_by_hr_testing

圖3 按小時對比預測數據和RSVP真實值,由于數據有限,只列出兩天的預測。 

3.使用Spark MLlib訓練模型

下面使用Spark MLlib建立類似的模型,在海量數據下這種方式更優吸引力。 

首先,Spark加載JSON文件并使用Spark SQL注冊為一張表。你也可以直接從Kudu加載數據,但此列子直接用Spark讀取JSON文件。

val path = "/home/demo/meetupstream1M.json"

val meetup = sqlContext.read.json(path)

meetup.registerTempTable("meetup")

你可以使用Spark SQL運行一個類似在前面Impala中使用的查詢語句來獲取小時的RSVP數據:

val meetup2 = sqlContext.sql("

select from_unixtime(cast(mtime/1000 as bigint), 'yyyy-MM-dd') as dy,

case when from_unixtime(cast(mtime/1000 as bigint),'yyyy-MM-dd') in ('2015-02-14','2015-02-15') then 1 else 0 end as weekend_day,

from_unixtime(cast(mtime/1000 as bigint), 'HH') as hr,

count(*) as rsvp_cnt

from  meetup

where from_unixtime(cast(mtime/1000 as bigint), 'yyyy-MM-dd') >= '2015-10-30'

group

by    from_unixtime(cast(mtime/1000 as bigint), 'yyyy-MM-dd'),

from_unixtime(cast(mtime/1000 as bigint), 'HH')")

接下來,創建特征向量。你可以參照前面類的方法做特征工程,但這里介紹一個Andrew Ray的簡便方法,使用一句話即可實現特征向量:

val meetup3 = meetup2.groupBy("dy","weekend_day","hr","rsvp_cnt").pivot("hr").count().orderBy("dy")

現在有了這些數據,可以訓練回歸模型了:

import org.apache.spark.mllib.regression.RidgeRegressionWithSGD

import org.apache.spark.mllib.linalg.Vectors

import org.apache.spark.mllib.regression.LabeledPoint

val trainingData = meetup3.map { row =>

val features = Array[Double](1.0,row(1).toString().toDouble,row(4).toString().toDouble, 

row(5).toString().toDouble,row(6).toString().toDouble,

row(7).toString().toDouble,row(8).toString().toDouble,

row(9).toString().toDouble,row(10).toString().toDouble, 

row(11).toString().toDouble,row(12).toString().toDouble, 

row(13).toString().toDouble,row(14).toString().toDouble, 

row(15).toString().toDouble,row(16).toString().toDouble,

row(17).toString().toDouble,row(18).toString().toDouble,

row(19).toString().toDouble,row(20).toString().toDouble, 

row(21).toString().toDouble,row(22).toString().toDouble, 

row(23).toString().toDouble,row(24).toString().toDouble, 

row(25).toString().toDouble,row(26).toString().toDouble, 

row(27).toString().toDouble)

LabeledPoint(row(3).toString().toDouble, Vectors.dense(features))

}

trainingData.cache()

val model = new RidgeRegressionWithSGD().run(trainingData)

得到一個新的數據集評分,

val scores = meetup3.map { row =>

val features = Vectors.dense(Array[Double](1.0,row(1).toString().toDouble, 

row(4).toString().toDouble,row(5).toString().toDouble, 

row(6).toString().toDouble,row(7).toString().toDouble,

row(8).toString().toDouble,row(9).toString().toDouble,

row(10).toString().toDouble,row(11).toString().toDouble, 

row(12).toString().toDouble,row(13).toString().toDouble,

row(14).toString().toDouble,row(15).toString().toDouble,

row(16).toString().toDouble,row(17).toString().toDouble,

row(18).toString().toDouble,row(19).toString().toDouble,

row(20).toString().toDouble,row(21).toString().toDouble, 

row(22).toString().toDouble,row(23).toString().toDouble,

row(24).toString().toDouble,row(25).toString().toDouble, 

row(26).toString().toDouble,row(27).toString().toDouble))

(row(0),row(2),row(3), model.predict(features)) 

}

scores.foreach(println)

圖4描述Spark模型結果和真實RSVP數據的對比。 

4.使用Spark Streaming建立回歸模型

前面的兩個例子展示了我們如何基于批處理數據構建模型和即席查詢,現在開始建立一個Spark Streaming回歸模型。使用流式的方法建立模型使得我們可以更頻繁的更新模型,獲取最新的數據,預測也更準確。

這里可能和批處理的方法稍有不同。為了展示使用流式回歸模型,這里簡單的使用每分鐘的RSVP數據(替代前面批量預測中按小時處理)來生成連續的流數據來預測接下來的十分鐘內的數據。 

首先,使用Kafka來輸入數據,代碼見這里。這部分代碼簡單的設置Kafka為輸入源,設置topic、broker list和Spark Streaming作為輸入參數,它可以連接Kafka并獲取數據。

def loadDataFromKafka(topics: String,

brokerList: String,

ssc: StreamingContext): DStream[String] = {

val topicsSet = topics.split(",").toSet

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerList)

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

messages.map(_._2)

}

val dstream = loadDataFromKafka(topics, brokerList, ssc)

對DStream進行transform操作獲得RSVP值:

val stream = dstream.transform { rdd =>

val parsed1 = sqlContext.read.json(rdd)

parsed1.registerTempTable("parsed1")

val parsed2 = sqlContext.sql("

select  m,

cnt,

mtime

from    (select   (round(mtime/60000)-(" + current_time + "/60000 ))/1000.0 as m,

count(*) as cnt,

round(mtime/60000) as mtime

from      (select distinct * from parsed1) a

group

by        (round(mtime/60000)-(" + current_time + "/60000 ))/1000.0,

round(mtime/60000) ) aa

where   cnt > 20

")

parsed2.rdd

}

stream.print()

轉換數據結構來訓練模型:一個數據流為訓練數據,actl_stream;另一個數據流用來預測,pred_stream。預測數據流為當前訓練數據流時刻的下一個10分鐘時間間隔。

val actl_stream = stream.map(x => 

LabeledPoint(x(1).toString.toDouble,Vectors.dense(Array(1.0,x(0).toString.toDouble))) ).cache()

actl_stream.print()

val pred_stream = stream.map(x => 

LabeledPoint((x(2).toString.toDouble+10)*60000,Vectors.dense(Array(1.0,x(0).toString.toDouble))) )

pred_stream.print()

用時間間隔的數據作為特征訓練流式模型,這里的場景非常簡單,只是為了說明問題。實際的產品模型需要結合前面討論的按天和周末的模型來提高預測的準確性。

val numFeatures = 2

val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures)

model.trainOn(actl_stream)

最后,應用預測模型對下一個時間間隔的數據進行預測:

val rslt_stream = model.predictOnValues(pred_stream.map(lp => (lp.label, lp.features)))

rslt_stream.print()

圖5為流式模型預測的結果。 

如你所見,假如我們利用最近十分鐘的RSVP數據,可以更好的預測接下來的十分鐘左右的數據。將來為了更好的預測需要考慮增加更多的特征來提高模型的健壯性。預測的結果流式的寫入Kudu,使用API可以很容易的使用這些預測數據來自動的分配資源。

 

來自: http://www.dataguru.cn/article-9343-1.html

 

 本文由用戶 ususking 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!