如何使用Spark ALS實現協同過濾

jopen 9年前發布 | 73K 次閱讀 Spark 分布式/云計算/大數據

轉載自 JavaChen Blog ,作者: Junez

本文主要記錄最近一段時間學習和實現Spark MLlib中的協同過濾的一些總結,希望對大家熟悉Spark ALS算法有所幫助。

更新:

【2016.06.12】Spark1.4.0中MatrixFactorizationModel提供了recommendForAll方法實現離線批量推薦,見 SPARK-3066

測試環境

為了測試簡單,在本地以local方式運行Spark,你需要做的是下載編譯好的壓縮包解壓即可,可以參考 Spark本地模式運行

測試數據使用 MovieLensMovieLens 10M 數據集,下載之后解壓到data目錄。數據的格式請參考README中的說明,需要注意的是ratings.dat中的數據被處理過, 每個用戶至少訪問了20個商品

下面的代碼均在spark-shell中運行,啟動時候可以根據你的機器內存設置JVM參數,例如:

bin/spark-shell --executor-memory 3 g --driver-memory 3 g --driver-java-options '-Xms2g -Xmx2g -XX:+UseCompressedOops'

預測評分

這個例子主要演示如何訓練數據、評分并計算根均方差。

準備工作

首先,啟動spark-shell,然后引入mllib包,我們需要用到ALS算法類和Rating評分類:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

Spark的日志級別默認為INFO,你可以手動設置為WARN級別,同樣先引入log4j依賴:

import org.apache.log4j.{Logger,Level}

然后,運行下面代碼:

Logger.getLogger( "org.apache.spark" ).setLevel(Level.WARN)

Logger.getLogger( "org.eclipse.jetty.server" ).setLevel(Level.OFF)

加載數據

spark-shell啟動成功之后,sc為內置變量,你可以通過它來加載測試數據:

val data = sc.textFile( "data/ml-1m/ratings.dat" )

接下來解析文件內容,獲得用戶對商品的評分記錄:

val ratings = data.map(_.split( "::" ) match { case Array(user, item, rate, ts) =>

Rating(user.toInt, item.toInt, rate.toDouble)

}).cache()

查看第一條記錄:

scala> ratings.first

res81: org.apache.spark.mllib.recommendation.Rating = Rating( 1 , 1193 , 5.0 )

我們可以統計文件中用戶和商品數量:

val users = ratings.map(_.user).distinct()

val products = ratings.map(_.product).distinct()

println( "Got " +ratings.count()+ " ratings from " +users.count+ " users on " +products.count+ " products." )

可以看到如下輸出:

//Got 1000209 ratings from 6040 users on 3706 products.

你可以對評分數據生成訓練集和測試集,例如:訓練集和測試集比例為8比2:

val splits = ratings.randomSplit(Array( 0.8 , 0.2 ), seed = 111 l)

val training = splits( 0 ).repartition(numPartitions)

val test = splits( 1 ).repartition(numPartitions)

這里,我們是將評分數據全部當做訓練集,并且也為測試集。

訓練模型

接下來調用ALS.train()方法,進行模型訓練:

val rank = 12

val lambda = 0.01

val numIterations = 20

val model = ALS.train(ratings, rank, numIterations, lambda)

訓練完后,我們看看model中的用戶和商品特征向量:

model.userFeatures

model.userFeatures.count

//res84: Long = 6040

model.productFeatures

model.productFeatures.count

//res86: Long = 3706

評測

我們要對比一下預測的結果,注意:我們將 訓練集當作測試集 來進行對比測試。從訓練集中獲取用戶和商品的映射:

val usersProducts= ratings.map { case Rating(user, product, rate) =>

(user, product)

}

顯然,測試集的記錄數等于評分總記錄數,驗證一下:

usersProducts.count //Long = 1000209

使用推薦模型對用戶商品進行預測評分,得到預測評分的數據集:

var predictions = model.predict(usersProducts).map { case Rating(user, product, rate) =>

((user, product), rate)

}

查看其記錄數:

predictions.count //Long = 1000209

將真實評分數據集與預測評分數據集進行合并,這樣得到用戶對每一個商品的實際評分和預測評分:

val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>

((user, product), rate)

}.join(predictions)

ratesAndPreds.count //Long = 1000209

然后計算根均方差:

val rmse= math.sqrt(ratesAndPreds.map { case ((user, product), (r1, r2)) =>

val err = (r1 - r2)

err * err

}.mean())

println(s "RMSE = $rmse" )

