Collective的Spark ML經驗分享:讀者模型
Collective成立于2005年,其總部位于紐約,是一家從事 數字廣告業務的公司。 該公司的數字廣告業務非常依賴于機器學習和預測模型,對于特定的用戶在特定的時間應該投放什么樣的廣告完全是由實時或者離線的機器學習模型決定的。本文來 自Databricks的技術博客,Eugene Zhulenev分享了自己在Collective公司 從事機器學習和讀者模型工作的經驗。
Collective公司有很多使用機器學習的項目,這些項目可以統稱為讀者模型,因為這些項目都是基于用戶的瀏覽歷史、行為數據等因素預測讀者轉 化、點擊率等信息的。在機器學習庫的選擇上,Collective公司內部新開發的大部分項目都是基于Spark和Spark MLLib的,對于一些被大家廣泛使用而Spark并不具備的工具和類庫Collective還專門創建了一個擴展庫Spark Ext。在本文中,Eugene Zhulenev介紹了如何使用Spark Ext和Spark ML兩個類庫基于地理位置信息和瀏覽歷史數據來預測用戶轉化。
預測數據
預測數據包含兩種數據集,雖然這些數據都是使用虛擬的數據生成器生成的,但是它們與數字廣告所使用的真實數據非常相似。這兩類數據分別是:
用戶的瀏覽歷史日志
Cookie | Site | Impressions --------------- |-------------- | ------------- wKgQaV0lHZanDrp | live.com | 24 wKgQaV0lHZanDrp | pinterest.com | 21 rfTZLbQDwbu5mXV | wikipedia.org | 14 rfTZLbQDwbu5mXV | live.com | 1 rfTZLbQDwbu5mXV | amazon.com | 1 r1CSY234HTYdvE3 | 油Tube.com | 10
經緯度地理位置日志
Cookie | Lat | Lng | Impressions --------------- |---------| --------- | ------------ wKgQaV0lHZanDrp | 34.8454 | 77.009742 | 13 wKgQaV0lHZanDrp | 31.8657 | 114.66142 | 1 rfTZLbQDwbu5mXV | 41.1428 | 74.039600 | 20 rfTZLbQDwbu5mXV | 36.6151 | 119.22396 | 4 r1CSY234HTYdvE3 | 42.6732 | 73.454185 | 4 r1CSY234HTYdvE3 | 35.6317 | 120.55839 | 5 20ep6ddsVckCmFy | 42.3448 | 70.730607 | 21 20ep6ddsVckCmFy | 29.8979 | 117.51683 | 1
轉換預測數據
正如上面所展示的,預測數據是長格式,對于每一個cookie與之相關的記錄有多條,通常情況下,這種格式并不適合于機器學習算法,需要將其轉換成“主鍵——特征向量”的形式。
Gather轉換程序
受到了R語音 tidyr和reshape2包的啟發,Collective將每一個鍵對應的值的長數據框(long DataFrame)轉換成一個寬數據框(wide DataFrame),如果某個鍵對應多個值就應用聚合函數。
val gather = new Gather() .setPrimaryKeyCols("cookie") .setKeyCol("site") .setValueCol("impressions") .setValueAgg("sum") //通過key對impression的值求和 .setOutputCol("sites") val gatheredSites = gather.transform(siteLog)
轉換后的結果
Cookie | Sites -----------------|---------------------------------------------- wKgQaV0lHZanDrp | [ | { site: live.com, impressions: 24.0 }, | { site: pinterest.com, impressions: 21.0 } | ] rfTZLbQDwbu5mXV | [ | { site: wikipedia.org, impressions: 14.0 }, | { site: live.com, impressions: 1.0 }, | { site: amazon.com, impressions: 1.0 } | ]
Google S2幾何單元Id轉換程序
Google S2幾何類庫是一個球面幾何類庫,該庫非常適合于操作球面(通常是地球)上的區域和索引地理數據,它會為地球上的每一個區域分配一個唯一的單元Id。
為了將經緯度信息轉換成鍵值對的形式,Eugene Zhulenev結合使用了S2類庫和Gather,轉換后數據的鍵值是S2的單元Id。
// Transform lat/lon into S2 Cell Id val s2Transformer = new S2CellTransformer() .setLevel(5) .setCellCol("s2_cell") // Gather S2 CellId log val gatherS2Cells = new Gather() .setPrimaryKeyCols("cookie") .setKeyCol("s2_cell") .setValueCol("impressions") .setOutputCol("s2_cells") val gatheredCells = gatherS2Cells.transform(s2Transformer.transform(geoDf))
轉換后的結果
Cookie | S2 Cells -----------------|---------------------------------------------- wKgQaV0lHZanDrp | [ | { s2_cell: d5dgds, impressions: 5.0 }, | { s2_cell: b8dsgd, impressions: 1.0 } | ] rfTZLbQDwbu5mXV | [ | { s2_cell: d5dgds, impressions: 12.0 }, | { s2_cell: b8dsgd, impressions: 3.0 }, | { s2_cell: g7aeg3, impressions: 5.0 } | ]
生成特征向量
雖然Gather程序將與某個cookie相關的所有信息都組織到了一行中,變成了鍵值對的形式,但是這種形式依然不能作為機器學習算法的輸入。為了能夠訓練一個模型,預測數據需要表示成double類型的向量。
Gather 編碼程序
使用虛擬變量對明確的鍵值對進行編碼。
// Encode S2 Cell data val encodeS2Cells = new GatherEncoder() .setInputCol("s2_cells") .setOutputCol("s2_cells_f") .setKeyCol("s2_cell") .setValueCol("impressions") .setCover(0.95) // dimensionality reduction
原始數據
Cookie | S2 Cells -----------------|---------------------------------------------- wKgQaV0lHZanDrp | [ | { s2_cell: d5dgds, impressions: 5.0 }, | { s2_cell: b8dsgd, impressions: 1.0 } | ] rfTZLbQDwbu5mXV | [ | { s2_cell: d5dgds, impressions: 12.0 }, | { s2_cell: g7aeg3, impressions: 5.0 } | ]
轉換后的結果
Cookie | S2 Cells Features -----------------|------------------------ wKgQaV0lHZanDrp | [ 5.0 , 1.0 , 0 ] rfTZLbQDwbu5mXV | [ 12.0 , 0 , 5.0 ]
對于轉換后的結果,用戶還可以根據場景選擇性地使用頂部轉換進行降維。首先計算不同用戶每個特征的值,然后根據特征值進行降序排序,最后從結果 列表中選擇最上面那些數值總和占所有用戶總和的百分比超過某個閾值(例如,選擇最上面覆蓋99%用戶的那些網站)的數據作為最終的分類值。
Spark ML 管道
Spark ML 管道是Spark MLLib的一個新的高層API。一個真正的ML管道通常會包含數據預處理、特征提取、模型擬合和驗證幾個階段。例如,文本文檔的分類可能會涉及到文本分 割與清理、特征提取、使用交叉驗證訓練分類模型這幾步。在使用Spark ML時,用戶能夠將一個ML管道拆分成多個獨立的階段,然后可以在一個單獨的管道中將他們組合到一起,最后使用交叉驗證和參數網格運行該管道從而找到最佳 參數集合。
使用Spark ML管道將它們組合到一起
// Encode site data val encodeSites = new GatherEncoder() .setInputCol("sites") .setOutputCol("sites_f") .setKeyCol("site") .setValueCol("impressions") // Encode S2 Cell data val encodeS2Cells = new GatherEncoder() .setInputCol("s2_cells") .setOutputCol("s2_cells_f") .setKeyCol("s2_cell") .setValueCol("impressions") .setCover(0.95) // Assemble feature vectors together val assemble = new VectorAssembler() .setInputCols(Array("sites_f", "s2_cells_f")) .setOutputCol("features") // Build logistic regression val lr = new LogisticRegression() .setFeaturesCol("features") .setLabelCol("response") .setProbabilityCol("probability") // Define pipeline with 4 stages val pipeline = new Pipeline() .setStages(Array(encodeSites, encodeS2Cells, assemble, lr)) val evaluator = new BinaryClassificationEvaluator() .setLabelCol(Response.response) val crossValidator = new CrossValidator() .setEstimator(pipeline) .setEvaluator(evaluator) val paramGrid = new ParamGridBuilder() .addGrid(lr.elasticNetParam, Array(0.1, 0.5)) .build() crossValidator.setEstimatorParamMaps(paramGrid) crossValidator.setNumFolds(2) println(s"Train model on train set") val cvModel = crossValidator.fit(trainSet)
結論
Spark ML API讓機器學習變得更加容易。同時,用戶還可以通過Spark Ext創建自定義的轉換/估計,并對這些自定義的內容進行組裝使其成為更大管道中的一部分,此外這些程序還能夠很容易地在多個項目中共享和重用。如果想要查看本示例的代碼,可以點擊這里。
來自:http://www.infoq.com/cn/news/2015/11/collective-audience-model-exampl