kafka+spark streaming+redis學習

jopen 8年前發布 | 27K 次閱讀 Spark Kafka Redis 分布式/云計算/大數據

針對這段時間所學的做了一個簡單的綜合應用,應用的場景為統計一段時間內各個小區的網絡信號覆蓋率,計算公式如下所示:

分子:信號強度大于 35 的采樣點個數

分母:信號強度為非空的所有采樣點個數

網絡覆蓋率=分子/分母

原始數據為 xml 格式,記錄各小區在各時刻的采樣點,采樣時間精確到 ms ,我們需要做的是計算單個小區以小時為間隔的信號覆蓋率。通過簡單的 java 代碼解析 xml 文件,并將解析后的數據通過 kafka 生產者進程發送的 kafka 消息集群中,利用 spark streaming 進行實時處理并將處理結果存入 redis 。下面是數據處理過程

原始數據格式:  小區 ID               信號強度             時間

155058480           49                   2015-11-27T00:00:03.285

155058480           33                   2015-11-27T00:00:05.000

155058480 空               2015-11-27T00:00:23.285

原始數據處理:  小區 ID         分子        分母     時間

155058480     1           1        2015-11-27T00

155058480     0           1        2015-11-27T00

155058480     0           0        2015-11-27T00

統計合并處理:  小區 ID         分子        分母     時間

155058480     1           2        2015-11-27T00

小區155058480的網絡覆蓋率=分子/分母=1/2=0.5

說明:以小區 155058480 為例的三個采樣點,其中滿足上述分子條件的非空記錄的分子記為為 1 ,不滿足分子條件的非空記錄與空記錄的分子記為 0 ,非空記錄的分母記為 1 。同時對時間進行分割,保留到小時,并以時間個小區 id 為復合主鍵利用 reduceByKey 方法進行累加統計。

下面給出spark streaming代碼:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import redis.clients.jedis.Jedis

object SparkStreamConsumer {

    private val checkpointDir = "data-checkpoint"
    private val msgConsumerGroup = "message-consumer-group"

