使用Spark Streaming進行情感分析

這里將使用推ter流式數據,它符合所有所需:持續而且無止境的數據源。

Spark Streaming

Spark Streaming在電子書 《手把手教你學習Spark》 第六章有詳細介紹,這里略過Streaming API的詳細介紹,直接進行程序開發 。

程序開發設置部分

程序開發起始部分需要做好準備工作。

val config = new SparkConf().setAppName("推ter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")

val ssc = new StreamingContext(sc, Seconds(5))

System.setProperty("推ter4j.oauth.consumerKey", "consumerKey")
System.setProperty("推ter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("推ter4j.oauth.accessToken", accessToken)
System.setProperty("推ter4j.oauth.accessTokenSecret", "accessTokenSecret")

val stream = 推terUtils.createStream(ssc, None)

這里創建一個Spark Context sc ,設置日志級別為WARN來消除Spark生成的日志。使用 sc 創建Streaming Context ssc ,然后設置 推ter證書來獲得 推ter網站數據。

推ter上現在的趨勢是什么?

很容易的能夠找到任意給定時刻的推ter趨勢,僅僅需要計算數據流每個標簽的數目。讓我們看下Spark如何實現這個操作的。

val tags = stream.flatMap { status =>
   status.getHashtagEntities.map(_.getText)
}
tags.countByValue()
   .foreachRDD { rdd =>
       val now = org.joda.time.DateTime.now()
       rdd
         .sortBy(_._2)
         .map(x => (x, now))
         .saveAsTextFile(s"~/推ter/$now")
     }

首先從Tweets獲取標記,并計算標記的數量,按數量排序,然后持久化結果。我們基于前面的結果建立一個監控面板來跟蹤趨勢標簽。作者的同事就可以創建一個廣告標記(campaigns),并吸引更多的用戶。

分析Tweets

現在我們想增加一個功能來獲得用戶主要感興趣的主題集。為了這個目的我們想對Tweets的大數據和食物兩個不相關的主題進行情感分析。

有幾種API可以在Tweets上做情感分析,但是作者選擇斯坦福自然語言處理組開發的庫來抽取相關情感。

build.sbt 文件中增加相對應的依賴。

libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"

現在,我們通過Streaming過濾一定的哈希標簽,只選擇感興趣的Tweets,如下所示:

val tweets = stream.filter {t =>
     val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
     tags.contains("#bigdata") && tags.contains("#food")
   }

得到Tweets上所有標簽,然后標記出#bigdata和 #food兩個標簽。接下來定一個函數從Tweets抽取相關的情感:

def detectSentiment(message: String): SENTIMENT_TYPE

然后對detectSentiment進行測試以確保其可以工作:

it("should detect not understood sentiment") {
     detectSentiment("") should equal (NOT_UNDERSTOOD)
}

it("should detect a negative sentiment") {
     detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)
}

it("should detect a neutral sentiment") {
     detectSentiment("I'm watching a movie") should equal (NEUTRAL)
}

it("should detect a positive sentiment") {
     detectSentiment("It was a nice experience.") should equal (POSITIVE)
}

it("should detect a very positive sentiment") {
     detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)
}

完整列子如下:

val data = tweets.map { status =>
   val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
   val tags = status.getHashtagEntities.map(_.getText.toLowerCase)

   (status.getText, sentiment.toString, tags)
}

data中包含相關的情感。

和SQL協同進行分析

現在作者想把情感分析的數據存儲在外部數據庫,為了后續可以使用SQL查詢。具體操作如下:

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

data.foreachRDD { rdd =>
   rdd.toDF().registerTempTable("sentiments")
}

將Dstream轉換成DataFrame,然后注冊成一個臨時表,其他喜歡使用SQL的同事就可以使用不同的數據源啦。

sentiment表可以被任意查詢,也可以使用Spark SQL和其他數據源(比如,Cassandra數據等)進行交叉查詢。查詢DataFrame的列子:

sqlContext.sql("select * from sentiments").show()

窗口操作

Spark Streaming的窗口操作可以進行回溯數據,這在其他流式引擎中并沒有。

為了使用窗口函數,你需要checkpoint流數據,具體詳情見 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

簡單的一個窗口操作:

tags
   .window(Minutes(1))
   . (...)

結論

此列子雖然簡單,但是其可以使用Spark解決實際問題。我們可以計算推ter上主題趨勢。

 

來自: http://www.infoq.com/cn/articles/emotional-analysis-using-streaming-spark

 

 本文由用戶 AnjaBrother 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!