GraphLab:新的面向機器學習的并行框架
1.1 GraphLab簡介
在海量數據盛行的今天,大規模并行計算已經隨處可見,尤其是MapReduce框架的出現,促進了并行計算在互聯網海量數據處理中的廣泛應用。而針對海量數據的機器學習對并行計算的性能、開發復雜度等提出了新的挑戰。
機器學習的算法具有下面兩個特點:數據依賴性強,運算過程各個機器之間要進行頻繁的數據交換;流處理復雜,整個處理過程需要多次迭代,數據的處理條件分支多。
而MapReduce是典型的SIMD模型,Map階段集群的各臺機器各自完成負載較重的計算過程,數據并行度高,適合完成類似矩陣運算、數據統計等數據獨立性強的計算,而對于機器學習類算法并行性能不高。
另一個并行實現方案就是采用純MPI(Native MPI)的方式。純MPI實現通過精細的設計將并行任務按照MPI協議分配到集群機器上,并根據具體應用,在計算過程中進行機器間的數據通信和同步。純 MPI的優點是,可以針對具體的應用,進行深度優化,從而達到很高的并行性能。但純MPI存在的問題是,針對不同的機器學習算法,需要重寫其數據分配、通 信等實現細節,代碼重用率低,機器拓展性能差,對編程開發人員的要求高,而且優化和調試成本高。因而,純MPI不適合敏捷的互聯網應用。
為解決機器學習的流處理,Google提出了Pregel框架,Pregel是嚴格的BSP模型,采用“計算-通信-同步”的模式完成機器學習的數據同步 和算法迭代。Goolge曾稱其80%的程序使用MapReduce完成,20%的程序使用Pregel實現。因而,Pregel是很成熟的機器學習流處 理框架,但Google一直沒有將Pregel的具體實現開源,外界對Pregel的模仿實現在性能和穩定性方面都未能達到工業級應用的標準。
2010年,CMU的Select實驗室提出了GraphLab框架,GraphLab是面向機器學習的流處理并行框架[1]。同年, GraphLab基于最初的并行概念實現了1.0版本,在機器學習的流處理并行性能方面得到很大的提升,并引起業界的廣泛關注,在2012年 GraphLab升級到2.1版本,進一步優化了其并行模型,尤其對自然圖的并行性能得到顯著改進。
在本章的余下章節,將詳細介紹GraphLab的并行框架和具體的源碼實現。
GraphLab將數據抽象成Graph結構,將算法的執行過程抽象成Gather、Apply、Scatter三個步驟。其并行的核心思想是對頂點的切分,以下面的例子作為一個說明。
圖1. Graph對并行思想
示例中,需要完成對V0鄰接頂點的求和計算,串行實現中,V0對其所有的鄰接點進行遍歷,累加求和。而GraphLab中,將頂點V0進行切分,將V0的 邊關系以及對應的鄰接點部署在兩臺處理器上,各臺機器上并行進行部分求和運算,然后通過master頂點和mirror頂點的通信完成最終的計算。
頂點是其最小并行粒度和通信粒度,邊是機器學習算法中數據依賴性的表現方式。
對于某個頂點,其被部署到多臺機器,一臺機器作為master頂點,其余機器上作為mirror。Master作為所有mirror的管理者,負責給mirror安排具體計算任務;mirror作為該頂點在各臺機器上的代理執行者,與master數據的保持同步。
對于某條邊,GraphLab將其唯一部署在某一臺機器上,而對邊關聯的頂點進行多份存儲,解了邊數據量大的問題。
同一臺機器上的所有edge和vertex構成local graph,在每臺機器上,存在本地id到全局id的映射表。vertex是一個進程上所有線程共享的,在并行計算過程中,各個線程分攤進程中所有頂點的 gather->apply->scatter操作。
下面這個例子說明,GraphLab是怎么構建Graph的。
圖2 Graph的構建形式
每個頂點每一輪迭代經過gather->apple->scatter三個階段。
1) Gather階段
工作頂點的邊 (可能是所有邊,也有可能是入邊或者出邊)從領接頂點和自身收集數據,記為gather_data_i,各個邊的數據graphlab會求和,記為sum_data。這一階段對工作頂點、邊都是只讀的。
2) Apply階段
Mirror將gather計算的結果sum_data發送給master頂點,master進行匯總為total。Master利用total和上一步 的頂點數據,按照業務需求進行進一步的計算,然后更新master的頂點數據,并同步mirror。Apply階段中,工作頂點可修改,邊不可修改。
3) Scatter階段
工作頂點更新完成之后,更新邊上的數據,并通知對其有依賴的鄰結頂點更新狀態。這scatter過程中,工作頂點只讀,邊上數據可寫。
在執行模型中,graphlab通過控制三個階段的讀寫權限來達到互斥的目的。在gather階段只讀,apply對頂點只寫,scatter對邊只寫。 并行計算的同步通過master和mirror來實現,mirror相當于每個頂點對外的一個接口人,將復雜的數據通信抽象成頂點的行為。
下面這個例子說明GraphLab的執行模型:
圖3. Gather-Apply-Scatter
Graphlab的實現可以分為四層:基礎組件層,抽象層,引擎層,應用層。
圖4. GraphLab源碼結構
提供Graphlab數據傳輸、多線程管理等基礎并行結構的組件模塊,下面將主要介紹其通信、數據序列化、數據交換、多線程管理四個功能模塊。
1) 通信(dc_tcp_comm.cpp)
Graphlab基于TCP協議的長連接在機器之間進行數據通信。在Graphlab初始化階段,所有機器建立連接,將socket數據存儲在std::vector
Graphlab使用單獨的線程來接收和發送數據,其中接收或發送都可以配置多個線程,默認每個線程中負責與64臺機器進行通信。在接收連接中,tcp_comm基于libevent采用epoll的方式獲取連接到達的通知,效率高。將這部分抽象成以下偽代碼:
listen();
for(size_t i = 0;i < nprocs; ++i)
connect(i);
while{
wait_for_connect();
}
in_thread_num=machine_num / proc_per_thread;
- out_thread_num= machine_num / proc_per_thread;
for(每一個線程)
{
event_add();
}
for(每一個線程)
{
event_add();
}
for(每一個線程)
{
In_thread.launch(receive_loop);
}
for(每一個線程)
{
In_thread.launch(send_loop)
}
需要補充的是,Graphlab在數據通信中,并沒有采用MPI的接口,但在源碼中封裝了MPI_tools,其用途是在 distributed_control::init時,獲取系統參數(包括機器IP和端口)提供兩種方式,一種是系統配置中初始化,一種是通過MPI接 口實現(dc_init_from_mpi::init_param_from_mpi)。
2) 數據序列化(oarchive & iarchive)
Oarchive通過重載操作符>>將對象序列化后寫入ostream中,在Graphlab中對于POD( Plain Old Data)和非POD數據區分對待, POD類型的數據直接轉為為char*寫入ostream, 而非POD數據需要用戶實現save方法,否則將拋出異常。iarchive的過程與oarchive的過程相反。
所有通過rpc傳輸的數據都通過oarchive和iarchive轉化為stream,比如vertex_program, vertex_data。
圖5. 數據序列化
3) 數據傳輸流(buffered_stream_send2.cpp)
Oarchive,iarchive是數據序列化的工具, 在實際的傳輸過程中,數據并沒有立即發送出去,而是緩存在buffered_stream_send。
4) Pthread_tools:
Thread類封裝了lpthread的方法
提供thread_group管理線程隊列
封裝了鎖、信號量、條件變量等同步方法。
1) dc_dist_object是GraphLab對所有分布式對象的一個抽象,其目標是將分布式處理的數據對象對用戶抽象成普通對象,以希望在使用的時候不需要關心其分布式細節。
2) buffer_exchange是基于dc_dist_object對需要在頂點間交換的數據提供一個容器。
3) distribute_controller是基于dc_dist_object實現的一個整個分布式系統的控制器,提供了機器數據、頂點關系等全局信息。
圖6. 同步引擎
1) Excange message階段,master接受來?自mirror的消息;
2) Receive Message階段,master接收上一輪Scatter發送的消息和mirror發送的消息,將有message的master激活, 對于激活的頂點,master通知mirror激活,并將vectex_program同步到mirrors;
3) Gather階段,多線程并行gather, 誰先完成,多線程并行localgraph中的頂點,mirror將gather的結果到master;
4) Apply階段,master執行apply(apply()),并將apply的結果同步到mirror (sync_vertex_data()).
5)Scatter階段,master和mirror基于新的頂點數據,更新邊上數據,并以signal的形式通知相鄰頂點。
下面這個例子形象地說明了同步引擎的工作過程:
圖7. 頂點2的GraphLab執行過程
圖8. master和mirror狀態轉移過程
異步引擎中,每個頂點是消息驅動的狀態機。
1) 在每一輪執行開始時,Master從全局的調度器(Sceduler)獲取消息,獲取消息后,master獲得鎖,并進入Locking狀態。同時,master通知mirror獲取鎖,進入Locking狀態。
2) master和mirror分別進行Gathering操作,mirror將gathering結果匯報給master,由master完成匯總。
3) master完成applying之后,將結果同步到mirror上。
4) master和mirror獨立的執行scattering,執行完成之后釋放鎖進入None狀態,等待新的任務到來。
5) mirror在scattering狀態時,可能再次接收到來自master的locking請求,這種情況下,mirror在完成scattering之后將不會釋放鎖,而直接進入下一輪任務中。
來自:http://blog.csdn.net/cs870101/article/details/8072458