上面這段代碼其實就是 對測試集進行評分預測并計算與實際評分的相似度 ,這段代碼可以抽象為一個方法,如下:

/** Compute RMSE (Root Mean Squared Error). */

def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating]) = {

val usersProducts = data.map { case Rating(user, product, rate) =>

(user, product)

}

val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) =>

((user, product), rate)

}

val ratesAndPreds = data.map { case Rating(user, product, rate) =>

((user, product), rate)

}.join(predictions)

math.sqrt(ratesAndPreds.map { case ((user, product), (r1, r2)) =>

val err = (r1 - r2)

err * err

}.mean())

}

除了RMSE指標,我們還可以計算AUC以及Mean average precision at K (MAPK),關于AUC的計算方法,參考 RunRecommender.scala ,關于MAPK的計算方法可以參考 《Packt.Machine Learning with Spark.2015.pdf》 一書第四章節內容,或者你可以看本文后面內容。

保存真實評分和預測評分

我們還可以保存用戶對商品的真實評分和預測評分記錄到本地文件:

ratesAndPreds.sortByKey().repartition( 1 ).sortBy(_._1).map({

case ((user, product), (rate, pred)) => (user + "," + product + "," + rate + "," + pred)

}).saveAsTextFile( "/tmp/result" )

上面這段代碼先按用戶排序,然后重新分區確保目標目錄中只生成一個文件。如果你重復運行這段代碼,則需要先刪除目標路徑:

import scala.sys.process._

"rm -r /tmp/result" .!

我們還可以對預測的評分結果按用戶進行分組并按評分倒排序:

predictions.map { case ((user, product), rate) =>

(user, (product, rate))

}.groupByKey(numPartitions).map{ case (user_id,list)=>

(user_id,list.toList.sortBy { case (goods_id,rate)=> - rate})

}

給一個用戶推薦商品

這個例子主要是記錄如何給一個或大量用戶進行推薦商品,例如,對用戶編號為384的用戶進行推薦,查出該用戶在測試集中評分過的商品。

找出5個用戶:

users.take( 5 )

//Array[Int] = Array(384, 1084, 4904, 3702, 5618)

查看用戶編號為384的用戶的預測結果中預測評分排前10的商品:

val userId = users.take( 1 )( 0 ) //384

val K = 10

val topKRecs = model.recommendProducts(userId, K)

println(topKRecs.mkString( "\n" ))

// Rating(384,2545,8.354966018818265)

// Rating(384,129,8.113083736094676)

// Rating(384,184,8.038113395650853)

// Rating(384,811,7.983433591425284)

// Rating(384,1421,7.912044967873945)

// Rating(384,1313,7.719639594879865)

// Rating(384,2892,7.53667094600392)

// Rating(384,2483,7.295378004543803)

// Rating(384,397,7.141158013610967)

// Rating(384,97,7.071089782695754)

查看該用戶的評分記錄:

val goodsForUser=ratings.keyBy(_.user).lookup( 384 )

productsForUser.size //Int = 22

productsForUser.sortBy(-_.rating).take( 10 ).map(rating => (rating.product, rating.rating)).foreach(println)

// (593,5.0)

// (1201,5.0)

// (3671,5.0)

// (1304,5.0)

// (1197,4.0)

// (3037,4.0)

// (1610,4.0)

// (3074,4.0)

// (204,4.0)

// (260,4.0)

可以看到該用戶對22個商品評過分以及瀏覽的商品是哪些。

我們可以該用戶對某一個商品的實際評分和預測評分方差為多少:

val actualRating = productsForUser.take( 1 )( 0 )

val predictedRating = model.predict( 384 , actualRating.product)

//predictedRating: Double = 1.9426030777174637

val squaredError = math.pow(predictedRating - actualRating.rating, 2.0 )

//squaredError: Double = 0.0032944066875075172

如何找出和一個已知商品最相似的商品呢?這里,我們可以使用余弦相似度來計算:

import org.jblas.DoubleMatrix

/* Compute the cosine similarity between two vectors */

def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {

vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())

}

以2055商品為例,計算實際評分和預測評分相似度

val itemId = 2055

val itemFactor = model.productFeatures.lookup(itemId).head

val itemVector = new DoubleMatrix(itemFactor)

cosineSimilarity(itemVector, itemVector)

// res99: Double = 0.9999999999999999

找到和該商品最相似的10個商品:

val sims = model.productFeatures.map{ case (id, factor) =>

val factorVector = new DoubleMatrix(factor)

val sim = cosineSimilarity(factorVector, itemVector)

(id, sim)

}

