微博廣告推薦中有關Hadoop的那些事
一、背景
微博,一個DAU上億、每日發博量幾千萬的社交性產品,擁有龐大的數據集。如何高效得從如此規模的數據集中挖掘出有價值的信息,以增強用戶粘性,提 高信息傳播速度,就成了重中之重。因此,我們引入了hadoop 分布式計算平臺,對用戶數據和內容數據進行分析和挖掘,作為廣告推薦的基礎。
二、問題及解決方案
在hadoop平臺上進行開發時,主要遇到了以下一些問題:
2.1 數據量龐大
問題:無論在進行針對用戶的協同過濾運算,還是在計算用戶可能錯過的微博中,無一例外的都遇到了數據量太大無法進行運算的情況。因此,精簡計算數據成為了亟待解決的問題。
解決方案一:在用戶推薦方面,可以對候選集合被推薦的概率進行預估, 將具有極小推薦機會的數據忽略不計。目前通用的方法,就是直接選取活躍用戶作為計算對象,既能夠降低計算量,又能夠保證獲得預期的推薦效果。另外,對于一 些超級節點,比如擁有很多粉絲的V用戶,它會衍生出大量的關系鏈,導致計算規模暴增和數據分布偏移。對于這類節點,需要將與其相關的數據進行優選過濾。簡 而言之,就是優選候選節點。
解決方案二:在微博內容推薦方面,主要從微博內容的質量入手。對于那 些信息量少、色情、垃圾等內容的微博,需要將其剔除,以保證候選集的質量。通過對同類微博推薦產品的點擊日志統計后,發現無圖微博的點擊率較低,而該類微 博大概占總微博數的10%,在對推薦效果影響不大的前提下,將該類數據從候選集中剔除,也能夠大大降低計算量。
2.2 HDFS數據與線下交互不便
問題一:目前數據挖掘方面的業務基本都是放到hadoop平臺上來進行,計算結果保存在HDFS上。而HDFS上的數據必須通過hadoop平臺通道機中轉后再傳送至服務器端,傳輸效率較低。
解決方案:針對HDFS數據與線下交互不便的問題,我們在hadoop gateway上搭建了socat服務。socat是一個多功能的網絡工具,它是兩個獨立數據通道之間的雙向數據傳輸的繼電器。這些數據通道包含文件、管 道、設備、TCP/UDP、SSL\SOCKS4客戶端或者代理CONNECT。在任何一臺與gateway互通的服務器上,拉取相應的hadoop 和jdk,就可以方便地與hadoop平臺交互,實現互通。
問題二:目前對于離線數據,我們常用lushan來進行掛載,但其數據格式與hdfs上默認支持的數據格式不同,無法直接使用。
解決方案:直接在hadoop平臺生成lushan需要的文件格式。我們繼承了 FileOutputFormat,實現相應的write方法,生成了一個LushanFileOutputFormat,用于直接將結果數據以 lushan數據格式輸出。同理,用戶也可以實現任何自定義輸入和輸出格式。
2.3業務邏輯復雜且運行過程不便監控
問題一:在日常的數據挖掘中,往往需要綜合多種數據,業務邏輯紛繁復,用戶只能自己實現業務流程。
問題二:hadoop job 正式上線后,用戶最關注的就是該job是否正常執行,一旦異常能否及時收到通知,而人工通過jobtracker來監測是不現實的。
解決方案:基于以上兩點,我們引入了hadoop平臺提供的調度系統(Scheduler System)。用戶可以將業務分為幾個子模塊,每個模塊作為一個節點來實現對應的功能。用戶只需要通過圖形化界面將相互獨立或者依賴的job節點進行連 接,即可完成整個業務流程的搭建,還能夠實現節點的復用。用戶還可以對相應的節點設置監控報警信息,一旦出錯,調度系統會根據用戶設置進行報警提示。
2.4 mapreduce開發過程繁瑣
問題:做過mapreduce開發的人可能都有一個同感,除了核心邏輯以外,需要敲入大量相對固定的代碼,比如map/reduce函數的定義,Job的輸入、輸出數據以及對應的數據格式等等。這些信息相對固定,但又不可或缺。
解決方案:mapreduce開發框架為此誕生了。該框架致力于讓程序員將關注點放在核心功能的實現上,更簡便的實現map/reduce的調用流程。其功能說明如下:
- Driver.java
實現模塊集成化,在運行時通過指定類名來執行相應的操作。可以將多種功能集合到一個jar包中,便于維護。見如下例子,就可以將GetUserSchoolPro 類加入到jar包中進行調用。
ProgramDriver pgd = new ProgramDriver();
pgd.addClass(GetUserCompanyPro.Name, GetUserCompanyPro.class, "GetUserCompanyPro");
假設生成的jar包為frame_mapred.jar,其執行方式如下:
hadoop jar frame_mapred.jar GetUserCompanyPro -companydata /dw/ods/ods_user_career/$yestday -outputpath /dw_ext/recmd/dongna/userinfo/user_company/$yestday_d -outputformat text -reducenum 200
- FrameMapred.java:
該部分提供4個接口可供用戶使用, AddMapper函數用來進行map操作,AddReducer函數用來進行reduce操作,loadResource函數可以用來從本地加載資源數據至內存中,供map/reduce 使用。
// generate data only with map
public static int AddMapper(Configuration conf, JobConf job, String strInputPath, String strOutputPath, Class<? extends InputFormat> clsInputFileFormat, Class<? extends Mapper> clsMapClass, String strOutputFormat, Class<?> clsMapOutputKey, Class<?> clsMapOutputValue)
//generate data with map and reduce
public static int AddMapper(JobConf job, String strInputPath, Class<? extends InputFormat> clsInputFileFormat, Class<? extends Mapper> clsMapClass, Class<?> clsMapOutputKey, Class<?> clsMapOutputValue)
//add reduce
public static int AddReducer(Configuration conf, JobConf job, String strOutputPath, String strOutputFormat, Class<? extends Reducer> clsRedClass, Class<?> clsOutputKey, Class<?> clsOutputValue)
// load local resource
public static int loadResource(JobConf job, String strFilePath, String strResName)
另外,在日常工作中,經常會針對badcase來查錯,無一例外的需要查看各種中間數據的正確性。由于hadoop生成數據大部分都是非文本數據, 就必須要先編寫解析程序以達到目的。基于此,該hadoop開發框架中對于常見的rcfile, sequencefile 文件也提供了通用的解析工具,以期降低這方面的人力消耗。
三、系統架構
3.1 獲取離線數據架構
圖2 獲取離線數據架構圖
該框架可以實現Hadoop數據挖掘-線下加載的自動化,可靠性較高。通過調度系統定時啟動或者由外部調用接口觸發計算流程,計算完畢后,數據存儲至 HDFS上。線下存儲服務通過訪問SOCAT可以與HDFS進行數據交互,同時線下存儲服務中的數據也可以通過SOCAT中轉上傳至HDFS。
3.2 數據實時獲取架構
請求數據發送至RIN(統一數據入口),經隊列消費程序確定數據的獲取位置(后臺存儲、OPENAPI,HADOOP)后并分發。通過訪問OPENAPI 和后臺存儲來獲取全部數據后直接進行數據分發。有時需要hadoop平臺和存儲服務相結合并對數據進行合并,再進行數據分發。
圖3 數據實時獲取架構圖
四、發展
4.1 hadoop 開發框架擴展
目前hadoop開發框架的功能還不完善,主要有以下幾個功能:
- 支持基于map/reduce業務的快捷開發
- 將相互獨立的功能模塊進行打包,便于維護
- RCFile、SequenceFile、LZO文件的解析工具
后續會添加以下功能:
- 充實通用工具包,提供轉置、倒排、簡單map/reduce的數據抽取工具。
- 編寫通用的MapReduce作業的鏈接工具,能夠支持具有依賴、預處理和后處理階段的鏈接。以減少中間階段的IO,提高效率。
4.2 R9 Interface 任務提交平臺
該平臺致力于遠程提交MapReduce任務和Hive sql操作,并能夠與線下實現互通,完成數據分發及存儲,結合報警監控工具保障整個業務流程的可控性。其框架圖如下:
圖4 R9 Interface 框架圖
來自:http://www.wbrecom.com/?p=512