使用Spark Streaming進行情感分析
這里將使用推ter流式數據,它符合所有所需:持續而且無止境的數據源。
Spark Streaming
Spark Streaming在電子書 《手把手教你學習Spark》 第六章有詳細介紹,這里略過Streaming API的詳細介紹,直接進行程序開發 。
程序開發設置部分
程序開發起始部分需要做好準備工作。
val config = new SparkConf().setAppName("推ter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
System.setProperty("推ter4j.oauth.consumerKey", "consumerKey")
System.setProperty("推ter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("推ter4j.oauth.accessToken", accessToken)
System.setProperty("推ter4j.oauth.accessTokenSecret", "accessTokenSecret")
val stream = 推terUtils.createStream(ssc, None)
這里創建一個Spark Context sc ,設置日志級別為WARN來消除Spark生成的日志。使用 sc 創建Streaming Context ssc ,然后設置 推ter證書來獲得 推ter網站數據。
推ter上現在的趨勢是什么?
很容易的能夠找到任意給定時刻的推ter趨勢,僅僅需要計算數據流每個標簽的數目。讓我們看下Spark如何實現這個操作的。
val tags = stream.flatMap { status =>
status.getHashtagEntities.map(_.getText)
}
tags.countByValue()
.foreachRDD { rdd =>
val now = org.joda.time.DateTime.now()
rdd
.sortBy(_._2)
.map(x => (x, now))
.saveAsTextFile(s"~/推ter/$now")
}
首先從Tweets獲取標記,并計算標記的數量,按數量排序,然后持久化結果。我們基于前面的結果建立一個監控面板來跟蹤趨勢標簽。作者的同事就可以創建一個廣告標記(campaigns),并吸引更多的用戶。
分析Tweets
現在我們想增加一個功能來獲得用戶主要感興趣的主題集。為了這個目的我們想對Tweets的大數據和食物兩個不相關的主題進行情感分析。
有幾種API可以在Tweets上做情感分析,但是作者選擇斯坦福自然語言處理組開發的庫來抽取相關情感。
在 build.sbt 文件中增加相對應的依賴。
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"
現在,我們通過Streaming過濾一定的哈希標簽,只選擇感興趣的Tweets,如下所示:
val tweets = stream.filter {t =>
val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
tags.contains("#bigdata") && tags.contains("#food")
}
得到Tweets上所有標簽,然后標記出#bigdata和 #food兩個標簽。接下來定一個函數從Tweets抽取相關的情感:
def detectSentiment(message: String): SENTIMENT_TYPE
然后對detectSentiment進行測試以確保其可以工作:
it("should detect not understood sentiment") {
detectSentiment("") should equal (NOT_UNDERSTOOD)
}
it("should detect a negative sentiment") {
detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)
}
it("should detect a neutral sentiment") {
detectSentiment("I'm watching a movie") should equal (NEUTRAL)
}
it("should detect a positive sentiment") {
detectSentiment("It was a nice experience.") should equal (POSITIVE)
}
it("should detect a very positive sentiment") {
detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)
}
完整列子如下:
val data = tweets.map { status =>
val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
val tags = status.getHashtagEntities.map(_.getText.toLowerCase)
(status.getText, sentiment.toString, tags)
}
data中包含相關的情感。
和SQL協同進行分析
現在作者想把情感分析的數據存儲在外部數據庫,為了后續可以使用SQL查詢。具體操作如下:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
data.foreachRDD { rdd =>
rdd.toDF().registerTempTable("sentiments")
}
將Dstream轉換成DataFrame,然后注冊成一個臨時表,其他喜歡使用SQL的同事就可以使用不同的數據源啦。
sentiment表可以被任意查詢,也可以使用Spark SQL和其他數據源(比如,Cassandra數據等)進行交叉查詢。查詢DataFrame的列子:
sqlContext.sql("select * from sentiments").show()
窗口操作
Spark Streaming的窗口操作可以進行回溯數據,這在其他流式引擎中并沒有。
為了使用窗口函數,你需要checkpoint流數據,具體詳情見 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing 。
簡單的一個窗口操作:
tags
.window(Minutes(1))
. (...)
結論
此列子雖然簡單,但是其可以使用Spark解決實際問題。我們可以計算推ter上主題趨勢。
來自: http://www.infoq.com/cn/articles/emotional-analysis-using-streaming-spark