val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })

println(sortedSims.mkString( "\n" ))

// (2055,0.9999999999999999)

// (2051,0.9138311231145874)

// (3520,0.8739823400539756)

// (2190,0.8718466671129721)

// (2050,0.8612639515847019)

// (1011,0.8466911667526461)

// (2903,0.8455764332511272)

// (3121,0.8227325520485377)

// (3674,0.8075743004357392)

// (2016,0.8063817280259447)

顯然第一個最相似的商品即為該商品本身,即2055,我們可以修改下代碼,取前k+1個商品,然后排除第一個:

val sortedSims2 = sims.top(K + 1 )(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })

sortedSims2.slice( 1 , 11 ).map{ case (id, sim) => (id, sim) }.mkString( "\n" )

// (2051,0.9138311231145874)

// (3520,0.8739823400539756)

// (2190,0.8718466671129721)

// (2050,0.8612639515847019)

// (1011,0.8466911667526461)

// (2903,0.8455764332511272)

// (3121,0.8227325520485377)

// (3674,0.8075743004357392)

// (2016,0.8063817280259447)

// (3672,0.8016276723120674)

接下來,我們可以計算給該用戶推薦的前K個商品的平均準確度MAPK,該算法定義如下(該算法是否正確還有待考證):

def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {

val predK = predicted.take(k)

var score = 0.0

var numHits = 0.0

for ((p, i) <- predK.zipWithIndex) {

if (actual.contains(p)) {

numHits += 1.0

score += numHits / (i.toDouble + 1.0 )

}

}

if (actual.isEmpty) {

1.0

} else {

score / scala.math.min(actual.size, k).toDouble

}

}

給該用戶推薦的商品為:

val actualProducts = productsForUser.map(_.product)

給該用戶預測的商品為:

val predictedProducts = topKRecs.map(_.product)

最后的準確度為:

val apk10 = avgPrecisionK(actualProducts, predictedProducts, 10 )

// apk10: Double = 0.0

批量推薦

你可以評分記錄中獲得所有用戶然后依次給每個用戶推薦:

val users = ratings.map(_.user).distinct()

users.collect.flatMap { user =>

model.recommendProducts(user, 10 )

}

這種方式是遍歷內存中的一個集合然后循環調用RDD的操作,運行會比較慢,另外一種方式是直接操作model中的userFeatures和productFeatures,代碼如下:

val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()

val itemMatrix = new DoubleMatrix(itemFactors)

println(itemMatrix.rows, itemMatrix.columns)

//(3706,12)

// broadcast the item factor matrix

val imBroadcast = sc.broadcast(itemMatrix)

//獲取商品和索引的映射

var idxProducts=model.productFeatures.map { case (prodcut, factor) => prodcut }.zipWithIndex().map{ case (prodcut, idx) => (idx,prodcut)}.collectAsMap()

val idxProductsBroadcast = sc.broadcast(idxProducts)

val allRecs = model.userFeatures.map{ case (user, array) =>

val userVector = new DoubleMatrix(array)

val scores = imBroadcast.value.mmul(userVector)

val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)

//根據索引取對應的商品id

val recommendedProducts = sortedWithId.map(_._2).map{idx=>idxProductsBroadcast.value.get(idx).get}

(user, recommendedProducts)

}

這種方式其實還不是最優方法,更好的方法可以參考 Personalised recommendations using Spark ,當然這篇文章中的代碼還可以繼續優化一下。我修改后的代碼如下,供大家參考:

val productFeatures = model.productFeatures.collect()

var productArray = ArrayBuffer[Int]()

var productFeaturesArray = ArrayBuffer[Array[Double]]()

for ((product, features) <- productFeatures) {

productArray += product

productFeaturesArray += features

}

val productArrayBroadcast = sc.broadcast(productArray)

val productFeatureMatrixBroadcast = sc.broadcast( new DoubleMatrix(productFeaturesArray.toArray).transpose())

start = System.currentTimeMillis()

