Graphx:構建graph和聚合消息

jopen 9年前發布 | 42K 次閱讀 GraphX

<p>@玄暢
2014.12.29 </p>

About

最近在優化kcore算法時,對Graphx代碼看了幾遍。1.2后Graphx性能有所提升,代碼不太容易理解,現在用圖表示出來會更直觀。

對數學上的圖有點印象的是x軸、y軸坐標圖,坐標中每個點用橫坐標x和縱坐標y表示,即: (x1, y1), (x2, y2), 一個坐標點可以確定一個點的唯一位置

Graphx與上面的概念類似。不同的是, Graphx中的點概念更泛化,不一定用x y坐標表示,有唯一標示的對象即可,如:ID

1. 構建Graph

graphx的Graph對象是用戶操作圖的入口, 它包含了邊(edge)和頂點(vertices)兩部分. 邊由點組成,所以, 所有的邊中包含的點的就是點的全集。但是這個全集包含重復的點, 去重復后就是VertexRDD

點和邊都包含一個attr屬性,可以攜帶額外信息

1.1 構建一個圖的方法

  1. 根據邊構建圖(Graph.fromEdges)

    def fromEdges[VD: ClassTag, ED: ClassTag](
          edges: RDD[Edge[ED]],
          defaultValue: VD,
          edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
          vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  2. 根據邊的兩個點元數據構建(Graph.fromEdgeTuples)

    def fromEdgeTuples[VD: ClassTag](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =

1.2 第一步:構建邊EdgeRDD

(點擊查看大圖)
 Graphx:構建graph和聚合消息

1.2.1 加載邊的文本信息

從持久化系統(HDFS)中加載邊的文本信息,按行處理生成tuple, 即(srcId, dstId)

api:

val rawEdgesRdd: RDD[(Long, Long)] = sc.textFile(input, partitionNum).filter(s => s != "0,0").repartition(partitionNum).map {
      case line =>
        val ss = line.split(",")
        val src = ss(0).toLong
        val dst = ss(1).toLong
        if (src < dst)
          (src, dst)
        else
          (dst, src)
    }.distinct()

數據形如:

107,109
108,109
110,111
110,112
111,112
113,114
115,116
117,79
117,118
79,118

1.2.2 第二步:初步生成Graph

  1. 入口:Graph.fromEdgeTuples(rawEdgesRdd)
    元數據為,分割的兩個點ID,把元數據映射成Edge(srcId, dstId, attr)對象, attr默認為null。這樣元數據就構建成了RDD[Edge[ED]]

  2. RDD[Edge[ED]]要進一步轉化成EdgeRDDImpl[ED, VD]
    首先遍歷RDD[Edge[ED]]的分區partitions,對分區內的邊重排序new Sorter(Edge.edgeArraySortDataFormat[ED]).sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)即:按照srcId從小到大排序。

    問:為何要重排序?
    答:為了遍歷時順序訪問。采用數組而不是Map,數組是連續的內存單元,具有原子性,避免了Map的hash問題,訪問速度快

  3. 填充localSrcIds,localDstIds, data, index, global2local, local2global, vertexAttrs

    數組localSrcIds,localDstIds中保存的是經過global2local.changeValue(srcId/dstId)轉變的本地索引,即:localSrcIds、localDstIds數組下標對應于分區元素,數組中保存的索引位可以定位到local2global中查到具體的VertexId

    global2local是spark私有的Map數據結構GraphXPrimitiveKeyOpenHashMap, 保存vertextId和本地索引的映射關系。global2local中包含當前partition所有srcId、dstId與本地索引的映射關系。

    data就是當前分區的attr屬性數組

    index索引最有意思,按照srcId重排序的邊數據, 會看到相同的srcId對應了不同的dstId, 見圖中index desc部分。index中記錄的是相同srcId中第一個出現的srcId與其下標

    local2global記錄的是所有的VertexId信息的數組。形如:srcId,dstId,srcId,dstId,srcId,dstId,srcId,dstId。其中會包含相同的ID。即:當前分區所有vertextId的順序實際值

    # 用途:
    # 根據本地下標取VertexId
    localSrcIds/localDstIds -> index -> local2global -> VertexId
    
    # 根據VertexId取本地下標,取屬性
    VertexId -> global2local -> index -> data -> attr object

    spark的數據最終是在patition中表達,所以各種transform都在這里進行,這里的數據結構性能至關重要

1.3 第二步:構建頂點(VertexRDD)

(點擊查看大圖)
 Graphx:構建graph和聚合消息

入口:GraphImpl365行。val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr).withTargetStorageLevel(vertexStorageLevel)

