Spark Block存儲管理分析
Apache Spark中,對Block的查詢、存儲管理,是通過唯一的Block ID來進行區分的。所以,了解Block ID的生成規則,能夠幫助我們了解Block查詢、存儲過程中是如何定位Block以及如何處理互斥存儲/讀取同一個Block的。可以想到,同一個Spark Application,以及多個運行的Application之間,對應的Block都具有唯一的ID,通過代碼可以看到,BlockID包括:RDDBlockId、ShuffleBlockId、ShuffleDataBlockId、ShuffleIndexBlockId、BroadcastBlockId、TaskResultBlockId、TempLocalBlockId、TempShuffleBlockId這8種ID,可以詳見如下代碼定義:
@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
override def name: String = "rdd_" + rddId + "_" + splitIndex
}
// Format of the shuffle block ids (including data and index) should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}
@DeveloperApi
case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
}
@DeveloperApi
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
}
@DeveloperApi
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
override def name: String = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}
@DeveloperApi
case class TaskResultBlockId(taskId: Long) extends BlockId {
override def name: String = "taskresult_" + taskId
}
@DeveloperApi
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
override def name: String = "input-" + streamId + "-" + uniqueId
}
/** Id associated with temporary local data managed as blocks. Not serializable. */
private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
override def name: String = "temp_local_" + id
}
/** Id associated with temporary shuffle data managed as blocks. Not serializable. */
private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId {
override def name: String = "temp_shuffle_" + id
}
我們以RDDBlockId的生成規則為例,它是以前綴字符串“rdd_”為前綴、分配的全局RDD ID、下劃線“_”、Partition ID這4部分拼接而成,因為RDD ID是唯一的,所以最終構造好的RDDBlockId對應的字符串就是唯一的。如果該Block存在,查詢可以唯一定位到該Block,存儲也不會出現覆蓋其他RDDBlockId的問題。
下面,我們通過分析MemoryStore、DiskStore、BlockManager、BlockInfoManager這4個最核心的與Block管理相關的實現類,來理解Spark對Block的管理。全文中,我們主要針對RDDBlockId對應的Block數據的處理、存儲、查詢、讀取,來分析Block的管理。
MemoryStore
先說明一下MemoryStore,它主要用來在內存中存儲Block數據,可以避免重復計算同一個RDD的Partition數據。一個Block對應著一個RDD的一個Partition的數據。當StorageLevel設置為如下值時,都會可能會需要使用MemoryStore來存儲數據:
MEMORY_ONLY
MEMORY_ONLY_2
MEMORY_ONLY_SER
MEMORY_ONLY_SER_2
MEMORY_AND_DISK
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2
OFF_HEAP
所以,MemoryStore提供對Block數據的存儲、讀取等操作API,MemoryStore也提供了多種存儲方式,下面詳細說明每種方式。
- 以序列化格式保存Block數據
def putBytes[T: ClassTag](
blockId: BlockId,
size: Long,
memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean
首先,通過MemoryManager來申請Storage內存,調用putBytes方法,會根據size大小去申請Storage內存,如果申請成功,則會將blockId對應的Block數據保存在內部的LinkedHashMap[BlockId, MemoryEntry[_]]映射表中,然后以SerializedMemoryEntry這種序列化的格式存儲,實際SerializedMemoryEntry就是簡單指向Buffer中數據的引用對象:
private case class SerializedMemoryEntry[T](
buffer: ChunkedByteBuffer,
memoryMode: MemoryMode,
classTag: ClassTag[T]) extends MemoryEntry[T] {
def size: Long = buffer.size
}
如果無法申請到size大小的Storage內存,則存儲失敗,對于出現這種失敗的情況,需要使用MemoryStore存儲API的調用者去處理異常情況。
- 基于記錄迭代器,以反序列化Java對象形式保存Block數據
private[storage] def putIteratorAsValues[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]
這種方式,調用者希望將Block數據記錄以反序列化的方式保存在內存中,如果內存中能放得下,則返回最終Block數據記錄的大小,否則返回一個PartiallyUnrolledIterator[T]迭代器,其中對應如下2種情況:
第一種,Block數據記錄能夠完全放到內存中:和前面的方式類似,能夠全部放到內存,但是不同的是,這種方式對應的數據格式是反序列化的Java對象格式,對應實現類DeserializedMemoryEntry[T],它也會被直接存放到MemoryStore內部的LinkedHashMap[BlockId, MemoryEntry[_]]映射表中。DeserializedMemoryEntry[T]類定義如下所示:
private case class DeserializedMemoryEntry[T](
value: Array[T],
size: Long,
classTag: ClassTag[T]) extends MemoryEntry[T] {
val memoryMode: MemoryMode = MemoryMode.ON_HEAP
}
它與SerializedMemoryEntry都是MemoryEntry[T]的子類,所有被放到同一個映射表LinkedHashMap[BlockId, MemoryEntry[_]] entries中。
另外,也存在這種可能,通過MemoryManager申請的Unroll內存大小大于該Block打開需要的內存,則會返回如下結果對象:
Left(new PartiallyUnrolledIterator( this, unrollMemoryUsedByThisBlock,
unrolled = arrayValues.toIterator, rest = Iterator.empty))
上面unrolled = arrayValues.toIterator,rest = Iterator.empty,表示在內存中可以打開迭代器中全部的數據記錄,打開對象類型為DeserializedMemoryEntry[T]。
第二種,Block數據記錄只能部分放到內存中:也就是說Driver或Executor上的內存有限,只可以放得下部分記錄,另一部分記錄內存中放不下。values記錄迭代器對應的全部記錄數據無法完全放在內存中,所以為了保證不發生OOM異常,首選會調用MemoryManager的acquireUnrollMemory方法去申請Unroll內存,如果可以申請到,在迭代values的過程中,需要累加計算打開(Unroll)的記錄對象大小之和,使其大小不能大于申請到的Unroll內存,直到還有一部分記錄無法放到申請的Unroll內存中。 最后,返回的結果對象如下所示:
Left(new PartiallyUnrolledIterator(
this, unrollMemoryUsedByThisBlock, unrolled = vector.iterator, rest = values))
上面的PartiallyUnrolledIterator中rest對應的values就是putIteratorAsValues方法傳進來的迭代器參數值,該迭代器已經迭代出部分記錄,放到了內存中,調用者可以繼續迭代該迭代器去處理未打開(Unroll)的記錄,而unrolled對應一個打開記錄的迭代器。這里,PartiallyUnrolledIterator迭代器包裝了vector.iterator和一個迭代出部分記錄的values迭代器,調用者對PartiallyUnrolledIterator進行統一迭代能夠獲取到全部記錄,里面包含兩種類型的記錄:DeserializedMemoryEntry[T]和T。
- 基于記錄迭代器,以序列化二進制格式保存Block數據
private[storage] def putIteratorAsBytes[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T],
memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]
這種方式,調用這種希望將Block數據記錄以二進制的格式保存在內存中。如果內存中能放得下,則返回最終的大小,否則返回一個PartiallySerializedBlock[T]迭代器。
如果Block數據記錄能夠完全放到內存中,則以SerializedMemoryEntry[T]格式放到內存的映射表中。如果Block數據記錄只能部分放到內存中,則返回如下對象:
Left(
new PartiallySerializedBlock(
this,
serializerManager,
blockId,
serializationStream,
redirectableStream,
unrollMemoryUsedByThisBlock,
memoryMode,
bbos.toChunkedByteBuffer,
values,
classTag))
類似地,返回結果對象對調用者保持統一的迭代API視圖。
DiskStore
DiskStore提供了將Block數據寫入到磁盤的基本操作,它是通過DiskBlockManager來管理邏輯上Block到物理磁盤上Block文件路徑的映射關系。當StorageLevel設置為如下值時,都可能會需要使用DiskStore來存儲數據:
DISK_ONLY
DISK_ONLY_2
MEMORY_AND_DISK
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2
OFF_HEAP
DiskBlockManager管理了每個Block數據存儲位置的信息,包括從Block ID到磁盤上文件的映射關系。DiskBlockManager主要有如下幾個功能:
- 負責創建一個本地節點上的指定磁盤目錄,用來存儲Block數據到指定文件中
- 如果Block數據想要落盤,需要通過調用getFile方法來分配一個唯一的文件路徑
- 如果想要查詢一個Block是否在磁盤上,通過調用containsBlock方法來查詢
- 查詢當前節點上管理的全部Block文件
- 通過調用createTempLocalBlock方法,生成一個唯一Block ID,并創建一個唯一的臨時文件,用來存儲中間結果數據
- 通過調用createTempShuffleBlock方法,生成一個唯一Block ID,并創建一個唯一的臨時文件,用來存儲Shuffle過程的中間結果數據
DiskStore提供的基本操作接口,與MemoryStore類似,比較簡單,如下所示:
- 通過文件流寫Block數據
該種方式對應的接口方法,如下所示:
def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit
參數指定Block ID,還有一個寫Block數據到打開的文件流的函數,在調用put方法時,首先會從DiskBlockManager分配一個Block ID對應的磁盤文件路徑,然后將數據寫入到該文件中。
- 將二進制Block數據寫入文件
putBytes方法實現了,將一個Buffer中的Block數據寫入指定的Block ID對應的文件中,方法定義如下所示:
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit
實際上,它是調用上面的put方法,將bytes中的Block二進制數據寫入到Block文件中。
- 從磁盤文件讀取Block數據
對應方法如下所示:
def getBytes(blockId: BlockId): ChunkedByteBuffer
通過給定的blockId,獲取磁盤上對應的Block文件的數據,以ChunkedByteBuffer的形式返回。
- 刪除Block文件
對應的刪除方法定義,如下所示:
def remove(blockId: BlockId): Boolean
通過DiskBlockManager查找到blockId對應的Block文件,然后刪除掉。
BlockManager
談到Spark中的Block數據存儲,我們很容易能夠想到BlockManager,他負責管理在每個Dirver和Executor上的Block數據,可能是本地或者遠程的。具體操作包括查詢Block、將Block保存在指定的存儲中,如內存、磁盤、堆外(Off-heap)。而BlockManager依賴的后端,對Block數據進行內存、磁盤存儲訪問,都是基于前面講到的MemoryStore、DiskStore。
在Spark集群中,當提交一個Application執行時,該Application對應的Driver以及所有的Executor上,都存在一個BlockManager、BlockManagerMaster,而BlockManagerMaster是負責管理各個BlockManager之間通信,這個BlockManager管理集群,如下圖所示:
關于一個Application運行過程中Block的管理,主要是基于該Application所關聯的一個Driver和多個Executor構建了一個Block管理集群:Driver上的(BlockManagerMaster, BlockManagerMasterEndpoint)是集群的Master角色,所有Executor上的(BlockManagerMaster, RpcEndpointRef)作為集群的Slave角色。當Executor上的Task運行時,會查詢對應的RDD的某個Partition對應的Block數據是否處理過,這個過程中會觸發多個BlockManager之間的通信交互。我們以ShuffleMapTask的運行為例,對應代碼如下所示:
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 處理RDD的Partition的數據
writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
一個RDD的Partition對應一個ShuffleMapTask,一個ShuffleMapTask會在一個Executor上運行,它負責處理RDD的一個Partition對應的數據,基本處理流程,如下所示:
- 根據該Partition的數據,創建一個RDDBlockId(由RDD ID和Partition Index組成),即得到一個穩定的blockId(如果該Partition數據被處理過,則可能本地或者遠程Executor存儲了對應的Block數據)。
- 先從BlockManager獲取該blockId對應的數據是否存在,如果本地存在(已經處理過),則直接返回Block結果(BlockResult);否則,查詢遠程的Executor是否已經處理過該Block,處理過則直接通過網絡傳輸到當前Executor本地,并根據StorageLevel設置,保存Block數據到本地MemoryStore或DiskStore,同時通過BlockManagerMaster上報Block數據狀態(通知Driver當前的Block狀態,亦即,該Block數據存儲在哪個BlockManager中)。
- 如果本地及遠程Executor都沒有處理過該Partition對應的Block數據,則調用RDD的compute方法進行計算處理,并將處理的Block數據,根據StorageLevel設置,存儲到本地MemoryStore或DiskStore。
- 根據ShuffleManager對應的ShuffleWriter,將返回的該Partition的Block數據進行Shuffle寫入操作
下面,我們基于上面邏輯,詳細分析在這個處理過程中重要的交互邏輯:
- 根據RDD獲取一個Partition對應數據的記錄迭代器
用戶提交的Spark Application程序,會設置對應的StorageLevel,所以設置與不設置對該處理邏輯有一定影響,具有兩種情況,如下圖所示:
如果用戶程序設置了StorageLevel,可能該Partition的數據已經處理過,那么對應的處理結果Block數據可能已經存儲。一般設置的StorageLevel,或者將Block存儲在內存中,或者存儲在磁盤上,這里會嘗試調用getOrElseUpdate()方法獲取對應的Block數據,如果存在則直接返回Block對應的記錄的迭代器實例,就不需要重新計算了,如果沒有找到對應的已經處理過的Block數據,則調用RDD的compute()方法進行處理,處理結果根據StorageLevel設置,將Block數據存儲在內存或磁盤上,緩存供后續Task重復使用。
如果用戶程序沒有設置StorageLevel,那么RDD對應的該Partition的數據一定沒有進行處理過,即使處理過,如果沒有進行Checkpointing,也需要重新計算(如果進行了Checkpointing,可以直接從緩存中獲取),直接調用RDD的compute()方法進行處理。
- 從BlockManager查詢獲取Block數據
每個Executor上都有一個BlockManager實例,負責管理用戶提交的該Application計算過程中產生的Block。很有可能當前Executor上存儲在RDD對應Partition的經過處理后得到的Block數據,也有可能當前Executor上沒有,但是其他Executor上已經處理過并緩存了Block數據,所以對應著本地獲取、遠程獲取兩種可能,本地獲取交互邏輯如下圖所示:
從本地獲取,根據StorageLevel設置,如果是存儲在內存中,則從本地的MemoryStore中查詢,存在則讀取并返回;如果是存儲在磁盤上,則從本地的DiskStore中查詢,存在則讀取并返回。本地不存在,則會從遠程的Executor讀取,對應的組件交互邏輯,如下圖所示:
遠程獲取交互邏輯相對比較復雜:當前Executor上的BlockManager通過BlockManagerMaster,向遠程的Driver上的BlockManagerMasterEndpoint查詢對應Block ID,有哪些Executor已經保存了該Block數據,Dirver返回一個包含了該Block數據的Location列表,如果對應的Location信息與當前ShuffleMapTask執行所在Executor在同一臺節點上,則會優先使用該Location,因為同一節點上的多個Executor之間傳輸Block數據效率更高。
這里需要說明的是,如果對應的Block數據的StorageLevel設置為寫磁盤,通過前面我們知道,DiskStore是通過DiskBlockManager進行管理存儲到磁盤上的Block數據文件的,在同一個節點上的多個Executor共享相同的磁盤文件路徑,相同的Block數據文件也就會被同一個節點上的多個Executor所共享。而對應MemoryStore,因為每個Executor對應獨立的JVM實例,從而具有獨立的Storage/Execution內存管理,所以使用MemoryStore不能共享同一個Block數據,但是同一個節點上的多個Executor之間的MemoryStore之間拷貝數據,比跨網絡傳輸要高效的多。
BlockInfoManager
用戶提交一個Spark Application程序,如果程序對應的DAG圖相對復雜,其中很多Task計算的結果Block數據都有可能被重復使用,這種情況下如何去控制某個Executor上的Task線程去讀寫Block數據呢?其實,BlockInfoManager就是用來控制Block數據讀寫操作,并且跟蹤Task讀寫了哪些Block數據的映射關系,這樣如果兩個Task都想去處理同一個RDD的同一個Partition數據,如果沒有鎖來控制,很可能兩個Task都會計算并寫同一個Block數據,從而造成混亂。我們分析每種情況下,BlockInfoManager是如何管理Block數據(同一個RDD的同一個Partition)讀寫的:
- 第一個Task請求寫Block數據
這種情況下,沒有其他Task寫Block數據,第一個Task直接獲取到寫鎖,并啟動寫Block數據到本地MemoryStore或DiskStore。如果其他寫Block數據的Task也請求寫鎖,則該Task會阻塞,等待第一個獲取寫鎖的Task完成寫Block數據,直到第一個Task寫完成,并通知其他阻塞的Task,然后其他Task需要再次獲取到讀鎖來讀取該Block數據。
- 第一個Task正在寫Block數據,其他Task請求讀Block數據
這種情況,Block數據沒有完成寫操作,其他讀Block數據的Task只能阻塞,等待寫Block的Task完成并通知讀Task去讀取Block數據。
- Task請求讀取Block數據
如果該Block數據不存在,則直接返回空,表示當前RDD的該Partition并沒有被處理過。如果當前Block數據存在,并且沒有其他Task在寫,表示已經完成了些Block數據操作,則該Task直接讀取該Block數據。
來自:http://shiyanjun.cn/archives/1641.html