Graphx:構建graph和聚合消息
<p>@玄暢
2014.12.29 </p>
About
最近在優化kcore算法時,對Graphx代碼看了幾遍。1.2后Graphx性能有所提升,代碼不太容易理解,現在用圖表示出來會更直觀。
對數學上的圖有點印象的是x軸、y軸坐標圖,坐標中每個點用橫坐標x和縱坐標y表示,即: (x1, y1), (x2, y2), 一個坐標點可以確定一個點的唯一位置
Graphx與上面的概念類似。不同的是, Graphx中的點概念更泛化,不一定用x y坐標表示,有唯一標示的對象即可,如:ID
1. 構建Graph
graphx的Graph對象是用戶操作圖的入口, 它包含了邊(edge)和頂點(vertices)兩部分. 邊由點組成,所以, 所有的邊中包含的點的就是點的全集。但是這個全集包含重復的點, 去重復后就是VertexRDD
點和邊都包含一個attr屬性,可以攜帶額外信息
1.1 構建一個圖的方法
-
根據邊構建圖(Graph.fromEdges)
def fromEdges[VD: ClassTag, ED: ClassTag]( edges: RDD[Edge[ED]], defaultValue: VD, edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
-
根據邊的兩個點元數據構建(Graph.fromEdgeTuples)
def fromEdgeTuples[VD: ClassTag]( rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD, uniqueEdges: Option[PartitionStrategy] = None, edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
1.2 第一步:構建邊EdgeRDD
1.2.1 加載邊的文本信息
從持久化系統(HDFS)中加載邊的文本信息,按行處理生成tuple, 即(srcId, dstId)
api:
val rawEdgesRdd: RDD[(Long, Long)] = sc.textFile(input, partitionNum).filter(s => s != "0,0").repartition(partitionNum).map { case line => val ss = line.split(",") val src = ss(0).toLong val dst = ss(1).toLong if (src < dst) (src, dst) else (dst, src) }.distinct()
數據形如:
107,109 108,109 110,111 110,112 111,112 113,114 115,116 117,79 117,118 79,118
1.2.2 第二步:初步生成Graph
-
入口:Graph.fromEdgeTuples(rawEdgesRdd)
元數據為,分割的兩個點ID,把元數據映射成Edge(srcId, dstId, attr)對象, attr默認為null。這樣元數據就構建成了RDD[Edge[ED]] -
RDD[Edge[ED]]要進一步轉化成EdgeRDDImpl[ED, VD]
首先遍歷RDD[Edge[ED]]的分區partitions,對分區內的邊重排序new Sorter(Edge.edgeArraySortDataFormat[ED]).sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)即:按照srcId從小到大排序。問:為何要重排序?
答:為了遍歷時順序訪問。采用數組而不是Map,數組是連續的內存單元,具有原子性,避免了Map的hash問題,訪問速度快 -
填充localSrcIds,localDstIds, data, index, global2local, local2global, vertexAttrs
數組localSrcIds,localDstIds中保存的是經過global2local.changeValue(srcId/dstId)轉變的本地索引,即:localSrcIds、localDstIds數組下標對應于分區元素,數組中保存的索引位可以定位到local2global中查到具體的VertexId
global2local是spark私有的Map數據結構GraphXPrimitiveKeyOpenHashMap, 保存vertextId和本地索引的映射關系。global2local中包含當前partition所有srcId、dstId與本地索引的映射關系。
data就是當前分區的attr屬性數組
index索引最有意思,按照srcId重排序的邊數據, 會看到相同的srcId對應了不同的dstId, 見圖中index desc部分。index中記錄的是相同srcId中第一個出現的srcId與其下標。
local2global記錄的是所有的VertexId信息的數組。形如:srcId,dstId,srcId,dstId,srcId,dstId,srcId,dstId。其中會包含相同的ID。即:當前分區所有vertextId的順序實際值
# 用途: # 根據本地下標取VertexId localSrcIds/localDstIds -> index -> local2global -> VertexId # 根據VertexId取本地下標,取屬性 VertexId -> global2local -> index -> data -> attr object
spark的數據最終是在patition中表達,所以各種transform都在這里進行,這里的數據結構性能至關重要
1.3 第二步:構建頂點(VertexRDD)
入口:GraphImpl365行。val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr).withTargetStorageLevel(vertexStorageLevel)
根據邊EdgeRDD[ED, VD]構建出點VertexRDD, 點是孤島,不像邊一樣保存點與點之間的關系。點只保存屬性attr。所以需要對拿到的點做分區。
為了能通過點找到邊,每個點需要保存點所在到邊信息即分區Id(pid),這些新保存在路由表RoutingTablePartition中。
構建的過程:
-
創建路由表
根據EdgeRDD,map其分區,對edge partition中的數據轉換成RoutingTableMessage數據結構。
特別激動的是: 為節省內存,把edgePartitionId和一個標志位通過一個32位的int表示。
如圖所示,RoutingTableMessage是自定義的類型類, 一個包含vid和int的tuple(VertexId, Int)。 int的32~31位表示一個標志位,01: isSrcId 10: isDstId。30~0位表示邊分區ID。贊這種做法,目測作者是山西人。
RoutingTableMessage想表達這樣的信息:一個頂點ID,不管未來你到天涯海角,請帶著你女朋友Edge的地址: edge分區ID。并且帶著一個標志你在女友心中的位置是:01是左邊isSrcId,10是右邊isDstId。這樣就算你流浪到非洲,也能找到小女友約會...blabla...
-
根據路由表生成分區對象vertexPartitions
- 上(1)中生成的消息路由表信息,重新分區,分區數目根edge分區數保持一致。
-
在新分區中,map分區中的每條數據,從RoutingTableMessage解出數據:vid, edge pid, isSrcId/isDstId。這個三個數據項重新封裝到三個數據結構中:pid2vid,srcFlags,dstFlags
-
有意思的地方來了,想一下,shuffle以后新分區中的點來自于之前edge不同分區,那么一個點要找到邊,就需要先確定邊的分區號pid, 然后在確定的edge分區中確定是srcId還是dstId, 這樣就找到了邊。
val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId]) val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean]) val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
上面表達的是:當前vertex分區中點在edge分區中的分布。新分區中保存這樣的記錄(vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))vid, srcFlag, dstFlag, flag通過BitSet存儲,很省。
如此就生成了vertex的路由表routingTables
-
生成ShippableVertexPartition
根據上面routingTables, 重新封裝路由表里的數據結構為:ShippableVertexPartition
ShippableVertexPartition會合并相同重復點的屬性attr對象,補全缺失的attr對象。
關鍵是:根據vertexId生成map:GraphXPrimitiveKeyOpenHashMap,這個map跟邊中的global2local是不是很相似?這個map根據long vertxId生成下標索引,目測:相同的點會有相同的下標。// todo..
- 上(1)中生成的消息路由表信息,重新分區,分區數目根edge分區數保持一致。
-
創建VertexRDDImpl對象
new VertexRDDImpl(vertexPartitions),這就完事了
1.4 第三步 生成Graph對象[finished]
把上述edgeRDD和vertexRDD拿過來組成Graph
new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]]))
2. 常用函數分析
下面分析一下常用的graph函數aggregateMessages
2.1 aggregateMessages
aggregateMessages是Graphx最重要的API,1.2版本添加的新函數,用于替換mapReduceTriplets。目前mapReduceTriplets最終也是使用兼容的aggregateMessages
據說改用aggregateMessages后,性能提升30%。
它主要功能是向鄰邊發消息,合并鄰邊收到的消息,返回messageRDD
aggregateMessages的接口如下:
def aggregateMessages[A: ClassTag]( sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A, tripletFields: TripletFields = TripletFields.All) : VertexRDD[A] = { aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None) }
-
sendMsg: 發消息函數
private def sendMsg(ctx: EdgeContext[KCoreVertex, Int, Map[Int, Int]]): Unit = { ctx.sendToDst(Map(ctx.srcAttr.preKCore -> -1, ctx.srcAttr.curKCore -> 1)) ctx.sendToSrc(Map(ctx.dstAttr.preKCore -> -1, ctx.dstAttr.curKCore -> 1)) }
- mergeMsg:合并消息函數。用于Map階段,每個edge分區中每個點收到的消息合并,以及reduce階段,合并不同分區的消息。合并vertexId相同的消息
- tripletFields:定義發消息的方向
2.1.1 aggregateMessages Map階段
從入口函數進入aggregateMessagesWithActiveSet,首先使用VertexRDD[VD]更新replicatedVertexView, 只更新其中vertexRDD中attr對象。
問:為啥更新replicatedVertexView? 答:replicatedVertexView就是個點和邊的視圖,點的屬性有變化,要更新邊中包含的點的attr
replicatedVertexView這里對edgeRDD做mapPartitions操作,所有的操作都在每個邊分區的迭代中完成。
-
進入aggregateMessagesEdgeScan
前文中提到edge partition中包含的五個重要數據結構之一:localSrcIds, 頂點vertixId在當前分區中的索引.
- 遍歷localSrcIds, 根據其下標去localSrcIds中拿到srcId在全局local2global中的索引位,然后拿到srcId; 同理,根據下標,去localDstIds中取到local2global中的索引位, 取出dstId
有了srcId和dstId,你就可以blabla....
問: 為啥用localSrcIds的下標
答: 用localDstIds的也可以。一條邊必然包含兩個點:srcId, dstId-
發消息
看上圖:
- 根據接口中定義的tripletFields,拿到發消息的方向: 1) 向dstId發;2) 向srcId發;3) 向兩邊發;4) 向其中一邊發
- 發消息的過程就是遍歷到一條邊,向以srcId/dstId在本分區內的本地IDlocalId為下標的數組中添加數據,如果localId為下標數組中已經存在數據,則執行合并函數mergeMsg
- 每個點之間在發消息的時候是獨立的,即:點單純根據方向,向以相鄰點的localId為下標的數組中插數據,互相獨立,在并行上互不影響。
完事,返回消息RDDmessages: RDD[(VertexId, VD2)]
- 根據接口中定義的tripletFields,拿到發消息的方向: 1) 向dstId發;2) 向srcId發;3) 向兩邊發;4) 向其中一邊發
2.1.2 aggregateMessages Reduce階段
因為對邊上,對點發消息,所以在reduce階段主要是VertexRDD的菜。
入口(Graphmpl 260行):vertices.aggregateUsingIndex(preAgg, mergeMsg)
收到messages: RDD[(VertexId, VD2)]消息RDD,開始:
-
對messages做shuffled分區,分區器使用VertexRDD的partitioner。
因為VertexRDD的partitioner根據點VertexID做分區,所以vertexId->消息分區后的pid根VertextRDD完全相同,這樣用zipPartitions高效的合并兩個分區的數據
-
根據對等合并attr, 聚合函數使用API傳入的mergeMsg函數
- 小技巧:遍歷節點時,遍歷messagePartition。并不是每個節點都會收到消息,所以messagePartition集合最小,所以速度會快。遍歷小集合取大集合的數據。
- 前文提到根據routingTables路由表生成VertexRDD的vertexPartitions時, vertexPartitions中重新封裝了ShippableVertexPartition對象,其定義為:
ShippableVertexPartition[VD: ClassTag]( val index: VertexIdToIndexMap, val values: Array[VD], val mask: BitSet, val routingTable: RoutingTablePartition)
最后生成對象:new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
所以這里用到的index就是map.keySet, map就是映射vertexId->attr
index: map.keySet, hashSet, vertexId->下標
values: map._valuers, Array[Int], 根據下標保存attr。
so so,根據vetexId從index中取到其下標,再根據下標,從values中取到attr,存在attr就用API傳入的函數mergeMsg合并屬性attr; 不存在就直接賦值。
最后得到的是收到消息的VertexRDD
到這里,整個map/reduce過程就完成了。
翠花,上全圖
如果沒有繞暈,從頭再讀一遍
------待續--------
來自:https://github.com/shijinkui/spark_study/blob/master/spark_graphx_analyze.markdown