Apache Spark中,對(duì)Block的查詢、存儲(chǔ)管理,是通過的Block ID來進(jìn)行區(qū)分的。所以,了解Block ID的生成規(guī)則,能夠幫助我們了解Block查詢、存儲(chǔ)過程中是如何定位Block以及如何處理互斥存儲(chǔ)/讀取同一個(gè)Block的。
可以想到,同一個(gè)Spark Application,以及多個(gè)運(yùn)行的Application之間,對(duì)應(yīng)的Block都具有的ID,通過代碼可以看到,BlockID包括:RDDBlockId、ShuffleBlockId、ShuffleDataBlockId、ShuffleIndexBlockId、BroadcastBlockId、TaskResultBlockId、TempLocalBlockId、TempShuffleBlockId這8種ID,可以詳見如下代碼定義:
@DeveloperApi case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId { override def name: String = "rdd_" + rddId + "_" + splitIndex
} // Format of the shuffle block ids (including data and index) should be kept in sync with // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData(). @DeveloperApi case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
} @DeveloperApi case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" } @DeveloperApi case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index" } @DeveloperApi case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId { override def name: String = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
} @DeveloperApi case class TaskResultBlockId(taskId: Long) extends BlockId { override def name: String = "taskresult_" + taskId
} @DeveloperApi case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId { override def name: String = "input-" + streamId + "-" + uniqueId
} /** Id associated with temporary local data managed as blocks. Not serializable. */ private[spark] case class TempLocalBlockId(id: UUID) extends BlockId { override def name: String = "temp_local_" + id
} /** Id associated with temporary shuffle data managed as blocks. Not serializable. */ private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId { override def name: String = "temp_shuffle_" + id
}
我們以RDDBlockId的生成規(guī)則為例,它是以前綴字符串“rdd_”為前綴、分配的全局RDD ID、下劃線“_”、Partition ID這4部分拼接而成,因?yàn)镽DD ID是的,所以終構(gòu)造好的RDDBlockId對(duì)應(yīng)的字符串就是的。如果該Block存在,查詢可以定位到該Block,存儲(chǔ)也不會(huì)出現(xiàn)覆蓋其他RDDBlockId的問題。
下面,我們通過分析MemoryStore、DiskStore、BlockManager、BlockInfoManager這4個(gè)核心的與Block管理相關(guān)的實(shí)現(xiàn)類,來理解Spark對(duì)Block的管理。全文中,我們主要針對(duì)RDDBlockId對(duì)應(yīng)的Block數(shù)據(jù)的處理、存儲(chǔ)、查詢、讀取,來分析Block的管理。
先說明一下MemoryStore,它主要用來在內(nèi)存中存儲(chǔ)Block數(shù)據(jù),可以避免重復(fù)計(jì)算同一個(gè)RDD的Partition數(shù)據(jù)。一個(gè)Block對(duì)應(yīng)著一個(gè)RDD的一個(gè)Partition的數(shù)據(jù)。當(dāng)StorageLevel設(shè)置為如下值時(shí),都會(huì)可能會(huì)需要使用MemoryStore來存儲(chǔ)數(shù)據(jù):
MEMORY_ONLY
MEMORY_ONLY_2
MEMORY_ONLY_SER
MEMORY_ONLY_SER_2
MEMORY_AND_DISK
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2
OFF_HEAP
所以,MemoryStore提供對(duì)Block數(shù)據(jù)的存儲(chǔ)、讀取等操作API,MemoryStore也提供了多種存儲(chǔ)方式,下面詳細(xì)說明每種方式。
def putBytes[T: ClassTag](
blockId: BlockId,
size: Long,
memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean
首先,通過MemoryManager來申請(qǐng)Storage內(nèi)存,調(diào)用putBytes方法,會(huì)根據(jù)size大小去申請(qǐng)Storage內(nèi)存,如果申請(qǐng)成功,則會(huì)將blockId對(duì)應(yīng)的Block數(shù)據(jù)保存在內(nèi)部的LinkedHashMap[BlockId, MemoryEntry[_]]映射表中,然后以SerializedMemoryEntry這種序列化的格式存儲(chǔ),實(shí)際SerializedMemoryEntry就是簡(jiǎn)單指向Buffer中數(shù)據(jù)的引用對(duì)象:
private case class SerializedMemoryEntry[T](
buffer: ChunkedByteBuffer,
memoryMode: MemoryMode,
classTag: ClassTag[T]) extends MemoryEntry[T] { def size: Long = buffer.size
}
如果無法申請(qǐng)到size大小的Storage內(nèi)存,則存儲(chǔ)失敗,對(duì)于出現(xiàn)這種失敗的情況,需要使用MemoryStore存儲(chǔ)API的調(diào)用者去處理異常情況。
private[storage] def putIteratorAsValues[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]
這種方式,調(diào)用者希望將Block數(shù)據(jù)記錄以反序列化的方式保存在內(nèi)存中,如果內(nèi)存中能放得下,則返回終Block數(shù)據(jù)記錄的大小,否則返回一個(gè)PartiallyUnrolledIterator[T]迭代器,其中對(duì)應(yīng)如下2種情況:
種,Block數(shù)據(jù)記錄能夠完全放到內(nèi)存中:和前面的方式類似,能夠全部放到內(nèi)存,但是不同的是,這種方式對(duì)應(yīng)的數(shù)據(jù)格式是反序列化的Java對(duì)象格式,對(duì)應(yīng)實(shí)現(xiàn)類DeserializedMemoryEntry[T],它也會(huì)被直接存放到MemoryStore內(nèi)部的LinkedHashMap[BlockId, MemoryEntry[_]]映射表中。DeserializedMemoryEntry[T]類定義如下所示:
private case class DeserializedMemoryEntry[T](
value: Array[T],
size: Long,
classTag: ClassTag[T]) extends MemoryEntry[T] { val memoryMode: MemoryMode = MemoryMode.ON_HEAP
}
它與SerializedMemoryEntry都是MemoryEntry[T]的子類,所有被放到同一個(gè)映射表LinkedHashMap[BlockId, MemoryEntry[_]] entries中。
另外,也存在這種可能,通過MemoryManager申請(qǐng)的Unroll內(nèi)存大小大于該Block打開需要的內(nèi)存,則會(huì)返回如下結(jié)果對(duì)象:
Left(new PartiallyUnrolledIterator( this, unrollMemoryUsedByThisBlock,
unrolled = arrayValues.toIterator, rest = Iterator.empty))
上面unrolled = arrayValues.toIterator,rest = Iterator.empty,表示在內(nèi)存中可以打開迭代器中全部的數(shù)據(jù)記錄,打開對(duì)象類型為DeserializedMemoryEntry[T]。
第二種,Block數(shù)據(jù)記錄只能部分放到內(nèi)存中:也就是說Driver或Executor上的內(nèi)存有限,只可以放得下部分記錄,另一部分記錄內(nèi)存中放不下。values記錄迭代器對(duì)應(yīng)的全部記錄數(shù)據(jù)無法完全放在內(nèi)存中,所以為了保證不發(fā)生OOM異常,首選會(huì)調(diào)用MemoryManager的acquireUnrollMemory方法去申請(qǐng)Unroll內(nèi)存,如果可以申請(qǐng)到,在迭代values的過程中,需要累加計(jì)算打開(Unroll)的記錄對(duì)象大小之和,使其大小不能大于申請(qǐng)到的Unroll內(nèi)存,直到還有一部分記錄無法放到申請(qǐng)的Unroll內(nèi)存中。 后,返回的結(jié)果對(duì)象如下所示:
Left(new PartiallyUnrolledIterator( this, unrollMemoryUsedByThisBlock, unrolled = vector.iterator, rest = values))
上面的PartiallyUnrolledIterator中rest對(duì)應(yīng)的values就是putIteratorAsValues方法傳進(jìn)來的迭代器參數(shù)值,該迭代器已經(jīng)迭代出部分記錄,放到了內(nèi)存中,調(diào)用者可以繼續(xù)迭代該迭代器去處理未打開(Unroll)的記錄,而unrolled對(duì)應(yīng)一個(gè)打開記錄的迭代器。這里,PartiallyUnrolledIterator迭代器包裝了vector.iterator和一個(gè)迭代出部分記錄的values迭代器,調(diào)用者對(duì)PartiallyUnrolledIterator進(jìn)行統(tǒng)一迭代能夠獲取到全部記錄,里面包含兩種類型的記錄:DeserializedMemoryEntry[T]和T。
private[storage] def putIteratorAsBytes[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T],
memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]
這種方式,調(diào)用這種希望將Block數(shù)據(jù)記錄以二進(jìn)制的格式保存在內(nèi)存中。如果內(nèi)存中能放得下,則返回終的大小,否則返回一個(gè)PartiallySerializedBlock[T]迭代器。
如果Block數(shù)據(jù)記錄能夠完全放到內(nèi)存中,則以SerializedMemoryEntry[T]格式放到內(nèi)存的映射表中。如果Block數(shù)據(jù)記錄只能部分放到內(nèi)存中,則返回如下對(duì)象:
Left( new PartiallySerializedBlock( this,
serializerManager,
blockId,
serializationStream,
redirectableStream,
unrollMemoryUsedByThisBlock,
memoryMode,
bbos.toChunkedByteBuffer,
values,
classTag))
類似地,返回結(jié)果對(duì)象對(duì)調(diào)用者保持統(tǒng)一的迭代API視圖。
DiskStore提供了將Block數(shù)據(jù)寫入到磁盤的基本操作,它是通過DiskBlockManager來管理邏輯上Block到物理磁盤上Block文件路徑的映射關(guān)系。當(dāng)StorageLevel設(shè)置為如下值時(shí),都可能會(huì)需要使用DiskStore來存儲(chǔ)數(shù)據(jù):
DISK_ONLY
DISK_ONLY_2
MEMORY_AND_DISK
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2
OFF_HEAP
DiskBlockManager管理了每個(gè)Block數(shù)據(jù)存儲(chǔ)位置的信息,包括從Block ID到磁盤上文件的映射關(guān)系。DiskBlockManager主要有如下幾個(gè)功能:
DiskStore提供的基本操作接口,與MemoryStore類似,比較簡(jiǎn)單,如下所示:
該種方式對(duì)應(yīng)的接口方法,如下所示:
def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit
參數(shù)指定Block ID,還有一個(gè)寫B(tài)lock數(shù)據(jù)到打開的文件流的函數(shù),在調(diào)用put方法時(shí),首先會(huì)從DiskBlockManager分配一個(gè)Block ID對(duì)應(yīng)的磁盤文件路徑,然后將數(shù)據(jù)寫入到該文件中。
putBytes方法實(shí)現(xiàn)了,將一個(gè)Buffer中的Block數(shù)據(jù)寫入指定的Block ID對(duì)應(yīng)的文件中,方法定義如下所示:
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit
實(shí)際上,它是調(diào)用上面的put方法,將bytes中的Block二進(jìn)制數(shù)據(jù)寫入到Block文件中。
對(duì)應(yīng)方法如下所示:
def getBytes(blockId: BlockId): ChunkedByteBuffer
通過給定的blockId,獲取磁盤上對(duì)應(yīng)的Block文件的數(shù)據(jù),以ChunkedByteBuffer的形式返回。
對(duì)應(yīng)的刪除方法定義,如下所示:
def remove(blockId: BlockId): Boolean
通過DiskBlockManager查找到blockId對(duì)應(yīng)的Block文件,然后刪除掉。
談到Spark中的Block數(shù)據(jù)存儲(chǔ),我們很容易能夠想到BlockManager,他負(fù)責(zé)管理在每個(gè)Dirver和Executor上的Block數(shù)據(jù),可能是本地或者遠(yuǎn)程的。具體操作包括查詢Block、將Block保存在指定的存儲(chǔ)中,如內(nèi)存、磁盤、堆外(Off-heap)。而BlockManager依賴的后端,對(duì)Block數(shù)據(jù)進(jìn)行內(nèi)存、磁盤存儲(chǔ)訪問,都是基于前面講到的MemoryStore、DiskStore。
在Spark集群中,當(dāng)提交一個(gè)Application執(zhí)行時(shí),該Application對(duì)應(yīng)的Driver以及所有的Executor上,都存在一個(gè)BlockManager、BlockManagerMaster,而BlockManagerMaster是負(fù)責(zé)管理各個(gè)BlockManager之間通信,這個(gè)BlockManager管理集群,如下圖所示:
關(guān)于一個(gè)Application運(yùn)行過程中Block的管理,主要是基于該Application所關(guān)聯(lián)的一個(gè)Driver和多個(gè)Executor構(gòu)建了一個(gè)Block管理集群:Driver上的(BlockManagerMaster, BlockManagerMasterEndpoint)是集群的Master角色,所有Executor上的(BlockManagerMaster, RpcEndpointRef)作為集群的Slave角色。當(dāng)Executor上的Task運(yùn)行時(shí),會(huì)查詢對(duì)應(yīng)的RDD的某個(gè)Partition對(duì)應(yīng)的Block數(shù)據(jù)是否處理過,這個(gè)過程中會(huì)觸發(fā)多個(gè)BlockManager之間的通信交互。我們以ShuffleMapTask的運(yùn)行為例,對(duì)應(yīng)代碼如下所示:
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 處理RDD的Partition的數(shù)據(jù)
writer.stop(success = true).get } catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
一個(gè)RDD的Partition對(duì)應(yīng)一個(gè)ShuffleMapTask,一個(gè)ShuffleMapTask會(huì)在一個(gè)Executor上運(yùn)行,它負(fù)責(zé)處理RDD的一個(gè)Partition對(duì)應(yīng)的數(shù)據(jù),基本處理流程,如下所示:
下面,我們基于上面邏輯,詳細(xì)分析在這個(gè)處理過程中重要的交互邏輯:
用戶提交的Spark Application程序,會(huì)設(shè)置對(duì)應(yīng)的StorageLevel,所以設(shè)置與不設(shè)置對(duì)該處理邏輯有一定影響,具有兩種情況,如下圖所示:
如果用戶程序設(shè)置了StorageLevel,可能該P(yáng)artition的數(shù)據(jù)已經(jīng)處理過,那么對(duì)應(yīng)的處理結(jié)果Block數(shù)據(jù)可能已經(jīng)存儲(chǔ)。一般設(shè)置的StorageLevel,或者將Block存儲(chǔ)在內(nèi)存中,或者存儲(chǔ)在磁盤上,這里會(huì)嘗試調(diào)用getOrElseUpdate()方法獲取對(duì)應(yīng)的Block數(shù)據(jù),如果存在則直接返回Block對(duì)應(yīng)的記錄的迭代器實(shí)例,就不需要重新計(jì)算了,如果沒有找到對(duì)應(yīng)的已經(jīng)處理過的Block數(shù)據(jù),則調(diào)用RDD的compute()方法進(jìn)行處理,處理結(jié)果根據(jù)StorageLevel設(shè)置,將Block數(shù)據(jù)存儲(chǔ)在內(nèi)存或磁盤上,緩存供后續(xù)Task重復(fù)使用。
如果用戶程序沒有設(shè)置StorageLevel,那么RDD對(duì)應(yīng)的該P(yáng)artition的數(shù)據(jù)一定沒有進(jìn)行處理過,即使處理過,如果沒有進(jìn)行Checkpointing,也需要重新計(jì)算(如果進(jìn)行了Checkpointing,可以直接從緩存中獲取),直接調(diào)用RDD的compute()方法進(jìn)行處理。
每個(gè)Executor上都有一個(gè)BlockManager實(shí)例,負(fù)責(zé)管理用戶提交的該Application計(jì)算過程中產(chǎn)生的Block。很有可能當(dāng)前Executor上存儲(chǔ)在RDD對(duì)應(yīng)Partition的經(jīng)過處理后得到的Block數(shù)據(jù),也有可能當(dāng)前Executor上沒有,但是其他Executor上已經(jīng)處理過并緩存了Block數(shù)據(jù),所以對(duì)應(yīng)著本地獲取、遠(yuǎn)程獲取兩種可能,本地獲取交互邏輯如下圖所示:
從本地獲取,根據(jù)StorageLevel設(shè)置,如果是存儲(chǔ)在內(nèi)存中,則從本地的MemoryStore中查詢,存在則讀取并返回;如果是存儲(chǔ)在磁盤上,則從本地的DiskStore中查詢,存在則讀取并返回。本地不存在,則會(huì)從遠(yuǎn)程的Executor讀取,對(duì)應(yīng)的組件交互邏輯,如下圖所示:
遠(yuǎn)程獲取交互邏輯相對(duì)比較復(fù)雜:當(dāng)前Executor上的BlockManager通過BlockManagerMaster,向遠(yuǎn)程的Driver上的BlockManagerMasterEndpoint查詢對(duì)應(yīng)Block ID,有哪些Executor已經(jīng)保存了該Block數(shù)據(jù),Dirver返回一個(gè)包含了該Block數(shù)據(jù)的Location列表,如果對(duì)應(yīng)的Location信息與當(dāng)前ShuffleMapTask執(zhí)行所在Executor在同一臺(tái)節(jié)點(diǎn)上,則會(huì)優(yōu)先使用該Location,因?yàn)橥还?jié)點(diǎn)上的多個(gè)Executor之間傳輸Block數(shù)據(jù)效率更高。
這里需要說明的是,如果對(duì)應(yīng)的Block數(shù)據(jù)的StorageLevel設(shè)置為寫磁盤,通過前面我們知道,DiskStore是通過DiskBlockManager進(jìn)行管理存儲(chǔ)到磁盤上的Block數(shù)據(jù)文件的,在同一個(gè)節(jié)點(diǎn)上的多個(gè)Executor共享相同的磁盤文件路徑,相同的Block數(shù)據(jù)文件也就會(huì)被同一個(gè)節(jié)點(diǎn)上的多個(gè)Executor所共享。而對(duì)應(yīng)MemoryStore,因?yàn)槊總€(gè)Executor對(duì)應(yīng)獨(dú)立的JVM實(shí)例,從而具有獨(dú)立的Storage/Execution內(nèi)存管理,所以使用MemoryStore不能共享同一個(gè)Block數(shù)據(jù),但是同一個(gè)節(jié)點(diǎn)上的多個(gè)Executor之間的MemoryStore之間拷貝數(shù)據(jù),比跨網(wǎng)絡(luò)傳輸要高效的多。
用戶提交一個(gè)Spark Application程序,如果程序?qū)?yīng)的DAG圖相對(duì)復(fù)雜,其中很多Task計(jì)算的結(jié)果Block數(shù)據(jù)都有可能被重復(fù)使用,這種情況下如何去控制某個(gè)Executor上的Task線程去讀寫B(tài)lock數(shù)據(jù)呢?其實(shí),BlockInfoManager就是用來控制Block數(shù)據(jù)讀寫操作,并且跟蹤Task讀寫了哪些Block數(shù)據(jù)的映射關(guān)系,這樣如果兩個(gè)Task都想去處理同一個(gè)RDD的同一個(gè)Partition數(shù)據(jù),如果沒有鎖來控制,很可能兩個(gè)Task都會(huì)計(jì)算并寫同一個(gè)Block數(shù)據(jù),從而造成混亂。我們分析每種情況下,BlockInfoManager是如何管理Block數(shù)據(jù)(同一個(gè)RDD的同一個(gè)Partition)讀寫的:
這種情況下,沒有其他Task寫B(tài)lock數(shù)據(jù),個(gè)Task直接獲取到寫鎖,并啟動(dòng)寫B(tài)lock數(shù)據(jù)到本地MemoryStore或DiskStore。如果其他寫B(tài)lock數(shù)據(jù)的Task也請(qǐng)求寫鎖,則該Task會(huì)阻塞,等待個(gè)獲取寫鎖的Task完成寫B(tài)lock數(shù)據(jù),直到個(gè)Task寫完成,并通知其他阻塞的Task,然后其他Task需要再次獲取到讀鎖來讀取該Block數(shù)據(jù)。
這種情況,Block數(shù)據(jù)沒有完成寫操作,其他讀Block數(shù)據(jù)的Task只能阻塞,等待寫B(tài)lock的Task完成并通知讀Task去讀取Block數(shù)據(jù)。
如果該Block數(shù)據(jù)不存在,則直接返回空,表示當(dāng)前RDD的該P(yáng)artition并沒有被處理過。如果當(dāng)前Block數(shù)據(jù)存在,并且沒有其他Task在寫,表示已經(jīng)完成了些Block數(shù)據(jù)操作,則該Task直接讀取該Block數(shù)據(jù)。
本站文章版權(quán)歸原作者及原出處所有 。內(nèi)容為作者個(gè)人觀點(diǎn), 并不代表本站贊同其觀點(diǎn)和對(duì)其真實(shí)性負(fù)責(zé),本站只提供參考并不構(gòu)成任何投資及應(yīng)用建議。本站是一個(gè)個(gè)人學(xué)習(xí)交流的平臺(tái),網(wǎng)站上部分文章為轉(zhuǎn)載,并不用于任何商業(yè)目的,我們已經(jīng)盡可能的對(duì)作者和來源進(jìn)行了通告,但是能力有限或疏忽,造成漏登,請(qǐng)及時(shí)聯(lián)系我們,我們將根據(jù)著作權(quán)人的要求,立即更正或者刪除有關(guān)內(nèi)容。本站擁有對(duì)此聲明的最終解釋權(quán)。