一位算法師工程師的Spark機器學習筆記:構建一個簡單的推薦系統
來自: http://python.jobbole.com/84445/
- 基于item的過濾:使用item的內容或者屬性,選擇給定item的相似的item列表,這些屬性一般為文本內容,包括題目、名、標簽以及一些產品的元信息,通常也包括一些media信息,比如圖像、音頻等等
- 協同過濾:協同過濾是一種集體智慧的推薦模型,在基于用戶的協同過濾方法中,如果兩個用戶有相似的偏好(通過用戶對物品的評分、用戶查看物品的記錄、用戶對物品的評論),當為給定用戶來推薦相關產品時,會使用其他相似偏好的用戶的產品列表來對該用戶進行推薦。基于item的協同過濾,一般數據組成為用戶和用戶對某些items的rating,產品被相似偏好的用戶rating相同的趨勢比較大,因而我們可以用所有用戶對物品的偏好,來發現物品與物品之間的相似度,根據用戶的歷史偏好物品,根據相似信息來推薦給該用戶
- MatrixFactorization
因為在Spark的MLlib模塊中只有MF算法,文章之后會講述如何使用Matrix Factorization來做相關的推薦。
Matrix Factorization
MF在Netflix Prize中得到最好的名詞,關于MF的一片overview:http://techblog.netflix.com/2012/04/netflix-recommendations-beyond-5-stars.html。
Explicit matrix factorization
user ratings 數據:
Tom, Star Wars, 5Jane, Titanic, 4Bill, Batman, 3Jane, Star Wars, 2Bill, Titanic, 3
以user為行,movie為列構造對應rating matrix:
MF就是一種直接建模user-item矩陣的方法,利用兩個低維度的小矩陣的乘積來表示,屬于一種降維的技術。
如果我們有U個用戶,I個items,若不經過MF處理,它看來會使這樣的:
是一個極其稀疏的矩陣,經過MF處理后,表示為兩個維度較小的矩陣相乘:
這類模型被稱為latent feature models,旨在尋找那些潛在的特征,來間接表示user-item rating的矩陣。這類潛在的features并不直接建模user對item的rating關系,而是通過latent features更趨近于建模用戶對某類items的偏好,例如某類影片、風格等等,而這些事通過MF尋找其內在的信息,無需items的詳細描述(和基于content的方法不同)。
MF模型如何計算一個user對某個item的偏好,對應向量相乘即可:
如何計算兩個item的相似度:
MF模型的好處是一旦模型創建好后,predict變得十分容易,并且性能也很好,但是在海量的用戶和itemset時,存儲和生產MF中的如上圖的這兩個矩陣會變得具有挑戰性。
Implicit matrixfactorization
前面我們都在討論顯式的一些偏好信息,比如rating,但是在大部分應用中,拿不到這類信息,我們更多滴搜集的是一些隱性的反饋信息,這類反饋信息沒有明確地告訴某個用戶對某個item的偏好信息,但是卻可以從用戶對某個item的交互信息中建模出來,例如一些二值特征,包括是否瀏覽過、是否購買過產品、以及多少次看過某部電影等等。
MLlib中提供了一種處理這類隱性特征的方法,將前面的輸入ratings矩陣其實可以看做是兩個矩陣:二值偏好矩陣P和信心權重矩陣C;
舉個例子:假定我們的網站上面沒有設計對movie的rating部分,只能通過log查看到用戶是否觀看過影片,然后通過后期處理,可以看出他觀看到過多少次某部影片,這里P來表示影片是否被某用戶看過,C來描述這里的confidence weighting也就是觀看的次數:
這里我們把P和C的dot product來替代前面的rating矩陣,那么我們最終建模來預估某用戶對item的偏好
Alternating least squares
ALS是解決MF問題的一個優化技術,被證明高效、高性能并且能有效地并行化,目前為止,是MLlib中推薦模塊的唯一一個算法。Spark官網上有專門地描述。
特征提取
特征提取是從已有數據中找到有用的數據來對算法進行建模,本文中使用顯式數據也就是用戶對movie的rating信息,這個數據來源于網絡上的MovieLens標準數據集,以下代碼為《Machine Learning with Spark》這本書里面的python的重寫版本,會有專門的ipython notebook放到github上。
rawData = sc.textFile(“../data/ML_spark/MovieLens/u.data”) print rawData.first rawRatings = rawData.map(lambda x: x.split(‘t’)) rawRatings.take(5)
數據分別是userId,itemId,rating和timestamp。
from pyspark.mllib.recommendationimport Rating from pyspark.mllib.recommendation import ALS ratings = rawRatings.map(lambda x : Rating(int(x[0]),int(x[1]),float(x[2]))) print ratings.first
格式化數據,用于后面建模數據,導入Rating,ALS模塊,下面是ALS類的使用說明:
其中rank就是上面latent feature model中矩陣的k,在下面的實驗中,我們設為50:
model = ALS.train(ratings,50) # modelImplicit = ALS.(ratings,50,alpha=0.02) userFeatures = model.userFeatures print userFeatures.take(2)
這里user1與user2,均用50維的向量來表示,也就是上面U*k那個矩陣的每個向量
predictRating = model.predict(789,123) print predictRating
預測用戶789對item 123的rating值,結果為3.76599662082。
topKRecs = model.recommendProducts(userId,K) for rec in topKRecs: print rec moviesForUser = ratings.groupBy(lambda x : x.user).mapValues(list).lookup(userId) # print moviesForUser for i in sorted(moviesForUser[0],key=lambda x : x.rating,reverse=True): print i.product # for # print moviesForUser
使用recommendProducts來為用戶推薦top10的items,其items順序為降序。MoviesForUser是從ratings數據中找出的用戶789rating最高的數據,仔細看下發現數據和我們的ratings里面找出的數據貌似一個都沒有相同的,那么是不是說明我們的算法不給力呢?!這個可不一定,想想看,如果推薦系統只是推薦給你看過的電影,那么它一定是一個失敗的,并且完全對系統的kpi數據無提升作用,前面提到,MF的實質是通過latent feature去找到與用戶過去偏好高的有某些隱性相同特征的電影(這些由整體用戶的集體智慧得到),比如可能是某一類型的電影、又或者相同的演員等等,所以這里不能說明推薦系統不給力,但是確實也很難具有解釋性。
Item recommendations
基于MF的方法中,我們可以利用之前看到k*I的矩陣,計算兩個向量質檢的相似性,也就是item的相似性。這樣,可以很容易做相似商品推薦的場景。這里我們定義相似函數為余弦相似性:
import numpy as np def cosineSImilarity(x,y): return np.dot(x,y)/(np.linalg.norm(x)*np.linalg.norm(y))testx = np.array([1.0,2.0,3.0]) print cosineSImilarity(testx,testx)
然后,通過ALS建模的item的向量,拿到對應地item的向量表示:
itemId = 567 itemFactor = model.productFeatures.lookup(itemId)[0] # itemFactor = itemFactor[1] print itemFactor # model.productFeatures.collect sims = model.productFeatures.map(lambda (id,factor):(id,cosineSImilarity(np.array(factor), np.array(itemFactor))))sims.sortBy(lambda (x,y):y,ascending=False).take(10)
利用ALS的item向量拿到itemId為567的向量表示,然后對model的item的特征向量來計算與567的相似度,按降序排序并取top10
這樣,可以找到與567這個item相似性最大的itemlist。
怎么判斷我們生成的模型性能呢?常用的有一些比如Mean Squared Error,Root Mean Squared Error,但是這類標準無法考量推薦最終的items的排序問題,在實際工作中用的比較多的是MeanAverage Precision,考慮到了item的排序造成的影響。
MSE&RMSE:
userProducts = ratings.map(lambda rating:(rating.user,rating.product)) print userProducts.take(1)[0] predictions = model.predictAll(userProducts).map(lambda rating:((rating.user,rating.product) ,rating.rating)) print predictions.take(5)ratingsAndPredictions = ratings.map(lambda rating:((rating.user,rating.product),rating.rating)) .join(predictions)
MSE = ratingsAndPredictions.map(lambda ((x,y),(m,n)):math.pow(m-n,2)).reduce(lambda x,y:x+y)/ratingsAndPredictions.count print MSE print math.sqrt(MSE)
先map ratings數據得到用戶對item的組合,然后對這類數據predictAll計算該用戶對item的rating估計值。然后利用join函數將預測的數據與ratings中的數據”聯合”起來,塞入相似度函數進行計算,最終結果如下:
備注:看到這里肯定有人會問題,你之前在前面recommendProducts的,沒有一個item是與ratings的數據相同,但是這里為什么又對比ratings中的評分信息來衡量推薦模型的好壞呢。猜想:recommendProduct是基于最終預測的ratings的高低來推薦的,但是,考慮到前面分析的原因,應該是不僅僅是按predict的rating的高低來給定推薦產品而是參入了其他的考量,所以這里并不矛盾。
APK :
什么是APK?可以看下這里,里面有R,Matlab,Python的各種Metrics的實現,還有kaggle里對APK的說明,邏輯很簡單,相對于MSE和RMSE,考慮了推薦的排序對最后metrics的影響,如果檢索出來的item排序越靠前,得分越高。
def avgPrecisionK(actual, predicted,k=10): iflen(predicted)>k: predicted = predicted[:k] score = 0.0 num_hits = 0.0 for i,p in enumerate(predicted): if p in actual and p not in predicted[:i]: num_hits += 1.0 score += num_hits / (i+1.0) if not actual: return 1.0 return score / min(len(actual), k)itemFactors = model.productFeatures.map(lambda (id,factor):factor).collectitemMatrix = np.array(itemFactors)imBroadcast = sc.broadcast(itemMatrix)
拿到product的所有向量表示,初始化矩陣 ,然后broadcast到各個節點。
userVector = model.userFeatures.map(lambda (userId,array):(userId,np.array(array))) # print userVector[0] userVector = userVector.map(lambda (userId,x): (userId,imBroadcast.value.dot((np.array(x).transpose)))) userVectorId = userVector.map(lambda (userId,x) : (userId,[(xx,i) for i,xx in enumerate(x.tolist)])) sortUserVectorId = userVectorId.map(lambda (userId,x):(userId,sorted(x,key=lambda x:x[0],reverse=True))) sortUserVectorRecId = sortUserVectorId.map(lambda (userId,x): (userId,[xx[1] for xx in x]))
為每一個user推薦一個對應的item list,并按user向量與item向量相乘計算的該用戶對該item的rating值來進行排序,最終給定一個有序的item的list。
userMovies = ratings.map(lambda rating: (rating.user,rating.product)).groupBy(lambda (x,y):x) userMovies = userMovies.map(lambda (userId,x):(userId, [xx[1] for xx in x] )) allAPK=sortUserVectorRecId.join(userMovies).map(lambda (userId,(predicted, actual)) :avgPrecisionK(actual,predicted,2000)) print allAPK.reduce(lambda x,y:x+y)/allAPK.count
然后從rating中找到對應的的item 列表,然后塞入之前我們寫的apk函數,然后求平均,最終結果為0.115484271925。
當然我們可以直接使用MLlib內置的evaluation模塊來對我們的模型進行評價,如MSE,RMSE:
from pyspark.mllib.evaluation import RegressionMetrics from pyspark.mllib.evaluationimport RankingMetrics predictedAndTrue = ratingsAndPredictions.map(lambda ((userId,product),(predicted, actual)) :(predicted,actual)) # print predictedAndTrue.take(1) regressionMetrics = RegressionMetrics(predictedAndTrue) print “Mean Squared Error = %f”%regressionMetrics.meanSquaredError print “Root Mean Squared Error %f”% regressionMetrics.rootMeanSquaredError
MAP:
#MAP # The implementation of the average precision at the K function in RankingMetrics is slightly different # from ours, # so we will get different results. However, the computation of the overall mean average precision #(MAP, which does not use a threshold at K) is the same as our function if we select K to be very high # (say, at least as high as the number of items in our item set) sortedLabels = sortUserVectorRecId.join(userMovies).map(lambda (userId,(predicted, actual)) :(predicted,actual)) # print sortedLabels.take(1) rankMetrics = RankingMetrics(sortedLabels) print “Mean Average Precision = %f” % rankMetrics.meanAveragePrecision print “MeanAverage Precision(at K=10) = %f” % rankMetrics.precisionAt(5)
這里結果與我們前面取k=2000的結果相同,說明我們的計算和MLlib是一致的,但是K=10或者比較小的值時,不一樣,這是因為MLlib在precisionAt(k)這個函數與我們前面邏輯不同,這里我們不做考慮。
本章的代碼放到了github上面,是ipython notebook的可以直接調用試用下,這版代碼是我學習spark寫的,水平很差,而且notebook中也沒有基本的代碼說明,算是對原書中這部分的scala的一次重寫,喜歡python和spark的可以研究下,一步一步看下還是會熟悉python操作spark的流程的。
</div>