根據邊EdgeRDD[ED, VD]構建出點VertexRDD, 點是孤島,不像邊一樣保存點與點之間的關系。點只保存屬性attr。所以需要對拿到的點做分區。

為了能通過點找到邊,每個點需要保存點所在到邊信息即分區Id(pid),這些新保存在路由表RoutingTablePartition中。

構建的過程:

  1. 創建路由表

    根據EdgeRDD,map其分區,對edge partition中的數據轉換成RoutingTableMessage數據結構。

    特別激動的是: 為節省內存,把edgePartitionId和一個標志位通過一個32位的int表示。

    如圖所示,RoutingTableMessage是自定義的類型類, 一個包含vid和int的tuple(VertexId, Int)。 int的32~31位表示一個標志位,01: isSrcId 10: isDstId。30~0位表示邊分區ID。贊這種做法,目測作者是山西人。

    RoutingTableMessage想表達這樣的信息:一個頂點ID,不管未來你到天涯海角,請帶著你女朋友Edge的地址: edge分區ID。并且帶著一個標志你在女友心中的位置是:01是左邊isSrcId,10是右邊isDstId。這樣就算你流浪到非洲,也能找到小女友約會...blabla...

  2. 根據路由表生成分區對象vertexPartitions

    1. 上(1)中生成的消息路由表信息,重新分區,分區數目根edge分區數保持一致。
    2. 在新分區中,map分區中的每條數據,從RoutingTableMessage解出數據:vid, edge pid, isSrcId/isDstId。這個三個數據項重新封裝到三個數據結構中:pid2vid,srcFlags,dstFlags

    3. 有意思的地方來了,想一下,shuffle以后新分區中的點來自于之前edge不同分區,那么一個點要找到邊,就需要先確定邊的分區號pid, 然后在確定的edge分區中確定是srcId還是dstId, 這樣就找到了邊。

      val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
      val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
      val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])

      上面表達的是:當前vertex分區中點在edge分區中的分布。新分區中保存這樣的記錄(vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))vid, srcFlag, dstFlag, flag通過BitSet存儲,很省。

      如此就生成了vertex的路由表routingTables

    4. 生成ShippableVertexPartition

      根據上面routingTables, 重新封裝路由表里的數據結構為:ShippableVertexPartition

      ShippableVertexPartition會合并相同重復點的屬性attr對象,補全缺失的attr對象。

      關鍵是:根據vertexId生成map:GraphXPrimitiveKeyOpenHashMap,這個map跟邊中的global2local是不是很相似?這個map根據long vertxId生成下標索引,目測:相同的點會有相同的下標。// todo..

  3. 創建VertexRDDImpl對象
    new VertexRDDImpl(vertexPartitions),這就完事了

1.4 第三步 生成Graph對象[finished]

(點擊查看大圖)
 Graphx:構建graph和聚合消息

把上述edgeRDD和vertexRDD拿過來組成Graph

new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]]))

2. 常用函數分析

下面分析一下常用的graph函數aggregateMessages

2.1 aggregateMessages

aggregateMessages是Graphx最重要的API,1.2版本添加的新函數,用于替換mapReduceTriplets。目前mapReduceTriplets最終也是使用兼容的aggregateMessages

據說改用aggregateMessages后,性能提升30%。

它主要功能是向鄰邊發消息,合并鄰邊收到的消息,返回messageRDD

aggregateMessages的接口如下:

def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A] = {
    aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
  }
  • sendMsg: 發消息函數

    private def sendMsg(ctx: EdgeContext[KCoreVertex, Int, Map[Int, Int]]): Unit = {
        ctx.sendToDst(Map(ctx.srcAttr.preKCore -> -1, ctx.srcAttr.curKCore -> 1))
        ctx.sendToSrc(Map(ctx.dstAttr.preKCore -> -1, ctx.dstAttr.curKCore -> 1))
    }
  • mergeMsg:合并消息函數。用于Map階段,每個edge分區中每個點收到的消息合并,以及reduce階段,合并不同分區的消息。合并vertexId相同的消息
  • tripletFields:定義發消息的方向