    def main(args: Array[String]) {
      if (args.length < 2) {
        println("Usage:zkserver1:2181,zkserver2:2181,zkserver3:2181 consumeMsgDataTimeInterval(secs)")
        System.exit(1)
      }
      val Array(zkServers,processingInterval) = args
      val conf = new SparkConf().setAppName("Web Page Popularity Value Calculator")
      val ssc = new StreamingContext(conf, Seconds(processingInterval.toInt))
      //using updateStateByKey asks for enabling checkpoint
      ssc.checkpoint(checkpointDir)
      val kafkaStream = KafkaUtils.createStream(
        //Spark streaming context
        ssc,
        //zookeeper quorum. e.g zkserver1:2181,zkserver2:2181,...
        zkServers,
        //kafka message consumer group ID
        msgConsumerGroup,
        //Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
        Map("spark-stream-topic" -> 3))
      //原始數據為 (topic,data)
      val msgDataRDD = kafkaStream.map(_._2)

      //原始數據處理
      val lines = msgDataRDD.map { msgLine =>
      {
        val dataArr: Array[String] = msgLine.split("\t")
        val id = dataArr(0)
        val timeArr: Array[String] = dataArr(1).split(":")
        val time = timeArr(0)
        val val1: Double = {
          if (dataArr(2).equals("NIL")) 0 else if (dataArr(2).toFloat > 35) 1 else 0
        }
        val val2: Double = {
          if (dataArr(2).equals("NIL")) 0 else if (dataArr(2).toFloat > 0) 1 else 0
        }
        ((id,time),(val1,val2))
      }
      }

      //通過reduceByKey方法對相同鍵值的數據進行累加
      val test = lines.reduceByKey((a,b)=>(a._1+b._1,a._2+b._2))

      //錯誤記錄:Task not serializable
      //遍歷接收到的數據,存入redis數據庫
      test.foreachRDD(rdd=>{
        rdd.foreachPartition(partition=>{
          val jedis = new Jedis("192.168.1.102",6379)
          partition.foreach(pairs=>{

            jedis.hincrByFloat("test",pairs._1._1+"_"+pairs._1._2+":1",pairs._2._1)
            jedis.hincrByFloat("test",pairs._1._1+"_"+pairs._1._2+":2",pairs._2._2)
            jedis.close()


          })
        })
      })


      /*//通過保存在spark內存中的數據與當前數據累加并保存在內存中
      val updateValue = (newValue:Seq[(Double,Double)], prevValueState:Option[(Double,Double)]) => {

        val val1:Double = newValue.map(x=>x._1).foldLeft(0.0)((sum,i)=>sum+i)
        val val2:Double = newValue.map(x=>x._2).foldLeft(0.0)((sum,i)=>sum+i)
        // 已累加的值
        val previousCount:(Double,Double) = prevValueState.getOrElse((0.0,0.0))

        Some((val1,val2))
      }
      val initialRDD = ssc.sparkContext.parallelize(List((("id","time"), (0.00,0.00))))

      val stateDstream = lines.updateStateByKey[(Double,Double)](updateValue)
      //set the checkpoint interval to avoid too frequently data checkpoint which may
      //may significantly reduce operation throughput
      stateDstream.checkpoint(Duration(8*processingInterval.toInt*1000))
      //after calculation, we need to sort the result and only show the top 10 hot pages
      stateDstream.foreachRDD { rdd => {
        val sortedData = rdd.map{ case (k,v) => (v,k) }.sortByKey(false)
        val topKData = sortedData.take(10).map{ case (v,k) => (k,v) }
        //val topKData = sortedData.map{ case (v,k) => (k,v) }
        //org.apache.spark.SparkException: Task not serializable
        topKData.foreach(x => {
          println(x)
          jedis.hincrByFloat("test",x._1._1+"_"+x._1._2+":1",x._2._1)
        })
      }
      }*/
      ssc.start()
      ssc.awaitTermination()
    }
}

一開始我將數據庫連接操作放在foreachRDD方法之后,程序運行出錯,在網上沒有找到對應的解決方案,于是仔細閱讀官網資料,在官網上找到了下面一段話:

Often writing data to external system requires creating a connection object (e.g. TCP connection to a remote server) and using it to send data to a remote system. For this purpose, a developer may inadvertently try creating a connection object at the Spark driver, and then try to use it in a Spark worker to save records in the RDDs. For example (in Scala),This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferrable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker.

其中,需要注意的是foreachRDD方法的調用,該方法運行于driver之上,如果將數據庫連接放在該方法位置會導致連接運行在driver上,會拋出connection object not serializable的錯誤。因此需要將數據庫連接方法創建在foreach方法之后,需要注意的是這種做法還需要優化,因為這樣會對每個rdd記錄創建數據庫連接,導致系統運行變慢,可以通過先調用foreachPartition方法為每個分區單獨重建一個數據庫連接然后再該分區之內再遍歷rdd記錄。這樣可以減少數據庫連接的創建次數,還可以通過構建數據庫連接池的方法繼續優化,這里就不再贅述了。

利用idea將程序編譯成jar包之后上傳到spark安裝目錄的lib目錄下,通過spark-submit SparkStreamConsumer.jar datanode1:2181 2,運行程序。其中datanode1:2181是集群中zookeeper的地址。

另外需要注意的就是,需要將jedis包發送到集群中各節點的spark安裝目錄的lib目錄下 。

通過redis客戶端可以查看存儲的計算結果,存儲結構為(key,value)=>(id_time:1,val1) (id_time:2,val2),例如155159602小區在2015-11-26 21點的網絡覆蓋率為70/82=0.85。

來自: http://www.cnblogs.com/xiaoxiongbb/p/5143406.html

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!