GraphLab:新的面向機器學習的并行框架

jopen 12年前發布 | 34K 次閱讀 GraphLab 機器學習

1.1 GraphLab簡介

在海量數據盛行的今天,大規模并行計算已經隨處可見,尤其是MapReduce框架的出現,促進了并行計算在互聯網海量數據處理中的廣泛應用。而針對海量數據的機器學習對并行計算的性能、開發復雜度等提出了新的挑戰。

機器學習的算法具有下面兩個特點:數據依賴性強,運算過程各個機器之間要進行頻繁的數據交換;流處理復雜,整個處理過程需要多次迭代,數據的處理條件分支多。

而MapReduce是典型的SIMD模型,Map階段集群的各臺機器各自完成負載較重的計算過程,數據并行度高,適合完成類似矩陣運算、數據統計等數據獨立性強的計算,而對于機器學習類算法并行性能不高。

另一個并行實現方案就是采用純MPI(Native MPI)的方式。純MPI實現通過精細的設計將并行任務按照MPI協議分配到集群機器上,并根據具體應用,在計算過程中進行機器間的數據通信和同步。純 MPI的優點是,可以針對具體的應用,進行深度優化,從而達到很高的并行性能。但純MPI存在的問題是,針對不同的機器學習算法,需要重寫其數據分配、通 信等實現細節,代碼重用率低,機器拓展性能差,對編程開發人員的要求高,而且優化和調試成本高。因而,純MPI不適合敏捷的互聯網應用。

為解決機器學習的流處理,Google提出了Pregel框架,Pregel是嚴格的BSP模型,采用“計算-通信-同步”的模式完成機器學習的數據同步 和算法迭代。Goolge曾稱其80%的程序使用MapReduce完成,20%的程序使用Pregel實現。因而,Pregel是很成熟的機器學習流處 理框架,但Google一直沒有將Pregel的具體實現開源,外界對Pregel的模仿實現在性能和穩定性方面都未能達到工業級應用的標準。

2010年,CMU的Select實驗室提出了GraphLab框架,GraphLab是面向機器學習的流處理并行框架[1]。同年, GraphLab基于最初的并行概念實現了1.0版本,在機器學習的流處理并行性能方面得到很大的提升,并引起業界的廣泛關注,在2012年 GraphLab升級到2.1版本,進一步優化了其并行模型,尤其對自然圖的并行性能得到顯著改進。

在本章的余下章節,將詳細介紹GraphLab的并行框架和具體的源碼實現。

1.2 GraphLab并行框架

GraphLab將數據抽象成Graph結構,將算法的執行過程抽象成Gather、Apply、Scatter三個步驟。其并行的核心思想是對頂點的切分,以下面的例子作為一個說明。

GraphLab:新的面向機器學習的并行框架

圖1. Graph對并行思想

示例中,需要完成對V0鄰接頂點的求和計算,串行實現中,V0對其所有的鄰接點進行遍歷,累加求和。而GraphLab中,將頂點V0進行切分,將V0的 邊關系以及對應的鄰接點部署在兩臺處理器上,各臺機器上并行進行部分求和運算,然后通過master頂點和mirror頂點的通信完成最終的計算。

1.2.1 數據模型:Graph

頂點是其最小并行粒度和通信粒度,邊是機器學習算法中數據依賴性的表現方式。

對于某個頂點,其被部署到多臺機器,一臺機器作為master頂點,其余機器上作為mirror。Master作為所有mirror的管理者,負責給mirror安排具體計算任務;mirror作為該頂點在各臺機器上的代理執行者,與master數據的保持同步。

對于某條邊,GraphLab將其唯一部署在某一臺機器上,而對邊關聯的頂點進行多份存儲,解了邊數據量大的問題。

同一臺機器上的所有edge和vertex構成local graph,在每臺機器上,存在本地id到全局id的映射表。vertex是一個進程上所有線程共享的,在并行計算過程中,各個線程分攤進程中所有頂點的 gather->apply->scatter操作。

下面這個例子說明,GraphLab是怎么構建Graph的。

GraphLab:新的面向機器學習的并行框架

圖2 Graph的構建形式

1.2.2 執行模型:Gather-Apply-Scatter

每個頂點每一輪迭代經過gather->apple->scatter三個階段。

1)       Gather階段

工作頂點的邊 (可能是所有邊,也有可能是入邊或者出邊)從領接頂點和自身收集數據,記為gather_data_i,各個邊的數據graphlab會求和,記為sum_data。這一階段對工作頂點、邊都是只讀的。

2)       Apply階段

Mirror將gather計算的結果sum_data發送給master頂點,master進行匯總為total。Master利用total和上一步 的頂點數據,按照業務需求進行進一步的計算,然后更新master的頂點數據,并同步mirror。Apply階段中,工作頂點可修改,邊不可修改。

3)       Scatter階段

工作頂點更新完成之后,更新邊上的數據,并通知對其有依賴的鄰結頂點更新狀態。這scatter過程中,工作頂點只讀,邊上數據可寫。

在執行模型中,graphlab通過控制三個階段的讀寫權限來達到互斥的目的。在gather階段只讀,apply對頂點只寫,scatter對邊只寫。 并行計算的同步通過master和mirror來實現,mirror相當于每個頂點對外的一個接口人,將復雜的數據通信抽象成頂點的行為。