val allRecs = model.userFeatures.mapPartitions { iter =>

// Build user feature matrix for jblas

var userFeaturesArray = ArrayBuffer[Array[Double]]()

var userArray = new ArrayBuffer[Int]()

while (iter.hasNext) {

val (user, features) = iter.next()

userArray += user

userFeaturesArray += features

}

var userFeatureMatrix = new DoubleMatrix(userFeaturesArray.toArray)

var userRecommendationMatrix = userFeatureMatrix.mmul(productFeatureMatrixBroadcast.value)

var productArray=productArrayBroadcast.value

var mappedUserRecommendationArray = new ArrayBuffer[String](params.topk)

// Extract ratings from the matrix

for (i <- 0 until userArray.length) {

var ratingSet = mutable.TreeSet.empty(Ordering.fromLessThan[(Int,Double)](_._2 > _._2))

for (j <- 0 until productArray.length) {

var rating = (productArray(j), userRecommendationMatrix.get(i,j))

ratingSet += rating

}

mappedUserRecommendationArray += userArray(i)+ "," +ratingSet.take(params.topk).mkString( "," )

}

mappedUserRecommendationArray.iterator

}

2015.06.12 更新:

悲哀的是,上面的方法還是不能解決問題,因為矩陣相乘會撐爆集群內存;可喜的是,如果你關注Spark最新動態,你會發現Spark1.4.0中MatrixFactorizationModel提供了recommendForAll方法實現離線批量推薦,詳細說明見 SPARK-3066 。因為,我使用的Hadoop版本是CDH-5.4.0,其中Spark版本還是1.3.0,所以暫且不能在集群上測試Spark1.4.0中添加的新方法。

如果上面結果跑出來了,就可以驗證推薦結果是否正確。還是以384用戶為例:

allRecs.lookup( 384 ).head.take( 10 )

topKRecs.map(_.product)

接下來,我們可以計算所有推薦結果的準確度了,首先,得到每個用戶評分過的所有商品:

val userProducts = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)

然后,預測的商品和實際商品關聯求準確度:

val MAPK = allRecs.join(userProducts).map{ case (userId, (predicted, actualWithIds)) =>

val actual = actualWithIds.map(_._2).toSeq

avgPrecisionK(actual, predicted, K)

}.reduce(_ + _) / allRecs.count

println( "Mean Average Precision at K = " + MAPK)

//Mean Average Precision at K = 0.018827551771260383

其實,我們也可以使用Spark內置的算法計算RMSE和MAE:

// MSE, RMSE and MAE

import org.apache.spark.mllib.evaluation.RegressionMetrics

val predictedAndTrue = ratesAndPreds.map { case ((user, product), (actual, predicted)) => (actual, predicted) }

val regressionMetrics = new RegressionMetrics(predictedAndTrue)

println( "Mean Squared Error = " + regressionMetrics.meanSquaredError)

println( "Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)

// Mean Squared Error = 0.5490153087908566

// Root Mean Squared Error = 0.7409556726220918

// MAPK

import org.apache.spark.mllib.evaluation.RankingMetrics

val predictedAndTrueForRanking = allRecs.join(userProducts).map{ case (userId, (predicted, actualWithIds)) =>

val actual = actualWithIds.map(_._2)

(predicted.toArray, actual.toArray)

}

val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)

println( "Mean Average Precision = " + rankingMetrics.meanAveragePrecision)

// Mean Average Precision = 0.04417535679520426

計算推薦2000個商品時的準確度為:

val MAPK2000 = allRecs.join(userProducts).map{ case (userId, (predicted, actualWithIds)) =>

val actual = actualWithIds.map(_._2).toSeq

avgPrecisionK(actual, predicted, 2000 )

}.reduce(_ + _) / allRecs.count

println( "Mean Average Precision = " + MAPK2000)

//Mean Average Precision = 0.025228311843069083

保存和加載推薦模型

對與實時推薦,我們需要啟動一個web server,在啟動的時候生成或加載訓練模型,然后提供API接口返回推薦接口,需要調用的相關方法為:

save(model: MatrixFactorizationModel, path: String)

load(sc: SparkContext, path: String)

model中的userFeatures和productFeatures也可以保存起來:

val outputDir= "/tmp"

model.userFeatures.map{ case (id, vec) => id + "\t" + vec.mkString( "," ) }.saveAsTextFile(outputDir + "/userFeatures" )

model.productFeatures.map{ case (id, vec) => id + "\t" + vec.mkString( "," ) }.saveAsTextFile(outputDir + "/productFeatures" )

總結

本文主要記錄如何使用ALS算法實現協同過濾并給用戶推薦商品,以上代碼在Github倉庫中的 ScalaLocalALS.scala 文件。

如果你想更加深入了解Spark MLlib算法的使用,可以看看 Packt.Machine Learning with Spark.2015.pdf 這本電子書并下載書中的源碼,本文大部分代碼參考自該電子書。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!