大數據挖掘更多時間都在于清洗數據
編者按:本文作者汪榕曾寫過一篇文章:《 以什么姿勢進入數據挖掘會少走彎路 》,是對想入行大數據的讀者的肺腑之言,其中也表達了作者的一些想法,希望大家不要隨便去上沒有結合業務的收費培訓班課程;而后,他有了結合他本人的工作經驗,寫一系列幫助大家進行實踐學習課程文章的想法,InfoQ也覺得這是件非常有意義的事情,特別是對于大數據行業1-3年工作經驗的人士,或者是沒有相關工作經驗但是想入行大數據行業的人。課程的名稱是“數據挖掘與數據產品的那些事”,目的是:1. 引導目標人群正確學習大數據挖掘與數據產品;2. 協助代碼能力薄弱的學習者逐漸掌握大數據核心編碼技巧;3. 幫助目標人群理解大數據挖掘生態圈的數據流程體系;4. 分享大數據領域實踐數據產品與數據挖掘開發案例;5.交流大數據挖掘從業者職業規劃和發展方向。這系列文章會在InfoQ上形成一個專欄,本文是專欄的第三篇。
前言:很多初學的朋友對大數據挖掘第一直觀的印象,都只是業務模型,以及組成模型背后的各種算法原理。往往忽視了整個業務場景建模過程中,看似最普通,卻又最精髓的特征數據清洗。可謂是平平無奇,卻又一掌定乾坤,稍有閃失,足以功虧一簣。
一、數據清洗的那些事
構建業務模型,在確定特征向量以后,都需要準備特征數據在線下進行訓練、驗證和測試。同樣,部署發布離線場景模型,也需要每天定時跑P加工模型特征表。
而這一切要做的事,都離不開數據清洗,業內話來說,也就是 ETL處理 (抽取Extract、轉換Transform、加載Load),三大法寶。

來自于百度百科
在大數據圈里和圈外,很多朋友都整理過數據,我們這里稱為 清洗數據 。
不管你是叱咤風云的Excel大牛,還是玩轉SQL的數據庫的能人,甚至是專注HQL開發ETL工程師,以及用MapReduce\Scala語言處理復雜數據的程序猿。(也許你就是小白一個)
我想說的是,解決問題的技術有高低,但是解決問題的初衷只有一個——把雜亂的數據清洗干凈,讓業務模型能夠輸入高質量的數據源。
不過,既然做的是大數據挖掘,面對的至少是G級別的數據量(包括用戶基本數據、行為數據、交易數據、資金流數據以及第三方數據等等)。那么選擇正確的方式來清洗特征數據就極為重要,除了讓你事半功倍,還至少能夠保證你在方案上是可行的。
二、大數據的必殺技
在大數據生態圈里,有著很多開源的數據ETL工具,每一種都私下嘗嘗鮮也可以。但是對于一個公司內部來說,穩定性、安全性和成本都是必須考慮的。
就拿Spark Hive和Hive來說,同樣是在Yarn上來跑P,而且替換任務的執行引擎也很方便。

修改任務執行引擎
的確,Spark的大多數任務都會比MapReduce執行效率要快差不多1/3時間。但是,Spark對內存的消耗是很大的,在程序運行期間,每個節點的負載都很高,隊列資源消耗很多。因此,我每次提交Spark離線模型跑任務時,都必須設置下面的參數,防止占用完集群所有資源。
spark-submit --master yarn-cluster --driver-memory 5g --executor-memory 2g --num-executors 20
其中:
- driver-memory 是用于設置Driver進程的內存,一般不設置,或者1G。我這里調整到5G是因為RDD的數據全部拉取到Driver上進行處理,那要確保Driver的內存足夠大,否則會出現OOM內存溢出。
- executor-memory 是用于設置每個Executor進程的內存。Executor內存的大小決定了Spark作業的性能。
- num-executors 是用于設置Spark作業總共要用多少個Executor進程來執行。這個參數如果不設置,默認啟動少量的Executor進程,會很大程度影響任務執行效率。
單獨的提交Spark任務,優化參數還可以解決大部分運行問題。但是完全替換每天跑P加工報表的執行引擎,從MapReduce到Spark,總會遇到不少意想不到的問題。對于一個大數據部門而言,另可效率有所延遲,但是數據穩定性是重中之重。