下面這個例子說明GraphLab的執行模型:

GraphLab:新的面向機器學習的并行框架

                                                                           圖3. Gather-Apply-Scatter

1.3 GraphLab的源碼實現

Graphlab的實現可以分為四層:基礎組件層,抽象層,引擎層,應用層。

 GraphLab:新的面向機器學習的并行框架

圖4. GraphLab源碼結構

1.3.1 基礎組件層

提供Graphlab數據傳輸、多線程管理等基礎并行結構的組件模塊,下面將主要介紹其通信、數據序列化、數據交換、多線程管理四個功能模塊。

1)       通信(dc_tcp_comm.cpp)

Graphlab基于TCP協議的長連接在機器之間進行數據通信。在Graphlab初始化階段,所有機器建立連接,將socket數據存儲在std::vector sock 結構中。

Graphlab使用單獨的線程來接收和發送數據,其中接收或發送都可以配置多個線程,默認每個線程中負責與64臺機器進行通信。在接收連接中,tcp_comm基于libevent采用epoll的方式獲取連接到達的通知,效率高。將這部分抽象成以下偽代碼:

listen();

for(size_t i = 0;i < nprocs; ++i)

connect(i);

while{

wait_for_connect();

}

in_thread_num=machine_num / proc_per_thread;

  • out_thread_num= machine_num / proc_per_thread;

 

for(每一個線程)

{

event_add();

}

for(每一個線程)

{

event_add();

}

 

for(每一個線程)

{

In_thread.launch(receive_loop);

}

for(每一個線程)

{

In_thread.launch(send_loop)

}

 

需要補充的是,Graphlab在數據通信中,并沒有采用MPI的接口,但在源碼中封裝了MPI_tools,其用途是在 distributed_control::init時,獲取系統參數(包括機器IP和端口)提供兩種方式,一種是系統配置中初始化,一種是通過MPI接 口實現(dc_init_from_mpi::init_param_from_mpi)。

2)       數據序列化(oarchive & iarchive)

Oarchive通過重載操作符>>將對象序列化后寫入ostream中,在Graphlab中對于POD( Plain Old Data)和非POD數據區分對待, POD類型的數據直接轉為為char*寫入ostream, 而非POD數據需要用戶實現save方法,否則將拋出異常。iarchive的過程與oarchive的過程相反。

所有通過rpc傳輸的數據都通過oarchive和iarchive轉化為stream,比如vertex_program, vertex_data。

 

GraphLab:新的面向機器學習的并行框架

圖5. 數據序列化

3)       數據傳輸流(buffered_stream_send2.cpp)

Oarchive,iarchive是數據序列化的工具, 在實際的傳輸過程中,數據并沒有立即發送出去,而是緩存在buffered_stream_send。

4)       Pthread_tools:

Thread類封裝了lpthread的方法

提供thread_group管理線程隊列

封裝了鎖、信號量、條件變量等同步方法。

1.3.2 抽象層

1)      dc_dist_object是GraphLab對所有分布式對象的一個抽象,其目標是將分布式處理的數據對象對用戶抽象成普通對象,以希望在使用的時候不需要關心其分布式細節。

2)      buffer_exchange是基于dc_dist_object對需要在頂點間交換的數據提供一個容器。

3)      distribute_controller是基于dc_dist_object實現的一個整個分布式系統的控制器,提供了機器數據、頂點關系等全局信息。

1.3.3引擎層

1.3.3.1同步引擎

GraphLab:新的面向機器學習的并行框架

                         圖6. 同步引擎

1) Excange message階段,master接受來?自mirror的消息;

2) Receive Message階段,master接收上一輪Scatter發送的消息和mirror發送的消息,將有message的master激活, 對于激活的頂點,master通知mirror激活,并將vectex_program同步到mirrors;

3) Gather階段,多線程并行gather, 誰先完成,多線程并行localgraph中的頂點,mirror將gather的結果到master;

4) Apply階段,master執行apply(apply()),并將apply的結果同步到mirror (sync_vertex_data()).

5)Scatter階段,master和mirror基于新的頂點數據,更新邊上數據,并以signal的形式通知相鄰頂點。

下面這個例子形象地說明了同步引擎的工作過程:

GraphLab:新的面向機器學習的并行框架

 

圖7. 頂點2的GraphLab執行過程

1.3.3.2異步引擎

GraphLab:新的面向機器學習的并行框架

圖8. master和mirror狀態轉移過程

異步引擎中,每個頂點是消息驅動的狀態機。

1) 在每一輪執行開始時,Master從全局的調度器(Sceduler)獲取消息,獲取消息后,master獲得鎖,并進入Locking狀態。同時,master通知mirror獲取鎖,進入Locking狀態。

2) master和mirror分別進行Gathering操作,mirror將gathering結果匯報給master,由master完成匯總。

3) master完成applying之后,將結果同步到mirror上。

4) master和mirror獨立的執行scattering,執行完成之后釋放鎖進入None狀態,等待新的任務到來。

5) mirror在scattering狀態時,可能再次接收到來自master的locking請求,這種情況下,mirror在完成scattering之后將不會釋放鎖,而直接進入下一輪任務中。

來自:http://blog.csdn.net/cs870101/article/details/8072458

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!