GraphX 圖數據建模和存儲
背景
簡單分析一下GraphX是怎么為圖數據建模和存儲的。
入口
可以看 GraphLoader 的函數,
def edgeListFile( sc: SparkContext, path: String, canonicalOrientation: Boolean = false, numEdgePartitions: Int = -1, edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) : Graph[Int, Int]
- path可以是本地路徑(文件或文件夾),也可以是hdfs路徑,本質上是使用 sc.textFile 來生成HadoopRDD的, numEdgePartitions 是分區數。
- Graph的存儲是分EdgeRDD和VertexRDD兩塊,可以分別設置StorageLevel。默認是內存。
- 這個函數接受邊文件,即’1 2’, ‘4 1’這樣的點到點的數據對組成的文件。把這份文件按分區數和存儲level轉化成一個可以操作的圖。 </ol>
- sc.textFile 讀文件,生成原始的RDD
- 每個分區(的計算節點)把每條記錄放進 PrimitiveVector 里,這個結構是spark里為primitive數據優化的存儲結構。
- 把 PrimitiveVector 里的數據一條條取出,轉化成 EdgePartition ,即 EdgeRDD 的分區實現。這個過程中生成了面向列存的結構:src點的array,dst點的array,edge的屬性array,以及兩個正反向map(用于對應點的local id和global id)。
- 對 EdgeRDD 做一次count觸發這次邊建模任務,真正persist起來。
- 用 EdgePartition 去生成一個 RoutingTablePartition ,里面是vertexId到partitionId的對應關系,借助 RoutingTablePartition 生成 VertexRDD 。
- 由 EdgeRDD 和 VertexRDD 生成 Graph 。前者維護了邊的屬性、邊兩頭頂點的屬性、兩頭頂點各自的global vertexID、兩頭頂點各自的local Id(在一個edge分區里的array index)、用于尋址array的正反向map。后者維護了點存在于哪個邊的分區上的Map。 </ol>
- Stores the locations of edge-partition join sites for each vertex attribute in a particular
- vertex partition. This provides routing information for shipping vertex attributes to edge
- partitions.
*/
private[graphx]
class RoutingTablePartition(
private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) extends Serializable {</pre>
細節
分區擺放
EdgeRDD 的分區怎么切分的呢?因為數據是根據HadoopRDD從文件里根據offset掃出來的。可以理解為對邊數據的切分是沒有任何處理的,因為文件也沒有特殊排列過,所以切分成多少個分區應該就是隨機的。
VertexRDD 的分區怎么切分的呢? EdgeRDD 生成的vertexIdToPartitionId這份RDD數據是 RDD[VertexId, Int] 型,它根據hash分區規則,分成和 EdgeRDD 分區數一樣大。所以 VertexRDD 的分區數和Edge一樣,分區規則是Long取hash。
所以我可以想象的計算過程是:
對點操作的時候,首先對vertexId(是個Long)進行hash,找到對應分區的位置,在這個分區上,如果是內存存儲的 VertexRDD,那很快可以查到它的邊所在的幾個Edge分區的所在位置,然后把計算分到這幾個Edge所在的分區上去計算。第一步根據點hash后 找邊分區位置的過程就類似一次建好索引的查詢。
配官方圖方面理解:
高效數據結構
對原生類型的存儲和讀寫有比較好的數據結構支持,典型的是 EdgePartition 里使用的map:
/**
- A fast hash map implementation for primitive, non-null keys. This hash map supports
- insertions and updates, but not deletions. This map is about an order of magnitude
- faster than java.util.HashMap, while using much less space overhead. *
- Under the hood, it uses our OpenHashSet implementation.
*/
private[graphx]
class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
以及之前提到的vector@specialized(Long, Int, Double) V: ClassTag](</pre><br />
/**
- An append-only, non-threadsafe, array-backed vector that is optimized for primitive types.
*/
private[spark]
class PrimitiveVector@specialized(Long, Int, Double) V: ClassTag {
private var _numElements = 0
private var array: Array[V] = </pre>
流程
以下是代碼,比較清晰地展現了內部存儲結構。
private[graphx] class EdgePartition @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag extends Serializable {/**
本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!