怎樣利用Spark Streaming和Hadoop實現近實時的會話連接
這個 Spark Streaming 樣例是一個可持久化到Hadoop近實時會話的很好的例子。
Spark Streaming 是Apache Spark 中最有趣的組件之一。你用Spark Streaming可以創建數據管道來用批量加載數據一樣的API處理流式數據。此外,Spark Steaming的“micro-batching”方式提供相當好的彈性來應對一些原因造成的任務失敗。
在這篇文章中,我將通過網站的事件近實時回話的例子演示使你熟悉一些常見的和高級的Spark Streaming功能,然后加載活動有關的統計數據到Apache HBase,用不喜歡的BI用具來繪圖分析。 (Sessionization指的是捕獲的單一訪問者的網站會話時間范圍內所有點擊流活動。)你可以在這里找到了這個演示的代碼。
像這樣的系統對于了解訪問者的行為(無論是人還是機器)是超級有用的。通過一些額外的工作它也可以被設計成windowing模式來以異步方式檢測可能的欺詐。
Spark Streaming 代碼
我們的例子中的main class是:
com.cloudera.sa.example.sparkstreaming.sessionization.SessionizeData
讓我們來看看這段代碼段(忽略1-59行,其中包含imports 和其他無聊的東西)。
60到112行:設置Spark Streaming 這些行是非常基本的,用來設置的Spark Streaming,同時可以選擇從HDFS或socket接收數據流。如果你在Spark Streaming方面是一個新手,我已經添加了一些詳細的注釋幫助理解代碼。 (我不打算在這里詳談,因為仍然在樣例代碼里。)
//This is just creating a Spark Config object. I don't do much here but //add the app name. There are tons of options to put into the Spark config, //but none are needed for this simple example. val sparkConf = new SparkConf(). setAppName("SessionizeData " + args(0)). set("spark.cleaner.ttl", "120000") //These two lines will get us out SparkContext and our StreamingContext. //These objects have all the root functionality we need to get started. val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) //Here are are loading our HBase Configuration object. This will have //all the information needed to connect to our HBase cluster. //There is nothing different here from when you normally interact with HBase. val conf = HBaseConfiguration.create(); conf.addResource(new Path("/etc/hbase/conf/core-site.xml")); conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); //This is a HBaseContext object. This is a nice abstraction that will hide //any complex HBase stuff from us so we can focus on our business case //HBaseContext is from the SparkOnHBase project which can be found at // https://github.com/tmalaska/SparkOnHBase val hbaseContext = new HBaseContext(sc, conf); //This is create a reference to our root DStream. DStreams are like RDDs but //with the context of being in micro batch world. I set this to null now //because I later give the option of populating this data from HDFS or from //a socket. There is no reason this could not also be populated by Kafka, //Flume, MQ system, or anything else. I just focused on these because //there are the easiest to set up. var lines: DStream[String] = null //Options for data load. Will be adding Kafka and Flume at some point if (args(0).equals("socket")) { val host = args(FIXED_ARGS); val port = args(FIXED_ARGS + 1); println("host:" + host) println("port:" + Integer.parseInt(port)) //Simple example of how you set up a receiver from a Socket Stream lines = ssc.socketTextStream(host, port.toInt) } else if (args(0).equals("newFile")) { val directory = args(FIXED_ARGS) println("directory:" + directory) //Simple example of how you set up a receiver from a HDFS folder lines = ssc.fileStream[LongWritable, Text, TextInputFormat](directory, (t: Path) => true, true).map(_._2.toString) } else { throw new RuntimeException("bad input type") }
114到124行: 字符串解析 這里是Spark Streaming的開始的地方. 請看下面四行::
val ipKeyLines = lines.map[(String, (Long, Long, String))](eventRecord => { //Get the time and ip address out of the original event val time = dateFormat.parse( eventRecord.substring(eventRecord.indexOf('[') + 1, eventRecord.indexOf(']'))). getTime() val ipAddress = eventRecord.substring(0, eventRecord.indexOf(' ')) //We are return the time twice because we will use the first at the start time //and the second as the end time (ipAddress, (time, time, eventRecord)) })
上面第一命令是在DSTREAM對象“lines”上進行了map函數和,解析原始事件來分離出的IP地址,時間戳和事件的body。對于那些Spark Streaming的新手,一個DSTREAM保存著要處理的一批記錄。這些記錄由以前所定義的receiver對象填充,并且此map函數在這個micro-batch內產生另一個DSTREAM存儲變換后的記錄來進行額外的處理。
當看像上面的Spark Streaming示意圖時,有一些事情要注意::
- 每個micro-batch在到達構建StreamingContext時設定的那一秒時被銷毀
- Receiver總是用被下一個micro-batch中的RDDS填充
- 之前micro batch中老的RDDs將被清理丟棄
126到135行:產生Sessions 現在,我們有從網絡日志中獲得的IP地址和時間,是時候建立sessions了。下面的代碼是通過micro-batch內的第一聚集事件建立session,然后在DSTREAM中reduce這些會話。
val latestSessionInfo = ipKeyLines. map[(String, (Long, Long, Long))](a => { //transform to (ipAddress, (time, time, counter)) (a._1, (a._2._1, a._2._2, 1)) }). reduceByKey((a, b) => { //transform to (ipAddress, (lowestStartTime, MaxFinishTime, sumOfCounter)) (Math.min(a._1, b._1), Math.max(a._2, b._2), a._3 + b._3) }). updateStateByKey(updateStatbyOfSessions)
這里有一個關于records如何在micro-batch中被reduce的例子:
在會話范圍內的 micro-batch 內加入,我們可以用超酷的updateStateByKey功能(做join/reduce-like操作)下圖說明了就DStreams而言,隨著時間變化這個處理過程是怎樣的。
現在,讓我們深入到updateStatbyOfSessions函數,它被定義在文件的底部。此代碼(注意詳細注釋)含有大量的魔法,使sessionization發生在micro-batch的連續模式中。
/** * This function will be called for to union of keys in the Reduce DStream * with the active sessions from the last micro batch with the ipAddress * being the key * * To goal is that this produces a stateful RDD that has all the active * sessions. So we add new sessions and remove sessions that have timed * out and extend sessions that are still going */ def updateStatbyOfSessions( //(sessionStartTime, sessionFinishTime, countOfEvents) a: Seq[(Long, Long, Long)], //(sessionStartTime, sessionFinishTime, countOfEvents, isNewSession) b: Option[(Long, Long, Long, Boolean)] ): Option[(Long, Long, Long, Boolean)] = { //This function will return a Optional value. //If we want to delete the value we can return a optional "None". //This value contains four parts //(startTime, endTime, countOfEvents, isNewSession) var result: Option[(Long, Long, Long, Boolean)] = null // These if statements are saying if we didn't get a new event for //this session's ip address for longer then the session //timeout + the batch time then it is safe to remove this key value //from the future Stateful DStream if (a.size == 0) { if (System.currentTimeMillis() - b.get._2 > SESSION_TIMEOUT + 11000) { result = None } else { if (b.get._4 == false) { result = b } else { result = Some((b.get._1, b.get._2, b.get._3, false)) } } } //Now because we used the reduce function before this function we are //only ever going to get at most one event in the Sequence. a.foreach(c => { if (b.isEmpty) { //If there was no value in the Stateful DStream then just add it //new, with a true for being a new session result = Some((c._1, c._2, c._3, true)) } else { if (c._1 - b.get._2 < SESSION_TIMEOUT) { //If the session from the stateful DStream has not timed out //then extend the session result = Some(( Math.min(c._1, b.get._1), //newStartTime Math.max(c._2, b.get._2), //newFinishTime b.get._3 + c._3, //newSumOfEvents false //This is not a new session )) } else { //Otherwise remove the old session with a new one result = Some(( c._1, //newStartTime c._2, //newFinishTime b.get._3, //newSumOfEvents true //new session )) } } }) result } }
在這段代碼做了很多事,而且通過很多方式,這是整個工作中最復雜的部分。總之,它跟蹤活動的會話,所以你知道你是繼續現有的會話還是啟動一個新的。
126到207行:計數和HBase 這部分做了大多數計數工作。在這里有很多是重復的,讓我們只看一個count的例子,然后一步步地我們把生成的同一個記錄counts存儲在HBase中。
val onlyActiveSessions = latestSessionInfo.filter(t => System.currentTimeMillis() - t._2._2 < SESSION_TIMEOUT) … val newSessionCount = onlyActiveSessions.filter(t => { //is the session newer then that last micro batch //and is the boolean saying this is a new session true (System.currentTimeMillis() - t._2._2 > 11000 && t._2._4) }). count. map[HashMap[String, Long]](t => HashMap((NEW_SESSION_COUNTS, t)))
總之,上面的代碼是過濾除了活動的會話其他所有會話,對他們進行計數,并把該最終計記錄到一個的HashMap實例中。它使用HashMap作為容 器,所以在所有的count做完后,我們可以調用下面的reduce函數把他們都到一個單一的記錄。 (我敢肯定有更好的方法來實現這一點,但這種方法工 作得很好。)
接下來,下面的代碼處理所有的那些HashMap,并把他們所有的值在一個HashMap中。
val allCounts = newSessionCount. union(totalSessionCount). union(totals). union(totalEventsCount). union(deadSessionsCount). union(totalSessionEventCount). reduce((a, b) => b ++ a)
用HBaseContext來使Spark Streaming與HBase交互超級簡單。所有你需要做的就是用HashMap和函數將其轉換為一個put對象提供給DSTREAM。
hbaseContext.streamBulkPut[HashMap[String, Long]]( allCounts, //The input RDD hTableName, //The name of the table we want to put too (t) => { //Here we are converting our input record into a put //The rowKey is C for Count and a backward counting time so the newest //count show up first in HBase's sorted order val put = new Put(Bytes.toBytes("C." + (Long.MaxValue - System.currentTimeMillis()))) //We are iterating through the HashMap to make all the columns with their counts t.foreach(kv => put.add(Bytes.toBytes(hFamily), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2.toString))) put }, false)
現在,HBase的這些信息可以用Apache Hive table包起來,然后通過你喜歡的BI工具執行一個查詢來獲取像下面這樣的圖,它每次micro-batch會刷新。
209到215行:寫入HDFS 最后的任務是把擁有事件數據的活動會話信息加入,然后把事件以會話的開始時間來持久化到HDFS。
//Persist to HDFS ipKeyLines.join(onlyActiveSessions). map(t => { //Session root start time | Event message dateFormat.format(new Date(t._2._2._1)) + "t" + t._2._1._3 }). saveAsTextFiles(outputDir + "/session", "txt")
結論
我希望你跳出這個例子 來走像了很多工作,感覺與代碼只是一點點做,因為它是。想象一下你還可以用這種模式和Spark Streaming與HBase HDFS很容易交互的這種能力做什么東西。