使用 Azure、Hadoop 和 Mahout 構建一個推薦系統
今天想幫助別人嗎?
根據用戶之前的回答歷史,我們可以把Stack Exchange的新問題推薦給一個能夠回答的用戶,這與亞馬遜通過你之前的購買記錄給你提供推薦很相似。不知道Stack Exchange是做什么的? - 它運行了很多的Q&A網站,包括廣受歡迎的Stack Overflow。
我們目前的任務是如何分析用戶之前的回答,從而預測該用戶今后所能回答的問題。Stack Exchange目前的推薦邏輯可能比我們的好很多,但這不會阻止我們以個人學習目的去幫助他們;)。
我們將會做以下任務.
- 從Stack Exchange數據集中提取所需的信息
- 用提取的信息建立一個推薦
但是,一切從基礎開始!如果你是第一次接觸Apache Hadoop和Azure上的Hadoop,我建議你在開始之前閱讀這些介紹性的文章,在我介紹HDInsight和Map Reduce模式時以便知道更詳細的情況。
幕后
讓我們開始吧,先做一些分布式機器學習應用的“數據實驗”,酷!
- 建議 - 記得亞馬遜的建議吧?基于歷史來預測偏好。
- 聚類 - 從一堆文件內找出相關的文檔或從社區內找到志同道合的人等類似的任務。
- 分類 - 識別一個新條目屬于哪個分類,這通常包括一個先決的訓練系統,還需要一個檢測系統。
“大數據”這個術語通常在你需要處理一個非常大的數據集時使用。本文中,我們將從一個大的數據集中提取一些數據,然后根據我們提取的數據建立一個推薦。
推薦是什么?
從廣義上講,我們可以通過如下方式建立推薦:
- 基于一個用戶其類似用戶之前回答的問題來尋找該用戶可能愿意回答的問題。
- 尋找跟他之前回答問題相似的問題。
第一中技術手段稱為基于用戶的推薦,第二種稱為基于產品的推薦。
在第一種情況下,會嘗試確定你與該用戶共同回答了多少問題(你們倆都回答的問題)。例如,現在有User1、User2、User3和User4 - 回答了Q1、Q2、Q3和Q4這幾個問題。右邊的圖顯示了用戶及他們所回答的問題
根據這個圖,User1和User2回答了Q1、Q2和Q3 - User2回答了Q2和Q3但沒有回答Q1。現在,在一定程度上,我們可以放心地假設User3會有興起回答Q1 - 因為兩個和他一樣都回答過Q2和Q3的用戶已經回答過Q1。有點感覺了,是不是?所以,如果你有一個{用戶名,問題ID}的對應數組 - 建立一個推薦所需的數據基本已經滿足了。
邏輯表
現在,我們究竟該怎樣建立一個問題推薦呢?事實上,它是相當簡單的。
首先,我們需要找出用戶與問題對共同出現的次數。請注意,該矩陣與用戶無關。例如,如果Q1和Q2共同出現了2次(在上面的圖中),則共同發生值{Q1,Q2}即為2。這是個共生矩陣(希望我是對的)。
- Q1和Q2共同發生了2次 (User1、User2回答了Q1、Q2)
- Q1和Q3共同發生了2次 (User1、User2都回答了Q1、Q2)
- Q2和Q3共同發生了3次 (User1、User2和User3回答了Q2、Q3)
- 其他的類似統計
上面的矩陣顯示了上面我們所討論的問題(回答)對發生的次數。現在還沒有映射到用戶。現在,我們如何將這個聯系到用戶喜好呢?要找出一個問題匹配用戶的程度我們只需要:
- 找出該問題與另外問題共同被這個用戶回答的頻率
- 消除已經被該用戶回答的問題。
第一步,我們需要將上述矩陣變更成用戶的偏好矩陣。
讓我們拿User3做個示范,對User3來說,因為他已回答了問題Q2、Q3而沒有回答Q1、Q4,所以他對問題[Q1,Q2,Q3,Q4]的偏好映射為[0,1,1,0],現在,讓我們把這個數據與上面的矩陣合并,注意這是一個點積矩陣。結果顯示了用戶連帶回答各個問題的頻率(權重)。
我們可以在結果中忽略Q2和Q3,因為我們已經知道用戶回答了它們。現在剩下Q1和Q4 – Q1具有更高的值(4)更匹配User3。換種方式講,這表明Q1相比Q4來說與User3已回答的問題(Q2和Q3)關聯性更大–所以相比Q4,User3將更有興趣回答Q1。在實際的應用中需要注意:隨著用戶的回答會逐漸集中,用戶的偏好矩陣是將會是稀疏矩陣(主要是零)。該邏輯的優點:我們可以使用多個任務使用分布式的MapReduce模型來計算用戶矩陣,找到每個用戶的點積矩陣等。
查看我之前的例子或許對你有幫助:MapReduce介紹及例子。
實現
從實現的角度來看:
- 我們需要一個Hadoop集群
- 我們需要下載數據,解壓后分析 (Stack Overflow數據)
- Job 1 – 提取數據 - 將用戶回答的所有數據提取成{UserId, QuestionId}的形式,每條一行
- Job 2 – 建立推薦 - 使用上述的MapReduce模型輸出的用戶數據建立推薦。
讓我們運行起來!!
Step 1 - 配置集群
記住,Stack Exchange的數據量是很大的,因此我們需要有一個對等的分布式環境來處理。我們用Windows Azure,如果沒有帳戶的話可以注冊免費試用。進入起始頁面,打開HDInsight(Azure Hadoop)預設。
如果HDInsight可用便可以很容易地創建一個Hadoop集群,我創建了一個命名為stackanalyzer的集群。
集群就緒的話,你會在儀表盤界面上看到連接和管理按鈕(此處沒有圖片)。單擊“連接”按鈕就將連接到集群節點,這樣會打開一個遠程節點的桌面連接。你也可以點擊“管理”按鈕打開基于Web的儀表盤界面。(如果需要,請閱讀關于HDInsight的更多內容)
Step 2 - 分析你的數據
通過遠程桌面連接到集群的頭節點后就可以下載Stack Exchange數據了。你可以根據需要從Clear Bits下載Stack Exchange網站的數據。我在頭結點上安裝了Mu-Torrent客戶端,然后下載并解壓了http://cooking.stackexchange.com/的數據 – 解壓出的文件是一堆XML文件。
我們感興趣的東西在XML文件中。文件中的每一行包含一個問題或回答,如果是問題則posttypeid = 1,如果是回答則posttypeid = 2。ParentID代表回答對應的問題的ID,OwnerUserId代表做出該回答的人。
<row Id="16" PostTypeId="2" ParentId="2" CreationDate="2010-07-09T19:13:37.540" Score="3"
Body="<p>...shortenedforbrevity... </p>
"
OwnerUserId="34" LastActivityDate="2010-07-09T19:13:37.540" />
所以對于我們來說,我們需要提取PostTypeId=2(類型是回答)的所有發表內容的{OwnerUserId,ParentId},這是{User,Question,Votes}的一個表示。后面我們將使用這些數據生成推薦結果。
原文件的數據量很大,從中提取數據也是一個不小的工作。對于這個Cooking網站,它沒有那么多的數據–但如果分析整個Stack Overflow的話,文件大小也有數GB。針對提取的這個操作我們可以使用Hadoop并編寫一個自定義的MapReduce操作。
Step 3 - 從Dump(User,Question)中提取我們需要的數據
為了提取數據,我們將借助Hadoop來分配。首先要寫一個簡單的Mapper。就像前面提到過的,我們需要弄清楚所有PostTypeId=2的文章中的{OwnerUserId,ParentId}。這是因為我們要之后要為推薦工作輸入的是{user,item}。基于此,首先要把Posts.XML加載到HDFS。你可以使用hadoop fs命令把本地文件復制到指定的輸入路徑。
現在,是時候開始寫一個用戶映射來提取數據了。我們將使用Hadoop On Azure .NET SDK來寫Mapduce任務。不是我們在配置部分指明輸入目錄和輸出目錄。啟動Visual Studio,創建一個C#控制臺程序。如果你記得我之前寫的文章,你會知道hadoop fs<yourcommand>是用來訪問HDFS文件系統,當然如果你知道一些基本的*nix命令如 Is,cat等等會更好。
注意:: (之前的文章) 忽略HDInsight前面部分,你可以理解更多關于Map Reduce模型和Hadoop on Azure。
你需要通過Nuget包管理器來安裝Hadop SDK for .NET上的Hadoop Map Reduce包。
install-package Microsoft.Hadoop.MapReduce
有下面的代碼,我們可以
- 創建一個 映射
- 創建一個任務
- 提交任務到集群
具體如下:
using System; using System.Collections.Generic; using System.Globalization; using System.Linq; using System.Text; using System.Xml.Linq; using Microsoft.Hadoop.MapReduce; namespace StackExtractor { //Our Mapper that takes a line of XML input and spits out the {OwnerUserId,ParentId,Score} //i.e, {User,Question,Weightage} public class UserQuestionsMapper : MapperBase { public override void Map(string inputLine, MapperContext context) { try { var obj = XElement.Parse(inputLine); var postType = obj.Attribute("PostTypeId"); if (postType != null && postType.Value == "2") { var owner = obj.Attribute("OwnerUserId"); var parent = obj.Attribute("ParentId"); // Write output data. Ignore records will null values if any if (owner != null && parent != null ) { context.EmitLine(string.Format("{0},{1}", owner.Value, parent.Value)); } } } catch { //Ignore this line if we can't parse } } } //Our Extraction Job using our Mapper public class UserQuestionsExtractionJob : HadoopJob<UserQuestionsMapper> { public override HadoopJobConfiguration Configure(ExecutorContext context) { var config = new HadoopJobConfiguration(); config.DeleteOutputFolder = true; config.InputPath = "/input/Cooking"; config.OutputFolder = "/output/Cooking"; return config; } } //Driver that submits this to the cluster in the cloud //And will wait for the result. This will push your executables to the Azure storage //and will execute the command line in the head node (HDFS for Hadoop on Azure uses Azure storage) public class Driver { public static void Main() { try { var azureCluster = new Uri("https://{yoururl}.azurehdinsight.net:563"); const string clusterUserName = "admin"; const string clusterPassword = "{yourpassword}"; // This is the name of the account under which Hadoop will execute jobs. // Normally this is just "Hadoop". const string hadoopUserName = "Hadoop"; // Azure Storage Information. const string azureStorageAccount = "{yourstorage}.blob.core.windows.net"; const string azureStorageKey = "{yourstoragekey}"; const string azureStorageContainer = "{yourcontainer}"; const bool createContinerIfNotExist = true; Console.WriteLine("Connecting : {0} ", DateTime.Now); var hadoop = Hadoop.Connect(azureCluster, clusterUserName, hadoopUserName, clusterPassword, azureStorageAccount, azureStorageKey, azureStorageContainer, createContinerIfNotExist); Console.WriteLine("Starting: {0} ", DateTime.Now); var result = hadoop.MapReduceJob.ExecuteJob<UserQuestionsExtractionJob>(); var info = result.Info; Console.WriteLine("Done: {0} ", DateTime.Now); Console.WriteLine("\nInfo From Server\n----------------------"); Console.WriteLine("StandardError: " + info.StandardError); Console.WriteLine("\n----------------------"); Console.WriteLine("StandardOut: " + info.StandardOut); Console.WriteLine("\n----------------------"); Console.WriteLine("ExitCode: " + info.ExitCode); } catch(Exception ex) { Console.WriteLine("Error: {0} ", ex.StackTrace.ToString(CultureInfo.InvariantCulture)); } Console.WriteLine("Press Any Key To Exit.."); Console.ReadLine(); } } }
現在編譯和運行上的程序。 執行工作(ExecuteJob)會上傳所需的二進制文件到集群,并初始化一個Hadoop數據流工作(Streaming Job),它會在集群上運行我們的映射(Mappers),并輸入存儲在輸入文件夾中的Posts文件。我們的控制臺程序把任務提交到云端,并等待執行的結果。Hadoop SDK將更新映射-歸并二進制文件到二進制容器(blob)中,并組建所需命令行來執行任務(見之前寫的理解如何手動實現的文章-)。你可以點擊桌面快捷方式中的Hadoop映射歸并狀態追蹤來檢查頭結點中的任務。
如果一切進展順利的話,你會看到如下結果:
正如你上面看到的,你可以在/output/Cooking目錄中找到輸出。如果你RDP到你集群的頭結點,現在檢查你的輸出目錄,你應該可以看到Map Reduce Job創建的如下文件。
正如你所期待的,這些文件包含了提取的數據,這些數據代表UserId,QuestionId--所有被一個用戶回答的問題。如果你愿意,你可以把數據從HDFS加載到Hive,并用帶有ODBC的Microsoft Excel觀察到同樣的結果。可以參看我之前寫的文章。
第 4 步 – 構建推薦系統并產生推薦
作為下一步,我們需要構建共生矩陣并運行推薦作業,將{UserId,QuestionId}數據轉變為推薦。幸運的是,我們不需要為此寫Map Reduce作業。我們可以使用Hadoop的同時使用Mahout庫。 這里閱讀Mahout相關資料
RDP到集群的頭節點,因為需要安裝Mahout。下載本文寫作時最新版本的Mahout (0.7),原樣拷貝到集群頭節點的c:\app\dist目錄。
Mahout的推薦作業(Recommender Job)支持以多種算法創建推薦——在本例中,我們將使用 SIMILARITY_COOCCURRENCE。 Mahout網站的 算法頁面有更多關于推薦,聚類和分類的算法信息。我們將使用/output/Cooking目錄的文件創建推薦信息。
該運行推薦作業了。創建一個users.txt,在該文件中記下推薦用戶的ID,并且拷貝到HDFS(Hadoop Distributed File System)。
現在,接下來的命令將啟動推薦作業。記住,我們會用上面Map Reduce作業的輸出文件作為推薦系統的輸入。我們點擊啟動這個推薦作業。這會在/recommend/目錄下產生輸出,就是在users.txt文件中指定的所有用戶。可以使用–num 推薦開關指定針對每個用戶的推薦數量。如果用戶與條目之間有偏好關系,(如用戶播放一首歌的次數),那么可以采用{user,item,preferencevalue}形式給推薦系統提供輸入數據集——在本例中,我們省略了偏好權重。
注意: 如果下面的命令在運行后失敗了,提示輸出目錄已經存在,就試試使用hadoop fs –rmr temp and hadoop fs –rmr /recommend/ 刪除tmp目錄和輸出目錄。
hadoop jar c:\Apps\dist\mahout-0.7\mahout-core-0.7-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob -s SIMILARITY_COOCCURRENCE --input=/output/Cooking --output=/recommend/ --usersFile=/data/users.txt
作業結束以后,檢查/recommend/文件夾,試著打印一下生成文件的內容。你將會看到top推薦,針對users.txt文件中的每個用戶Id。
因此,如果給用戶類似建議,推薦引擎會認為User 1393可以回答問題6419, 16897 等等。你可以試驗一下其他類似的類,像SIMILARITY_LOGLIKELIHOOD, SIMILARITY_PEARSON_CORRELATION等等,以便找到最好的結果。迭代與優化到你滿意為止。
不過這里嘗試的是另一種實踐——檢查Stack Exchange數據集,找出怎樣去創建一個推薦,根據用戶喜歡的問題,顯示出'你或許也會喜歡‘的問題?
總結
在這個例子中,我們手動上傳所需的輸入文件到HDFS,并手動觸發推薦系統任務(Recommender Job)。事實上, 你可以利用Hadoop For Azure SDK自動完成整個的工作流。但這是另一篇文章要講的,敬請期待。現實中有很多分析工作要做,包括為提取并轉存數據到HDFS編寫映射/歸并,自動創建hive表,用HiveQL或者PIG執行操作等等。然而,我們剛剛檢查的這些步驟涉及到用Azure,Hadoop和Mahout做一些有意義的事。
你在你的移動應用或者ASP.NET程序中訪問這些數據,或者使用Sqoop導入到SQL Server,或者把數據加載到Hive表中,這是我剛提到的。快樂編程,機器學習(Happy Coding and Machine Learning)。把HD Insight捆綁到現存的應用來建立端對端工作流,如果你對這種案列比較感興趣,你可以聯系我。
我推薦你繼續閱讀