Spark運行Stage
所以,大部分數據處理,甚至是業務場景模型每天的數據清洗加工,都會優先考慮Hive基于MapRedcue的執行引擎,少部分會單獨使用編寫MapReduce、Spark程序來進行復雜處理。
三、實踐中的數據清洗
這節要介紹的內容其實很多,單獨對于Hive這方面,就包括執行計劃、常用寫法、內置函數、一些自定義函數,以及優化策略等等。
幸運的是,這方面資源在網上很全,這是一個值得欣慰的點,基本遇到的大多數問題都能夠搜到滿意答案。
因此,文章這個版塊主要順著這條主線來—— (我在大數據挖掘實踐中所做的模型特征清洗) ,這樣對于大數據挖掘的朋友們來說,更具有針對性。
3.1 知曉數據源
( 這里不擴展數據源的抽取和行為數據的埋點 )
大數據平臺的數據源集中來源于三個方面,按比重大小來排序:
60%來源于關系數據庫的同步遷移: 大多數公司都是采用MySQL和Oracle,就拿互聯網金融平臺來說,這些數據大部分是用戶基本信息,交易數據以及資金數據。
30%來源于平臺埋點數據的采集: 渠道有PC、Wap、安卓和IOS,通過客戶端產生請求,經過Netty服務器處理,再進Kafka接受數據并解碼,最后到Spark Streaming劃分為離線和實時清洗。
10%來源于第三方數據: 做互聯網金融都會整合第三方數據源,大體有工商、快消、車房、電商交易、銀行、運營商等等,有些是通過正規渠道來購買(已脫敏),大部分數據來源于黑市(未脫敏)。這個市場 魚龍混雜、臭氣熏天 ,很多真實數據被注入了污水,在這基礎上建立的模型可信度往往很差。

得數據,得天下?
3.2 業務場景模型的背景
看過我以前文章集的朋友都知道一點,我致力于做大數據產品。
在之前開發數據產品的過程中,有一次規劃了一個頁面—— 用戶關系網絡 ,底層是引用了一個組合模型。
簡單來說是對用戶群體細分,判斷用戶屬于那一類別的羊毛黨群體,再結合業務運營中的彈性因子去綜合評估用戶的風險。

截圖的原型Demo
大家看到這幅圖會有什么想法?
簡單來說,原型展示的是分析兩個用戶之間在很多維度方面的關聯度
當時這個功能在后端開發過程中對于特征數據的處理花了很多時間,有一部分是數據倉庫工具HQL所不能解決的,而且還需要考慮 完整頁面(截圖只是其中一部分) 查詢的響應時間, 這就得預先標準化業務模型的輸出結果 。
我可以簡單描述下需求場景:
- 拿IP地址來說,在最近30天范圍內,用戶使用互聯網金融平臺,不管是PC端,還是無線端,每個用戶每個月都會產生很多IP數據集。
- 對于擁有千萬級別用戶量的平臺,肯定會出現這樣的場景—— 很多用戶在最近一個月內都使用過相同的IP地址,而且數量有多有少。
- 對某個用戶來說,他就好像是一個 雪花中的焦點 ,他使用過的IP地址就像雪花一樣圍繞著他。而每個IP地址都曾被很多用戶使用過。
簡單來說,IP地址只是一個媒介,連接著不同用戶。—— 你中有我,我中有你。

雪花狀
有了上面的背景描述,那么就需要每個讀者都去思考下這三個問題:
問題一、如何先通過某個用戶最近30天的IP列表去找到使用相同IP頻數最多的那一批用戶列表呢?
問題二、如何結合關系網絡的每個維度(IP、設備指紋、身份證、銀行卡和加密隱私等等),去挖掘與該用戶關聯度最高的那一批用戶列表?
問題三、如何對接產品標準化模型輸出,讓頁面查詢的效應時間變得更快些?
思考就像吃大理核桃般,總是那么耐人尋味。
3.3 學會用Hive解決70%的數據清洗
對于 70% 的數據清洗都可以使用Hive來完美解決,而且網絡參考資料也很全,所以大多數場景我都推薦用 Hive 來清洗。—— 高效、穩定
不過在使用過程中,我有兩點建議送給大家:
第一點建議: 要學會顧全大局,不要急于求成,學會把復雜的查詢拆開寫,多考慮集群整個資源總量和并發任務數。
第二點建議: 心要細,在線下做好充足的測試,確保安全性、邏輯正確和執行效率才能上線。
禮物也送了,繼續介紹
對于上述的 用戶關系網絡 場景,這里舉IP維度來實踐下,如何利用Hive進行數據清洗。
下面是用戶行為日志表的用戶、IP地址和時間數據結構。

用戶、IP和時間
回到上面的第一個思考, 如何先通過某個用戶最近30天的IP列表去找到使用相同IP頻數最多的那一批用戶列表呢?
我當時采取了兩個步驟。
步驟一:清洗最近30天所有IP對應的用戶列表,并去重用戶
select ip,concat_ws('_',collect_set(cast(mid as string)))
from tmp.fraud_sheep_behavdetail_union
where ip is not null and systime='2016-12-06'
group by ip
這里解釋三個內置函數 concat_ws 、 collect_set 和 cast ,先更了解必須去親自實踐:
- concat_ws,它是用來分隔符字符串連接函數。
- collect_set,它是用來將一列多行轉換成一行多列,并去重用戶。
- cast,它是用來轉換字段數據類型。
果然很方便吧,下面是第一個步驟的執行結果。

