Learning Kafka + Spark Streaming + Redis
This is a small end-to-end exercise putting together what I have been learning recently. The scenario is to compute the network signal coverage rate of each cell over a period of time, using the following formula:
Numerator: the number of sample points whose signal strength is greater than 35
Denominator: the number of sample points whose signal strength is non-null
Network coverage rate = numerator / denominator
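Stated a little more formally (a restatement of the formula above, where S is the set of sample points of one cell within one hour and strength(s) is the signal strength of sample point s):

\text{coverage} = \frac{\lvert\{\, s \in S : \mathrm{strength}(s) > 35 \,\}\rvert}{\lvert\{\, s \in S : \mathrm{strength}(s) \neq \text{null} \,\}\rvert}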
The raw data is in XML format and records the sample points of each cell at each moment, with sampling timestamps precise to the millisecond. The task is to compute, per cell, the signal coverage rate at one-hour granularity. A small Java program parses the XML file, and a Kafka producer process sends the parsed records to the Kafka cluster; Spark Streaming then processes them in real time and writes the results to Redis.
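The producer side is not listed in this post. Below is a minimal sketch of what it could look like: the topic name spark-stream-topic and the tab-separated field order (id, time, strength, with "NIL" standing for a null strength) follow the consumer code further down, while the broker address and the class name are assumptions.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SampleProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "datanode1:9092")  // assumed broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    // one parsed XML sample point: cell id, timestamp, signal strength ("NIL" when null)
    val line = "155058480\t2015-11-27T00:00:03.285\t49"
    producer.send(new ProducerRecord[String, String]("spark-stream-topic", line))
    producer.close()
  }
}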
The data goes through the following steps:

Raw data format (cell ID / signal strength / time):
155058480    49      2015-11-27T00:00:03.285
155058480    33      2015-11-27T00:00:05.000
155058480    null    2015-11-27T00:00:23.285

Per-record processing (cell ID / numerator / denominator / time):
155058480    1    1    2015-11-27T00
155058480    0    1    2015-11-27T00
155058480    0    0    2015-11-27T00

Aggregated result (cell ID / numerator / denominator / time):
155058480    1    2    2015-11-27T00

Network coverage rate of cell 155058480 = numerator / denominator = 1/2 = 0.5
Explanation: take the three sample points of cell 155058480 as an example. A non-null record that satisfies the numerator condition above gets numerator 1; non-null records that do not satisfy it, as well as null records, get numerator 0. Every non-null record gets denominator 1 (null records get 0). The timestamp is also truncated to the hour, and (time, cell ID) is used as a composite key so that the values can be accumulated with reduceByKey.
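To make the mapping and aggregation concrete, here is a small local sketch in plain Scala (no Spark), applied to the three sample points above; the object and value names are made up for illustration, and None stands for a null record.

object CoverageSketch {
  def main(args: Array[String]): Unit = {
    val samples = Seq(
      ("155058480", Some(49f), "2015-11-27T00:00:03.285"),
      ("155058480", Some(33f), "2015-11-27T00:00:05.000"),
      ("155058480", None,      "2015-11-27T00:00:23.285"))

    // map each record to ((cellId, hour), (numerator, denominator))
    val mapped = samples.map { case (id, strength, time) =>
      val hour = time.split(":")(0)                        // "2015-11-27T00"
      val num  = if (strength.exists(_ > 35)) 1.0 else 0.0 // numerator condition
      val den  = if (strength.isDefined) 1.0 else 0.0      // non-null records only
      ((id, hour), (num, den))
    }

    // accumulate per composite key, the same way reduceByKey does
    val reduced = mapped.groupBy(_._1).mapValues(_.map(_._2)
      .reduce((a, b) => (a._1 + b._1, a._2 + b._2)))

    reduced.foreach { case ((id, hour), (num, den)) =>
      println(s"$id $hour coverage = $num / $den = ${num / den}") // 1.0 / 2.0 = 0.5
    }
  }
}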
The Spark Streaming code is given below:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import redis.clients.jedis.Jedis

object SparkStreamConsumer {
  private val checkpointDir = "data-checkpoint"
  private val msgConsumerGroup = "message-consumer-group"

  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage: zkserver1:2181,zkserver2:2181,zkserver3:2181 consumeMsgDataTimeInterval(secs)")
      System.exit(1)
    }
    val Array(zkServers, processingInterval) = args
    val conf = new SparkConf().setAppName("Web Page Popularity Value Calculator")
    val ssc = new StreamingContext(conf, Seconds(processingInterval.toInt))
    // using updateStateByKey asks for enabling checkpoint
    ssc.checkpoint(checkpointDir)
    val kafkaStream = KafkaUtils.createStream(
      // Spark streaming context
      ssc,
      // zookeeper quorum, e.g. zkserver1:2181,zkserver2:2181,...
      zkServers,
      // kafka message consumer group ID
      msgConsumerGroup,
      // Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
      Map("spark-stream-topic" -> 3))
    // raw messages arrive as (topic, data) pairs; keep only the data
    val msgDataRDD = kafkaStream.map(_._2)
    // per-record processing: map each line to ((cellId, hour), (numerator, denominator))
    val lines = msgDataRDD.map { msgLine => {
      val dataArr: Array[String] = msgLine.split("\t")
      val id = dataArr(0)
      val timeArr: Array[String] = dataArr(1).split(":")
      val time = timeArr(0)
      val val1: Double = {
        if (dataArr(2).equals("NIL")) 0
        else if (dataArr(2).toFloat > 35) 1
        else 0
      }
      val val2: Double = {
        if (dataArr(2).equals("NIL")) 0
        else if (dataArr(2).toFloat > 0) 1
        else 0
      }
      ((id, time), (val1, val2))
    }}
    // accumulate the values that share the same (cellId, hour) key
    val test = lines.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    // error seen earlier: Task not serializable
    // iterate over the received data and store it in redis
    test.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // one connection per partition, created on the worker
        val jedis = new Jedis("192.168.1.102", 6379)
        partition.foreach(pairs => {
          jedis.hincrByFloat("test", pairs._1._1 + "_" + pairs._1._2 + ":1", pairs._2._1)
          jedis.hincrByFloat("test", pairs._1._1 + "_" + pairs._1._2 + ":2", pairs._2._2)
        })
        // close the connection once the whole partition has been written
        jedis.close()
      })
    })
    /* // accumulate the new values with the state kept in Spark memory and save it back
    val updateValue = (newValue: Seq[(Double, Double)], prevValueState: Option[(Double, Double)]) => {
      val val1: Double = newValue.map(x => x._1).foldLeft(0.0)((sum, i) => sum + i)
      val val2: Double = newValue.map(x => x._2).foldLeft(0.0)((sum, i) => sum + i)
      // previously accumulated value
      val previousCount: (Double, Double) = prevValueState.getOrElse((0.0, 0.0))
      Some((val1, val2))
    }
    val initialRDD = ssc.sparkContext.parallelize(List((("id", "time"), (0.00, 0.00))))
    val stateDstream = lines.updateStateByKey[(Double, Double)](updateValue)
    // set the checkpoint interval to avoid too frequent data checkpointing which
    // may significantly reduce operation throughput
    stateDstream.checkpoint(Duration(8 * processingInterval.toInt * 1000))
    // after calculation, we need to sort the result and only show the top 10 hot pages
    stateDstream.foreachRDD { rdd => {
      val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false)
      val topKData = sortedData.take(10).map { case (v, k) => (k, v) }
      // val topKData = sortedData.map{ case (v,k) => (k,v) }
      // org.apache.spark.SparkException: Task not serializable
      topKData.foreach(x => {
        println(x)
        jedis.hincrByFloat("test", x._1._1 + "_" + x._1._2 + ":1", x._2._1)
      })
    }} */
    ssc.start()
    ssc.awaitTermination()
  }
}
At first I created the database connection right after entering foreachRDD, and the program failed at runtime. I could not find a matching solution online, so I went back and read the official documentation carefully, where I found the following passage:
Often writing data to external system requires creating a connection object (e.g. TCP connection to a remote server) and using it to send data to a remote system. For this purpose, a developer may inadvertently try creating a connection object at the Spark driver, and then try to use it in a Spark worker to save records in the RDDs. For example (in Scala) [code example omitted]. This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferrable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker.
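Applied to this program, the incorrect pattern the documentation warns about would look roughly like this (an illustrative sketch, not the snippet from the documentation):

// connection created once, on the driver ...
val jedis = new Jedis("192.168.1.102", 6379)
test.foreachRDD(rdd => {
  rdd.foreach(pairs => {
    // ... but used inside a closure that runs on the workers: Spark has to
    // serialize the captured jedis object and fails with "Task not serializable"
    jedis.hincrByFloat("test", pairs._1._1 + "_" + pairs._1._2 + ":1", pairs._2._1)
  })
})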
The key point is that foreachRDD itself runs on the driver. If the database connection is created at that point, it lives on the driver, and shipping the closure to the executors then fails with a "connection object not serializable" error. The connection therefore has to be created inside foreach, on the worker side. Note that this naive version still needs optimisation, because it creates a database connection for every single record, which slows the job down. A better approach is to call foreachPartition first, create one database connection per partition, and then iterate over the records of that partition, which greatly reduces the number of connections created. It can be optimised further by building a connection pool; a sketch of that idea is given below.
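A minimal sketch of the connection-pool variant (assuming the same Redis host and port as in the main program, and a jedis version where close() returns a pooled connection to its pool; the object name is made up):

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisConnectionPool {
  // lazily initialised on first use, so each executor JVM builds the pool once
  lazy val pool = new JedisPool(new JedisPoolConfig(), "192.168.1.102", 6379)
}

// usage inside the streaming job:
test.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    val jedis = RedisConnectionPool.pool.getResource
    try {
      partition.foreach(pairs => {
        jedis.hincrByFloat("test", pairs._1._1 + "_" + pairs._1._2 + ":1", pairs._2._1)
        jedis.hincrByFloat("test", pairs._1._1 + "_" + pairs._1._2 + ":2", pairs._2._2)
      })
    } finally {
      jedis.close() // returns the connection to the pool
    }
  })
})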
After packaging the program into a jar with IDEA, I uploaded it to the lib directory of the Spark installation and ran it with spark-submit SparkStreamConsumer.jar datanode1:2181 2, where datanode1:2181 is the address of ZooKeeper in the cluster.
Also note that the jedis jar has to be copied to the lib directory of the Spark installation on every node of the cluster.
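Alternatively, the jedis dependency can be shipped with the job itself through spark-submit's --jars option instead of being copied to every node. A sketch of such an invocation (the master URL and jar path are placeholders, not taken from this setup):

spark-submit \
  --class SparkStreamConsumer \
  --master spark://<master-host>:7077 \
  --jars /path/to/jedis.jar \
  SparkStreamConsumer.jar datanode1:2181 2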
The stored results can be inspected through the redis client. The storage structure is (key, value) => (id_time:1, val1), (id_time:2, val2); for example, the network coverage rate of cell 155159602 for the hour 2015-11-26 21:00 is 70/82 ≈ 0.85.
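The two fields can also be read back programmatically. A small sketch with Jedis (the field names follow the id_time:1 / id_time:2 convention used in the code above, and the values 70 and 82 are the ones quoted in the example):

import redis.clients.jedis.Jedis

object CoverageReader {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("192.168.1.102", 6379)
    // numerator and denominator for cell 155159602, hour 2015-11-26T21
    val num = jedis.hget("test", "155159602_2015-11-26T21:1").toDouble // 70
    val den = jedis.hget("test", "155159602_2015-11-26T21:2").toDouble // 82
    println(s"coverage = $num / $den = ${num / den}")                  // ≈ 0.85
    jedis.close()
  }
}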