2.1.1 aggregateMessages Map階段

(點擊查看大圖)
 Graphx:構建graph和聚合消息

從入口函數進入aggregateMessagesWithActiveSet,首先使用VertexRDD[VD]更新replicatedVertexView, 只更新其中vertexRDD中attr對象。

問:為啥更新replicatedVertexView? 答:replicatedVertexView就是個點和邊的視圖,點的屬性有變化,要更新邊中包含的點的attr

replicatedVertexView這里對edgeRDD做mapPartitions操作,所有的操作都在每個邊分區的迭代中完成。

  1. 進入aggregateMessagesEdgeScan

    前文中提到edge partition中包含的五個重要數據結構之一:localSrcIds, 頂點vertixId在當前分區中的索引.

    1. 遍歷localSrcIds, 根據其下標去localSrcIds中拿到srcId在全局local2global中的索引位,然后拿到srcId; 同理,根據下標,去localDstIds中取到local2global中的索引位, 取出dstId

    有了srcId和dstId,你就可以blabla....

    問: 為啥用localSrcIds的下標
    答: 用localDstIds的也可以。一條邊必然包含兩個點:srcId, dstId

    1. 發消息

      看上圖:

      • 根據接口中定義的tripletFields,拿到發消息的方向: 1) 向dstId發;2) 向srcId發;3) 向兩邊發;4) 向其中一邊發
      • 發消息的過程就是遍歷到一條邊,向以srcId/dstId在本分區內的本地IDlocalId為下標的數組中添加數據,如果localId為下標數組中已經存在數據,則執行合并函數mergeMsg
      • 每個點之間在發消息的時候是獨立的,即:點單純根據方向,向以相鄰點的localId為下標的數組中插數據,互相獨立,在并行上互不影響。
        完事,返回消息RDDmessages: RDD[(VertexId, VD2)]

2.1.2 aggregateMessages Reduce階段

(點擊查看大圖)  Graphx:構建graph和聚合消息

因為對邊上,對點發消息,所以在reduce階段主要是VertexRDD的菜。

入口(Graphmpl 260行):vertices.aggregateUsingIndex(preAgg, mergeMsg)

收到messages: RDD[(VertexId, VD2)]消息RDD,開始:

  1. 對messages做shuffled分區,分區器使用VertexRDD的partitioner。

    因為VertexRDD的partitioner根據點VertexID做分區,所以vertexId->消息分區后的pid根VertextRDD完全相同,這樣用zipPartitions高效的合并兩個分區的數據

  2. 根據對等合并attr, 聚合函數使用API傳入的mergeMsg函數

    • 小技巧:遍歷節點時,遍歷messagePartition。并不是每個節點都會收到消息,所以messagePartition集合最小,所以速度會快。遍歷小集合取大集合的數據。
    • 前文提到根據routingTables路由表生成VertexRDD的vertexPartitions時, vertexPartitions中重新封裝了ShippableVertexPartition對象,其定義為:
    ShippableVertexPartition[VD: ClassTag](
    val index: VertexIdToIndexMap,
    val values: Array[VD],
    val mask: BitSet,
    val routingTable: RoutingTablePartition)

    最后生成對象:new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)

    所以這里用到的index就是map.keySet, map就是映射vertexId->attr

    index: map.keySet, hashSet, vertexId->下標

    values: map._valuers, Array[Int], 根據下標保存attr。

    so so,根據vetexId從index中取到其下標,再根據下標,從values中取到attr,存在attr就用API傳入的函數mergeMsg合并屬性attr; 不存在就直接賦值。

最后得到的是收到消息的VertexRDD

到這里,整個map/reduce過程就完成了。


翠花,上全圖

(點擊查看大圖)  Graphx:構建graph和聚合消息

如果沒有繞暈,從頭再讀一遍

------待續--------

來自:https://github.com/shijinkui/spark_study/blob/master/spark_graphx_analyze.markdown

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!