IP馬賽克
步驟二:清洗用戶在IP媒介下,所有關聯的用戶集列表
select s1.mid,concat_ws('_',collect_set(s2.midset)) as ip_midset
from (select ip,mid from tmp.fraud_sheep_behavdetail_union where systime>='2016-11-06' group by ip,mid) s1
join (
select ip,concat_ws('_',collect_set(cast(mid as string))) as midset
from tmp.fraud_sheep_behavdetail_union
where ip is not null and systime>='2016-11-06'
group by ip) s2 on (s1.ip=s2.ip)
group by s1.mid
最終對于IP媒介清洗的數據效果如下所示:
1816945284629847 1816945284629847_3820150008135667_1850212776606754_3820150012550757
_3820150006640108_1823227153612976_3820150001918669_1816945284629847
1816945284629848 1816945284629848_3820150002527117_100433_3820150009829678_
100433_100433_3820150002811537_3820150008901840_3820150012766737
_100433_3800000242066917_100433
同理對于其他維度的媒介方法一樣,到這一步,算是完成Hive階段的初步清洗,是不是很高效。
會員ID 性別 加密隱私 身份證號 銀行卡號 IP地址 設備指紋
18231292 男 18231293:男 18232394:男 382015495:男_18232272:男
38201500:女_38201509:女_382937:女 3820152901:男_38204902:男_3820486:男_38201326:女
但是對于分析用戶細分來說,還需要借助MapReduce,或者Scala來深層次處理特征數據。
3.4 使用Scala來清洗特殊的數據
對于使用Spark框架來清洗數據,我一般都是處于下面兩個原因:
- 常規的HQL解決不了
- 用簡潔的代碼高效計算,也就是考慮開發成本和執行效率
對于部署本機的大數據挖掘環境,可以查看這兩篇文章來實踐動手下:
工欲善其事,必先利其器。有了這么好的利器,處理復雜的特征數據,那都是手到擒來。
借助于Hive清洗處理后的源數據,我們繼續回到第二個思考 ——如何結合關系網絡的每個維度,去初步挖掘與該用戶關聯度最高的那一批用戶列表?
看到這個問題,又產生了這幾個思考:
- 目前有五個維度,以后可能還會更多,純手工顯然不可能,再使用Hive好像也比較困難。
- 每個維度的關聯用戶量也不少,所以基本每個用戶每行數據的處理采用單機串行的程序去處理顯然很緩慢。不過每行的處理是獨立性的。
- 同一個關聯用戶會在同一個維度,以及每一個維度出現多次,還需要進行累計。
如果才剛剛處理大數據挖掘,遇到這樣的問題的確很費神,就連你們常用的Python和R估計也難拯救你們。但是如果實戰比較多,這樣的獨立任務,完全可以并發到每臺計算節點上去每行單獨處理,而我們只需要在處理每行時,單獨調用清洗方法即可。
這里我優先推薦使用Spark來清洗處理( 后面給一個MapReduce的邏輯 ),整個核心過程主要有三個板塊
預處理,對所有關聯用戶去重,并統計每個關聯用戶在每個維度的累計次數
//循環每個維度下的關聯用戶集
for(j <- 0 until value.length){
//用列表存放所有關聯用戶集
if(value.apply(j).split(SEPARATOR4).size==2 && value.apply(j).split(SEPARATOR4).apply(0)!=mid){
midList.append(value.apply(j))
}
if(setMap.contains(value.apply(j))){
//對每個維度關聯用戶的重復次數匯總
val values = setMap.get(value.apply(j)).get
setMap=setMap.+((value.apply(j),1+values))
}else{
setMap=setMap.+((value.apply(j),1))
}
}
評分,循環上述關聯用戶集,給關聯度打一個分
for(ii <- 0 until distinctMidList.size){
var reationValue = 0.0
//分布取每個關聯用戶
val relation = distinctMidList.apply(ii)
//關聯用戶的會員ID
val mid = relation.split(SEPARATOR4).apply(0)
//關聯用戶的性別
val relationSex = relation.split(SEPARATOR4).apply(1)
val featureStr = new StringBuilder()
//循環每個關聯維度去給關聯用戶打分
for(jj <- 1 to FeatureNum.toInt){
var featureValue = 0.0
//獲取該關聯用戶在每個維度下重復次數
val resultMap = midMap.get(jj).get.get(relation).getOrElse(0)
if(jj==1){
//加密隱私,確定權重為10
featureValue=resultMap*10
}else if(jj==2 || jj==3){
標準化清洗處理,用戶關聯用json串拼接
3820150000934593 | 1 | [{"f1":"0","f2":"0","f3":"0","f4":"15","f5":"60","s":"1","r":"75"
,"m":"3820150000316460"},{"f1":"0","f2":"0","f3":"0","f4":"30","f
5":"30","s":"1","r":"60","m":"1816945313571344"},{"f1":"0","f2":"
0","f3":"0","f4":"45","f5":"90","s":"0","r":"135","m":"3820150000655195"}]
得到上面清洗結果,我們才能更好的作為模型的源數據輸出,感覺是不是很費神,所以才印證了這句話——做Data Mining,其實大部分時間都花在清洗數據
3.5 附加分:使用MapReduce來清洗特殊的數據
針對上述的數據清洗,同樣可以MapReduce來單獨處理。只是開發效率和執行效率有所影響。
當然也不排除適用于MapReduce處理的復雜數據場景。
對于在本地Windows環境寫MapRecue代碼,可以借鑒上述文章中部署的數據挖掘環境,修改下Maven工程的pom.xml文件就可以了。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
而我在以往做大數據挖掘的過程里,也有不少場景需要借助MR來處理,比如很早的一篇文章 《一種新思想去解決大矩陣相乘》 ,甚至是大家比較常見的 數據傾斜 ——特別是處理平臺行為日志數據,特別容易遇到數據傾斜。
這里提供一個上述Spark清洗數據的MR代碼邏輯,大家可以對比看看與Spark代碼邏輯的差異性。
Map階段
public static class dealMap
extends Mapper<Object,Text, Text,Text>{
@Override
protected void setup(Context context)
throws IOException,InterruptedException{
/**
* 初始化Map階段的全局變量,目前使用不上
*/
}
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{
//類似Spark,每一行讀取文件,按分隔符劃分
String[] records = value.toString().split("\u0009");
StringBuffer k = new StringBuffer();
//這里Key包含Mid和Sex
String keys = k.append(records[0]).append("\u0009")
.append(records[1]).toString();
//接下來對剩余維度數據進行循環
for(int i=2;i<records.length;i++){
//解決兩個問題,和Spark類似
//確定與該用戶關聯的用戶列表
//確定關聯用戶在每一個維度的累計頻數
}
for(int j=2;j<records.length;j++){
//循環計算用戶關聯得分,和Spark類似
}
/**
* 設置用戶Mid和sex作為Map階段傳輸的Key,用戶關聯維度用戶集作為value傳輸到reduce階段
*/
context.write(new Text(keys.toString()), new Text(value.toString()));
}
}
Reduce階段(這里用不上)
public static class dealReduce
extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException{
/**
* 一般都會用Reduce階段,但是這里用不上
*/
for (Text val : values) {
}
}
}
Drive階段
public static Boolean run(String input,String ouput)
throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "");
job.setJarByClass();
job.setMapperClass();
job.setReducerClass();
job.setNumReduceTasks(10);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Path output = new Path(ouput);
FileInputFormat.setInputPaths(job,input);
FileOutputFormat.setOutputPath(job, output);
output.getFileSystem(conf).delete(output,true);
Boolean result=job.waitForCompletion(true);
return result;
}
上面這三個階段就是MR任務常規的流程,處理上述問題的思路其實和Spark邏輯差不多。只是這套框架性代碼量太多,有很多重復性,每寫一個MR任務的工作量也會比較大,執行效率我并沒有去測試作比較。
如果Spark跑線上任務模型會出現不穩定的話,我想以后我還是會遷移到MapReduce上去跑離線模型。
總結
說到這里,整篇文章概括起來有三點:
- 講述了數據清洗在業務場景建模過程中的重要性和流程操作。
- 介紹了兩款主流計算框架的適用場景和差異性。
- 更列舉了不同數據處理工具在每個業務場景下的優勢和不同。
但是,還是那么一句話——使用什么技術不在乎,我更迷戀業務場景驅動下的技術挑戰。
與你溝通最關鍵的,也許會是直屬領導,也許會是業務運營人員,甚至是完全不懂技術的客戶。他們最關心的是你在業務層面上的技術方案能否解決業務痛點問題。
所以,做大數據挖掘要多關心業務,別一味只談技術。
作者介紹
汪榕 ,3年場景建模經驗,曾累計獲得8次數學建模一等獎,包括全國大學生國家一等獎,在國內期刊發表過相關學術研究。兩年電商數據挖掘實踐,負責開發精準營銷產品中的用戶標簽體系。發表過數據挖掘相關的多篇文章。目前在互聯網金融行業從事數據挖掘工作,參與開發反欺詐實時監控系統。微博:樂平汪二。
感謝杜小芳對本文的審校。
來自:http://www.infoq.com/cn/articles/more-time-of-big-data-mining-is-used-to-clean-the-data