MemeoryStore
上一節(jié),我們對(duì)BlockManager的主要寫(xiě)入方法做了一個(gè)整理,知道了BlockMananger的主要寫(xiě)入邏輯,以及對(duì)于塊信息的管理。但是,由于spark的整個(gè)存儲(chǔ)模塊是在是很龐大,而且很多細(xì)節(jié)的邏輯錯(cuò)綜復(fù)雜,如果對(duì)于每個(gè)細(xì)節(jié)都刨根問(wèn)底,一來(lái)精力有限,二來(lái)感覺(jué)也沒(méi)有太大的必要,當(dāng)然如果時(shí)間允許肯定是越詳細(xì)越好,在這里,我的分析的主要目的是理清存儲(chǔ)模塊的重點(diǎn)邏輯,希望能夠提綱契領(lǐng)地把各個(gè)模塊的脈絡(luò)領(lǐng)出來(lái),建立起對(duì)spark-core中各模塊的整體認(rèn)知,這樣我們?cè)谟龅揭恍﹩?wèn)題的時(shí)候就能夠很快地知道應(yīng)該從何處下手,從哪個(gè)具體的模塊去找問(wèn)題。
好了廢話(huà)不多說(shuō),本節(jié)接著上一節(jié)。上一篇,我們分析了BlockManager的幾個(gè)主要的存儲(chǔ)方法,發(fā)現(xiàn)BlockManager主要依靠?jī)?nèi)部的兩個(gè)組件MemoryStore和DiskStore來(lái)進(jìn)行實(shí)際的數(shù)據(jù)寫(xiě)入和塊的管理。
本節(jié),我們就來(lái)看一下MemoryStore這個(gè)組件。
不過(guò),我還是延續(xù)我一貫的風(fēng)格,從外部對(duì)一個(gè)類(lèi)的方法調(diào)用為切入點(diǎn)分析這個(gè)類(lèi)的作用和邏輯。
所以,我們先來(lái)看一下上一節(jié)對(duì)于MemoryStore的主要的方法調(diào)用的總結(jié):
memoryStore.putIteratorAsValues
memoryStore.putIteratorAsBytes
memoryStore.putBytes
memoryStore.putIteratorAsValues
這個(gè)方法主要是用于存儲(chǔ)級(jí)別是非序列化的情況,即直接以java對(duì)象的形式將數(shù)據(jù)存放在jvm堆內(nèi)存上。我們都知道,在jvm堆內(nèi)存上存放大量的對(duì)象并不是什么好事,gc壓力大,擠占內(nèi)存,可能引起頻繁的gc,但是也有明顯的好處,就是省去了序列化和反序列化耗時(shí),而且直接從堆內(nèi)存取數(shù)據(jù)顯然比任何其他方式(磁盤(pán)和直接內(nèi)存)都要快很多,所以對(duì)于內(nèi)存充足且要緩存的數(shù)據(jù)量本省不是很大的情況,這種方式也不失為一種不錯(cuò)的選擇。
private[storage] def putIteratorAsValues[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
// 用于存儲(chǔ)java對(duì)象的容器
val valuesHolder = new DeserializedValuesHolder[T](classTag)
putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {
// 存儲(chǔ)成功
case Right(storedSize) => Right(storedSize)
// 存儲(chǔ)失敗的情況
case Left(unrollMemoryUsedByThisBlock) =>
// ValuesHolder內(nèi)部的數(shù)組和vector會(huì)相互轉(zhuǎn)換
// 數(shù)據(jù)寫(xiě)入完成后會(huì)將vector中的數(shù)據(jù)轉(zhuǎn)移到數(shù)組中
val unrolledIterator = if (valuesHolder.vector != null) {
valuesHolder.vector.iterator
} else {
valuesHolder.arrayValues.toIterator
}
// 返回寫(xiě)入一半的迭代器、
// 外部調(diào)用者一半會(huì)選擇關(guān)閉這個(gè)迭代器以釋放被使用的內(nèi)存
Left(new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = unrolledIterator,
rest = values))
}
}
這個(gè)方法的邏輯很簡(jiǎn)單,作用也比較單一,主要是對(duì)實(shí)際存儲(chǔ)方法putIterator的返回結(jié)果做處理,如果失敗的話(huà),就封裝一個(gè)PartiallyUnrolledIterator返回給外部調(diào)用這個(gè),調(diào)用這個(gè)一般需要將這個(gè)寫(xiě)入一半的迭代器關(guān)閉。
MemoryStore.putIterator
這個(gè)方法看似很長(zhǎng),其實(shí)邏輯相對(duì)簡(jiǎn)單,主要做的事就是把數(shù)據(jù)一條一條往ValuesHolder中寫(xiě),并周期性地檢查內(nèi)存,如果內(nèi)存不夠就通過(guò)內(nèi)存管理器MemoryManager申請(qǐng)內(nèi)存,每次申請(qǐng)當(dāng)前內(nèi)存量的1.5倍。
最后,將ValuesHolder中的數(shù)據(jù)轉(zhuǎn)移到一個(gè)數(shù)組中(其實(shí)數(shù)據(jù)在SizeTrackingVector中也是以數(shù)組的形式存儲(chǔ),只不過(guò)SizeTrackingVector對(duì)象內(nèi)部處理數(shù)組還有一些其他的簿記量,更為關(guān)鍵的是我們需要將存儲(chǔ)的數(shù)據(jù)以同一的接口進(jìn)行包裝,以利于MemoryStore進(jìn)行同一管理)。最后還有關(guān)鍵的一步,就是釋放展開(kāi)內(nèi)存,重新申請(qǐng)存儲(chǔ)內(nèi)存。
此外,這個(gè)過(guò)程中有使用到memoryManager,具體的方法調(diào)用是:
memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
------------------------------分割線(xiàn)------------------------------
private def putIterator[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T],
memoryMode: MemoryMode,
valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Number of elements unrolled so far
var elementsUnrolled = 0
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Initial per-task memory to request for unrolling blocks (bytes).
// 用于數(shù)據(jù)在內(nèi)存展開(kāi)的初始的內(nèi)存使用量
val initialMemoryThreshold = unrollMemoryThreshold
// How often to check whether we need to request more memory
// 檢查內(nèi)存的頻率,每寫(xiě)這么多條數(shù)據(jù)就會(huì)檢查一次是否需要申請(qǐng)額外的內(nèi)存
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
// Memory currently reserved by this task for this particular unrolling operation
// 內(nèi)存閾值,開(kāi)始時(shí)等于初始閾值
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
// 內(nèi)存增長(zhǎng)因子,每次申請(qǐng)的內(nèi)存是當(dāng)前內(nèi)存的這個(gè)倍數(shù)
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Keep track of unroll memory used by this particular block / putIterator() operation
// 當(dāng)前的塊使用的內(nèi)存大小
var unrollMemoryUsedByThisBlock = 0L
// Request enough memory to begin unrolling
// 首先進(jìn)行初始的內(nèi)存申請(qǐng),向MemoryManager申請(qǐng)內(nèi)存
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
} else {
// 如果成功申請(qǐng)到內(nèi)存,則累加記錄
unrollMemoryUsedByThisBlock += initialMemoryThreshold
}
// Unroll this block safely, checking whether we have exceeded our threshold periodically
// 循環(huán)將每條數(shù)據(jù)寫(xiě)入容器中valuesHolder
while (values.hasNext && keepUnrolling) {
valuesHolder.storeValue(values.next())
// 如果寫(xiě)入數(shù)據(jù)的條數(shù)達(dá)到一個(gè)周期,那么就檢查一下是否需要申請(qǐng)額外的內(nèi)存
if (elementsUnrolled % memoryCheckPeriod == 0) {
// 通過(guò)valuesHolder獲取已經(jīng)寫(xiě)入的數(shù)據(jù)的評(píng)估大小
// 注意,這里的數(shù)據(jù)大小只是估計(jì)值,并不是十分準(zhǔn)確
// 具體如何進(jìn)行估算的可以看valuesHolder內(nèi)部實(shí)現(xiàn)
val currentSize = valuesHolder.estimatedSize()
// If our vector's size has exceeded the threshold, request more memory
// 如果已寫(xiě)入的數(shù)據(jù)大小超過(guò)了當(dāng)前閾值
if (currentSize >= memoryThreshold) {
// 這里每次申請(qǐng)的內(nèi)存量都是不一樣的
// 每次申請(qǐng)的內(nèi)存是當(dāng)前已使用內(nèi)存的1.5倍(默認(rèn))
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
// 記錄累積申請(qǐng)的內(nèi)存量
unrollMemoryUsedByThisBlock += amountToRequest
}
// New threshold is currentSize * memoryGrowthFactor
// 目前已經(jīng)向內(nèi)存管理器申請(qǐng)的內(nèi)存量
memoryThreshold += amountToRequest
}
}
// 記錄插入的數(shù)據(jù)條數(shù)
elementsUnrolled += 1
}
// Make sure that we have enough memory to store the block. By this point, it is possible that
// the block's actual memory usage has exceeded the unroll memory by a small amount, so we
// perform one final call to attempt to allocate additional memory if necessary.
// 如果keepUnrolling為true,說(shuō)明順利地將所有數(shù)據(jù)插入,
// 并未遇到申請(qǐng)內(nèi)存失敗的情況
if (keepUnrolling) {
// 將內(nèi)部的數(shù)據(jù)轉(zhuǎn)移到一個(gè)數(shù)組中
val entryBuilder = valuesHolder.getBuilder()
// 數(shù)據(jù)在內(nèi)存中的精確大小
val size = entryBuilder.preciseSize
// 實(shí)際的大小可能大于申請(qǐng)的內(nèi)存量
// 因此根據(jù)實(shí)際大小還要再申請(qǐng)額外的內(nèi)存
if (size > unrollMemoryUsedByThisBlock) {
val amountToRequest = size - unrollMemoryUsedByThisBlock
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
}
if (keepUnrolling) {
// 獲取MemoryEntry對(duì)象,該對(duì)象是對(duì)插入數(shù)據(jù)的包裝
val entry = entryBuilder.build()
// Synchronize so that transfer is atomic
memoryManager.synchronized {
// 這一步主要是釋放申請(qǐng)的展開(kāi)內(nèi)存
// 然后申請(qǐng)存儲(chǔ)內(nèi)存
// 這里需要弄清楚展開(kāi)內(nèi)存的概念
// 展開(kāi)狀態(tài)指的是對(duì)象在內(nèi)存中處于一種比較松散的狀態(tài),這樣的狀態(tài)方便做一些管理如統(tǒng)計(jì)大小等
// 而隨后將對(duì)象轉(zhuǎn)移到數(shù)組中,處于一種比較緊實(shí)的狀態(tài),數(shù)組相對(duì)來(lái)說(shuō)占用的額外內(nèi)存是比較小的
// 一個(gè)數(shù)組只是一個(gè)對(duì)象,只有一個(gè)對(duì)象頭,可以用來(lái)管理大量的對(duì)象
releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
// 申請(qǐng)存儲(chǔ)內(nèi)存
val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
assert(success, "transferring unroll memory to storage memory failed")
}
// 放入map中管理起來(lái)
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(entry.size)
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
// 如果失敗,返回已經(jīng)申請(qǐng)的展開(kāi)內(nèi)存
Left(unrollMemoryUsedByThisBlock)
}
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
Left(unrollMemoryUsedByThisBlock)
}
}
memoryStore.putIteratorAsBytes
我們?cè)倏戳硪粋€(gè)方法。套路基本和putIteratorAsValues是一樣一樣的。
最大的區(qū)別在于ValuesHolder類(lèi)型不同。非序列化形式存儲(chǔ)使用的是DeserializedMemoryEntry,而序列化形式存儲(chǔ)使用的是SerializedMemoryEntry。
private[storage] def putIteratorAsBytes[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T],
memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
// 字節(jié)數(shù)組的塊大小,默認(rèn)是1m
val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
s"is too large to be set as chunk size. Chunk size has been capped to " +
s"${Utils.bytesToString(Int.MaxValue)}")
Int.MaxValue
} else {
initialMemoryThreshold.toInt
}
// 字節(jié)數(shù)組的容器
val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
memoryMode, serializerManager)
putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {
case Right(storedSize) => Right(storedSize)
case Left(unrollMemoryUsedByThisBlock) =>
// 部分展開(kāi),部分以序列化形式存儲(chǔ)的block
Left(new PartiallySerializedBlock(
this,
serializerManager,
blockId,
valuesHolder.serializationStream,
valuesHolder.redirectableStream,
unrollMemoryUsedByThisBlock,
memoryMode,
valuesHolder.bbos,
values,
classTag))
}
}
memoryStore.putBytes
我們?cè)賮?lái)看另一個(gè)被外部調(diào)用用來(lái)插入數(shù)據(jù)的方法。很簡(jiǎn)單,不說(shuō)了。
def putBytes[T: ClassTag](
blockId: BlockId,
size: Long,
memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// 首先向內(nèi)存管理器申請(qǐng)內(nèi)存
// 這里申請(qǐng)的是存儲(chǔ)內(nèi)存,因?yàn)橐迦氲淖止?jié)數(shù)組,
// 所以不需要再展開(kāi),也就不需要申請(qǐng)展開(kāi)內(nèi)存
if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
// 這里直接構(gòu)建了一個(gè)SerializedMemoryEntry
// 并放到map中管理起來(lái)
val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
true
} else {
false
}
}
小結(jié)
通過(guò)對(duì)上面的三個(gè)方法,其實(shí)主要是前兩個(gè)方法的分析,我們發(fā)現(xiàn),除了對(duì)內(nèi)存進(jìn)行簿記管理之外,以及通過(guò)內(nèi)存管理器申請(qǐng)內(nèi)存之外,插入數(shù)據(jù)最主要的工作其實(shí)都是有ValuesHolder對(duì)象來(lái)完成的。
ValuesHolder特質(zhì)有兩個(gè)實(shí)現(xiàn)類(lèi):DeserializedValuesHolder和SerializedValuesHolder。
DeserializedValuesHolder
DeserializedValuesHolder對(duì)象內(nèi)部有兩個(gè)成員:vector,是一個(gè)SizeTrackingVector;arrayValues,是一個(gè)存放值的數(shù)組,用于在所有數(shù)據(jù)插入后,將主句轉(zhuǎn)移到一個(gè)數(shù)組中,方便包裝成一個(gè)MemoryEntry對(duì)象。大部分工作是有SizeTrackingVector完成的。
private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[T]()(classTag)
var arrayValues: Array[T] = null
override def storeValue(value: T): Unit = {
vector += value
}
override def estimatedSize(): Long = {
vector.estimateSize()
}
override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
// We successfully unrolled the entirety of this block
arrayValues = vector.toArray
vector = null
override val preciseSize: Long = SizeEstimator.estimate(arrayValues)
override def build(): MemoryEntry[T] =
DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
}
}
SizeTracker
上面提到的SizeTrackingVector繼承了這個(gè)特質(zhì),除了這個(gè)特質(zhì),還集成了PrimitiveVector類(lèi),但是PrimitiveVector類(lèi)基本上就是對(duì)一個(gè)數(shù)組的簡(jiǎn)單包裝。
SizeTrackingVector最重要的功能:追蹤對(duì)象的大小,就是在SizeTracker特之中實(shí)現(xiàn)的。
我大致說(shuō)一下這個(gè)特質(zhì)是如何實(shí)現(xiàn)對(duì)象大小跟蹤和估算的,代碼實(shí)現(xiàn)也并不復(fù)雜,感興趣的可以看一看,限于篇幅這里就不貼了。
- 每插入一定數(shù)量的數(shù)據(jù)(姑且稱(chēng)之為周期),就會(huì)對(duì)當(dāng)前的對(duì)象進(jìn)行一次取樣,而這個(gè)取樣的周期會(huì)越來(lái)越長(zhǎng),以1.1倍的速率增長(zhǎng);
- 取樣就是計(jì)算對(duì)象大小,并與前一次取樣作比較,而且只會(huì)保留最近兩次的取樣數(shù)據(jù);
- 每次取樣其實(shí)就是獲取兩個(gè)數(shù)據(jù),當(dāng)前對(duì)象大小,當(dāng)前插入的數(shù)據(jù)條數(shù);
- 這樣與上一次取樣一比較,就能夠計(jì)算出每條數(shù)據(jù)的大小了;
- 最后,在返回整個(gè)對(duì)象大小時(shí),是拿最近一次取樣時(shí)記錄下的對(duì)象大小,以及根據(jù)最近的情況估算的每條數(shù)據(jù)的大小乘以自從上次取樣以來(lái)新插入的數(shù)據(jù)量,二者相加作為對(duì)象大小的估算值,
可見(jiàn)這么做并不是什么精確,但是由于是抽樣,而且抽樣周期越往后面越長(zhǎng),所以對(duì)于數(shù)據(jù)插入的效率影響很小,而且這種不精確性其實(shí)在后續(xù)的內(nèi)存檢查過(guò)程中是有考慮到的。在所有數(shù)據(jù)插入完的收尾工作中,會(huì)對(duì)對(duì)象大小做一次精確計(jì)算。此外,熟悉spark內(nèi)存管理的同學(xué)應(yīng)該知道,其實(shí)spark一般會(huì)配置一個(gè)安全因子(一般是0.9),也就是說(shuō)只是用配置的內(nèi)存大小的90%,就是為了盡可能地減少這種不精確的內(nèi)存估算造成OOM的可能性。
SerializedValuesHolder
private class SerializedValuesHolder[T](
blockId: BlockId,
chunkSize: Int,
classTag: ClassTag[T],
memoryMode: MemoryMode,
serializerManager: SerializerManager) extends ValuesHolder[T] {
val allocator = memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
// 調(diào)用unsafe的本地方法申請(qǐng)直接內(nèi)存
// 這個(gè)方法之所以沒(méi)有調(diào)用ByteBuffer.allocateDirect方法
// 是因?yàn)檫@個(gè)方法分配的直接內(nèi)存大小收到參數(shù)MaxDirectMemorySize限制
// 所以這里繞過(guò)ByteBuffer.allocateDirect方法,通過(guò)反射和unsafe類(lèi)創(chuàng)建直接內(nèi)存對(duì)象
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
val redirectableStream = new RedirectableOutputStream
val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
redirectableStream.setOutputStream(bbos)
val serializationStream: SerializationStream = {
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
// 包裝壓縮流和序列化流
ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
}
// 寫(xiě)入方法,寫(xiě)入的對(duì)象經(jīng)過(guò)序列化,壓縮,
// 然后經(jīng)過(guò)ChunkedByteBufferOutputStream被分割成一個(gè)個(gè)的字節(jié)數(shù)組塊
override def storeValue(value: T): Unit = {
serializationStream.writeObject(value)(classTag)
}
override def estimatedSize(): Long = {
bbos.size
}
override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
// We successfully unrolled the entirety of this block
serializationStream.close()
override def preciseSize(): Long = bbos.size
override def build(): MemoryEntry[T] =
SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
}
}
大概看一下,主要的邏輯很簡(jiǎn)單,這里面有幾個(gè)注意點(diǎn):
- 對(duì)于直接內(nèi)存分配,spark并沒(méi)有使用jdk的高級(jí)api,而是反射配合unsafe類(lèi)分配直接內(nèi)存,這樣可以繞過(guò)jvm參數(shù)MaxDirectMemorySize的限制,這也體現(xiàn)了spark的作者盡可能的降低用戶(hù)使用難度
- 另外,我們看到序列化流其實(shí)經(jīng)過(guò)了層層包裝(典型的裝飾器模式),序列化和壓縮以及分塊是比較重要的幾個(gè)點(diǎn),感興趣的話(huà)可以深究,序列化和壓縮如果深入了解都是很大的課題,所以這里也僅僅是蜻蜓點(diǎn)水,不深究了。
總結(jié)
MemoryStore.scala這個(gè)文件中乍看代碼有八百多行,但是其實(shí)很大部分代碼是一些輔助類(lèi),比較核心的寫(xiě)入邏輯也就是前面提到的幾個(gè)方法,再加上核心的兩個(gè)類(lèi)DeserializedValuesHolder和SerializedValuesHolder實(shí)現(xiàn)了以對(duì)象或字節(jié)數(shù)組的形式存儲(chǔ)數(shù)據(jù)。
|