乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      spark存儲(chǔ)模塊之內(nèi)存存儲(chǔ)--MemeoryStore MemeoryStore

       新進(jìn)小設(shè)計(jì) 2021-08-12

      MemeoryStore

      上一節(jié),我們對(duì)BlockManager的主要寫(xiě)入方法做了一個(gè)整理,知道了BlockMananger的主要寫(xiě)入邏輯,以及對(duì)于塊信息的管理。但是,由于spark的整個(gè)存儲(chǔ)模塊是在是很龐大,而且很多細(xì)節(jié)的邏輯錯(cuò)綜復(fù)雜,如果對(duì)于每個(gè)細(xì)節(jié)都刨根問(wèn)底,一來(lái)精力有限,二來(lái)感覺(jué)也沒(méi)有太大的必要,當(dāng)然如果時(shí)間允許肯定是越詳細(xì)越好,在這里,我的分析的主要目的是理清存儲(chǔ)模塊的重點(diǎn)邏輯,希望能夠提綱契領(lǐng)地把各個(gè)模塊的脈絡(luò)領(lǐng)出來(lái),建立起對(duì)spark-core中各模塊的整體認(rèn)知,這樣我們?cè)谟龅揭恍﹩?wèn)題的時(shí)候就能夠很快地知道應(yīng)該從何處下手,從哪個(gè)具體的模塊去找問(wèn)題。
      好了廢話(huà)不多說(shuō),本節(jié)接著上一節(jié)。上一篇,我們分析了BlockManager的幾個(gè)主要的存儲(chǔ)方法,發(fā)現(xiàn)BlockManager主要依靠?jī)?nèi)部的兩個(gè)組件MemoryStore和DiskStore來(lái)進(jìn)行實(shí)際的數(shù)據(jù)寫(xiě)入和塊的管理。
      本節(jié),我們就來(lái)看一下MemoryStore這個(gè)組件。

      不過(guò),我還是延續(xù)我一貫的風(fēng)格,從外部對(duì)一個(gè)類(lèi)的方法調(diào)用為切入點(diǎn)分析這個(gè)類(lèi)的作用和邏輯。
      所以,我們先來(lái)看一下上一節(jié)對(duì)于MemoryStore的主要的方法調(diào)用的總結(jié):

      memoryStore.putIteratorAsValues
      memoryStore.putIteratorAsBytes
      memoryStore.putBytes
      

      memoryStore.putIteratorAsValues

      這個(gè)方法主要是用于存儲(chǔ)級(jí)別是非序列化的情況,即直接以java對(duì)象的形式將數(shù)據(jù)存放在jvm堆內(nèi)存上。我們都知道,在jvm堆內(nèi)存上存放大量的對(duì)象并不是什么好事,gc壓力大,擠占內(nèi)存,可能引起頻繁的gc,但是也有明顯的好處,就是省去了序列化和反序列化耗時(shí),而且直接從堆內(nèi)存取數(shù)據(jù)顯然比任何其他方式(磁盤(pán)和直接內(nèi)存)都要快很多,所以對(duì)于內(nèi)存充足且要緩存的數(shù)據(jù)量本省不是很大的情況,這種方式也不失為一種不錯(cuò)的選擇。

      private[storage] def putIteratorAsValues[T](
        blockId: BlockId,
        values: Iterator[T],
        classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
      
      // 用于存儲(chǔ)java對(duì)象的容器
      val valuesHolder = new DeserializedValuesHolder[T](classTag)
      
      putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {
          // 存儲(chǔ)成功
        case Right(storedSize) => Right(storedSize)
          // 存儲(chǔ)失敗的情況
        case Left(unrollMemoryUsedByThisBlock) =>
          // ValuesHolder內(nèi)部的數(shù)組和vector會(huì)相互轉(zhuǎn)換
          // 數(shù)據(jù)寫(xiě)入完成后會(huì)將vector中的數(shù)據(jù)轉(zhuǎn)移到數(shù)組中
          val unrolledIterator = if (valuesHolder.vector != null) {
            valuesHolder.vector.iterator
          } else {
            valuesHolder.arrayValues.toIterator
          }
      
          // 返回寫(xiě)入一半的迭代器、
          // 外部調(diào)用者一半會(huì)選擇關(guān)閉這個(gè)迭代器以釋放被使用的內(nèi)存
          Left(new PartiallyUnrolledIterator(
            this,
            MemoryMode.ON_HEAP,
            unrollMemoryUsedByThisBlock,
            unrolled = unrolledIterator,
            rest = values))
      }
      }
      

      這個(gè)方法的邏輯很簡(jiǎn)單,作用也比較單一,主要是對(duì)實(shí)際存儲(chǔ)方法putIterator的返回結(jié)果做處理,如果失敗的話(huà),就封裝一個(gè)PartiallyUnrolledIterator返回給外部調(diào)用這個(gè),調(diào)用這個(gè)一般需要將這個(gè)寫(xiě)入一半的迭代器關(guān)閉。

      MemoryStore.putIterator

      這個(gè)方法看似很長(zhǎng),其實(shí)邏輯相對(duì)簡(jiǎn)單,主要做的事就是把數(shù)據(jù)一條一條往ValuesHolder中寫(xiě),并周期性地檢查內(nèi)存,如果內(nèi)存不夠就通過(guò)內(nèi)存管理器MemoryManager申請(qǐng)內(nèi)存,每次申請(qǐng)當(dāng)前內(nèi)存量的1.5倍。
      最后,將ValuesHolder中的數(shù)據(jù)轉(zhuǎn)移到一個(gè)數(shù)組中(其實(shí)數(shù)據(jù)在SizeTrackingVector中也是以數(shù)組的形式存儲(chǔ),只不過(guò)SizeTrackingVector對(duì)象內(nèi)部處理數(shù)組還有一些其他的簿記量,更為關(guān)鍵的是我們需要將存儲(chǔ)的數(shù)據(jù)以同一的接口進(jìn)行包裝,以利于MemoryStore進(jìn)行同一管理)。最后還有關(guān)鍵的一步,就是釋放展開(kāi)內(nèi)存,重新申請(qǐng)存儲(chǔ)內(nèi)存。
      此外,這個(gè)過(guò)程中有使用到memoryManager,具體的方法調(diào)用是:

      memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
      

      ------------------------------分割線(xiàn)------------------------------

      private def putIterator[T](
        blockId: BlockId,
        values: Iterator[T],
        classTag: ClassTag[T],
        memoryMode: MemoryMode,
        valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
      require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
      
      // Number of elements unrolled so far
      var elementsUnrolled = 0
      // Whether there is still enough memory for us to continue unrolling this block
      var keepUnrolling = true
      // Initial per-task memory to request for unrolling blocks (bytes).
      // 用于數(shù)據(jù)在內(nèi)存展開(kāi)的初始的內(nèi)存使用量
      val initialMemoryThreshold = unrollMemoryThreshold
      // How often to check whether we need to request more memory
      // 檢查內(nèi)存的頻率,每寫(xiě)這么多條數(shù)據(jù)就會(huì)檢查一次是否需要申請(qǐng)額外的內(nèi)存
      val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
      // Memory currently reserved by this task for this particular unrolling operation
      // 內(nèi)存閾值,開(kāi)始時(shí)等于初始閾值
      var memoryThreshold = initialMemoryThreshold
      // Memory to request as a multiple of current vector size
      // 內(nèi)存增長(zhǎng)因子,每次申請(qǐng)的內(nèi)存是當(dāng)前內(nèi)存的這個(gè)倍數(shù)
      val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
      // Keep track of unroll memory used by this particular block / putIterator() operation
      // 當(dāng)前的塊使用的內(nèi)存大小
      var unrollMemoryUsedByThisBlock = 0L
      
      // Request enough memory to begin unrolling
      // 首先進(jìn)行初始的內(nèi)存申請(qǐng),向MemoryManager申請(qǐng)內(nèi)存
      keepUnrolling =
        reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
      
      if (!keepUnrolling) {
        logWarning(s"Failed to reserve initial memory threshold of " +
          s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
      } else {
        // 如果成功申請(qǐng)到內(nèi)存,則累加記錄
        unrollMemoryUsedByThisBlock += initialMemoryThreshold
      }
      
      // Unroll this block safely, checking whether we have exceeded our threshold periodically
      // 循環(huán)將每條數(shù)據(jù)寫(xiě)入容器中valuesHolder
      while (values.hasNext && keepUnrolling) {
        valuesHolder.storeValue(values.next())
        // 如果寫(xiě)入數(shù)據(jù)的條數(shù)達(dá)到一個(gè)周期,那么就檢查一下是否需要申請(qǐng)額外的內(nèi)存
        if (elementsUnrolled % memoryCheckPeriod == 0) {
          // 通過(guò)valuesHolder獲取已經(jīng)寫(xiě)入的數(shù)據(jù)的評(píng)估大小
          // 注意,這里的數(shù)據(jù)大小只是估計(jì)值,并不是十分準(zhǔn)確
          // 具體如何進(jìn)行估算的可以看valuesHolder內(nèi)部實(shí)現(xiàn)
          val currentSize = valuesHolder.estimatedSize()
          // If our vector's size has exceeded the threshold, request more memory
          // 如果已寫(xiě)入的數(shù)據(jù)大小超過(guò)了當(dāng)前閾值
          if (currentSize >= memoryThreshold) {
            // 這里每次申請(qǐng)的內(nèi)存量都是不一樣的
            // 每次申請(qǐng)的內(nèi)存是當(dāng)前已使用內(nèi)存的1.5倍(默認(rèn))
            val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
            keepUnrolling =
              reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
            if (keepUnrolling) {
              // 記錄累積申請(qǐng)的內(nèi)存量
              unrollMemoryUsedByThisBlock += amountToRequest
            }
            // New threshold is currentSize * memoryGrowthFactor
            // 目前已經(jīng)向內(nèi)存管理器申請(qǐng)的內(nèi)存量
            memoryThreshold += amountToRequest
          }
        }
        // 記錄插入的數(shù)據(jù)條數(shù)
        elementsUnrolled += 1
      }
      
      // Make sure that we have enough memory to store the block. By this point, it is possible that
      // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
      // perform one final call to attempt to allocate additional memory if necessary.
      // 如果keepUnrolling為true,說(shuō)明順利地將所有數(shù)據(jù)插入,
      // 并未遇到申請(qǐng)內(nèi)存失敗的情況
      if (keepUnrolling) {
        // 將內(nèi)部的數(shù)據(jù)轉(zhuǎn)移到一個(gè)數(shù)組中
        val entryBuilder = valuesHolder.getBuilder()
        // 數(shù)據(jù)在內(nèi)存中的精確大小
        val size = entryBuilder.preciseSize
        // 實(shí)際的大小可能大于申請(qǐng)的內(nèi)存量
        // 因此根據(jù)實(shí)際大小還要再申請(qǐng)額外的內(nèi)存
        if (size > unrollMemoryUsedByThisBlock) {
          val amountToRequest = size - unrollMemoryUsedByThisBlock
          keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
          if (keepUnrolling) {
            unrollMemoryUsedByThisBlock += amountToRequest
          }
        }
      
        if (keepUnrolling) {
          // 獲取MemoryEntry對(duì)象,該對(duì)象是對(duì)插入數(shù)據(jù)的包裝
          val entry = entryBuilder.build()
          // Synchronize so that transfer is atomic
          memoryManager.synchronized {
            // 這一步主要是釋放申請(qǐng)的展開(kāi)內(nèi)存
            // 然后申請(qǐng)存儲(chǔ)內(nèi)存
            // 這里需要弄清楚展開(kāi)內(nèi)存的概念
            // 展開(kāi)狀態(tài)指的是對(duì)象在內(nèi)存中處于一種比較松散的狀態(tài),這樣的狀態(tài)方便做一些管理如統(tǒng)計(jì)大小等
            // 而隨后將對(duì)象轉(zhuǎn)移到數(shù)組中,處于一種比較緊實(shí)的狀態(tài),數(shù)組相對(duì)來(lái)說(shuō)占用的額外內(nèi)存是比較小的
            // 一個(gè)數(shù)組只是一個(gè)對(duì)象,只有一個(gè)對(duì)象頭,可以用來(lái)管理大量的對(duì)象
            releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
            // 申請(qǐng)存儲(chǔ)內(nèi)存
            val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
            assert(success, "transferring unroll memory to storage memory failed")
          }
      
          // 放入map中管理起來(lái)
          entries.synchronized {
            entries.put(blockId, entry)
          }
      
          logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
            Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
          Right(entry.size)
        } else {
          // We ran out of space while unrolling the values for this block
          logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
          // 如果失敗,返回已經(jīng)申請(qǐng)的展開(kāi)內(nèi)存
          Left(unrollMemoryUsedByThisBlock)
        }
      } else {
        // We ran out of space while unrolling the values for this block
        logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
        Left(unrollMemoryUsedByThisBlock)
      }
      }
      

      memoryStore.putIteratorAsBytes

      我們?cè)倏戳硪粋€(gè)方法。套路基本和putIteratorAsValues是一樣一樣的。
      最大的區(qū)別在于ValuesHolder類(lèi)型不同。非序列化形式存儲(chǔ)使用的是DeserializedMemoryEntry,而序列化形式存儲(chǔ)使用的是SerializedMemoryEntry。

      private[storage] def putIteratorAsBytes[T](
        blockId: BlockId,
        values: Iterator[T],
        classTag: ClassTag[T],
        memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {
      
      require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
      
      // Initial per-task memory to request for unrolling blocks (bytes).
      val initialMemoryThreshold = unrollMemoryThreshold
      // 字節(jié)數(shù)組的塊大小,默認(rèn)是1m
      val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
        logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
          s"is too large to be set as chunk size. Chunk size has been capped to " +
          s"${Utils.bytesToString(Int.MaxValue)}")
        Int.MaxValue
      } else {
        initialMemoryThreshold.toInt
      }
      
      // 字節(jié)數(shù)組的容器
      val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
        memoryMode, serializerManager)
      
      putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {
        case Right(storedSize) => Right(storedSize)
        case Left(unrollMemoryUsedByThisBlock) =>
          // 部分展開(kāi),部分以序列化形式存儲(chǔ)的block
          Left(new PartiallySerializedBlock(
            this,
            serializerManager,
            blockId,
            valuesHolder.serializationStream,
            valuesHolder.redirectableStream,
            unrollMemoryUsedByThisBlock,
            memoryMode,
            valuesHolder.bbos,
            values,
            classTag))
      }
      }
      

      memoryStore.putBytes

      我們?cè)賮?lái)看另一個(gè)被外部調(diào)用用來(lái)插入數(shù)據(jù)的方法。很簡(jiǎn)單,不說(shuō)了。

      def putBytes[T: ClassTag](
        blockId: BlockId,
        size: Long,
        memoryMode: MemoryMode,
        _bytes: () => ChunkedByteBuffer): Boolean = {
      require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
      // 首先向內(nèi)存管理器申請(qǐng)內(nèi)存
      // 這里申請(qǐng)的是存儲(chǔ)內(nèi)存,因?yàn)橐迦氲淖止?jié)數(shù)組,
      // 所以不需要再展開(kāi),也就不需要申請(qǐng)展開(kāi)內(nèi)存
      if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
        // We acquired enough memory for the block, so go ahead and put it
        val bytes = _bytes()
        assert(bytes.size == size)
        // 這里直接構(gòu)建了一個(gè)SerializedMemoryEntry
        // 并放到map中管理起來(lái)
        val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
        entries.synchronized {
          entries.put(blockId, entry)
        }
        logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
        true
      } else {
        false
      }
      }
      

      小結(jié)

      通過(guò)對(duì)上面的三個(gè)方法,其實(shí)主要是前兩個(gè)方法的分析,我們發(fā)現(xiàn),除了對(duì)內(nèi)存進(jìn)行簿記管理之外,以及通過(guò)內(nèi)存管理器申請(qǐng)內(nèi)存之外,插入數(shù)據(jù)最主要的工作其實(shí)都是有ValuesHolder對(duì)象來(lái)完成的。
      ValuesHolder特質(zhì)有兩個(gè)實(shí)現(xiàn)類(lèi):DeserializedValuesHolder和SerializedValuesHolder。

      DeserializedValuesHolder

      DeserializedValuesHolder對(duì)象內(nèi)部有兩個(gè)成員:vector,是一個(gè)SizeTrackingVector;arrayValues,是一個(gè)存放值的數(shù)組,用于在所有數(shù)據(jù)插入后,將主句轉(zhuǎn)移到一個(gè)數(shù)組中,方便包裝成一個(gè)MemoryEntry對(duì)象。大部分工作是有SizeTrackingVector完成的。

      private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
        // Underlying vector for unrolling the block
        var vector = new SizeTrackingVector[T]()(classTag)
        var arrayValues: Array[T] = null
      
        override def storeValue(value: T): Unit = {
          vector += value
        }
      
        override def estimatedSize(): Long = {
          vector.estimateSize()
        }
      
        override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
          // We successfully unrolled the entirety of this block
          arrayValues = vector.toArray
          vector = null
      
          override val preciseSize: Long = SizeEstimator.estimate(arrayValues)
      
          override def build(): MemoryEntry[T] =
            DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
        }
      }
      

      SizeTracker

      上面提到的SizeTrackingVector繼承了這個(gè)特質(zhì),除了這個(gè)特質(zhì),還集成了PrimitiveVector類(lèi),但是PrimitiveVector類(lèi)基本上就是對(duì)一個(gè)數(shù)組的簡(jiǎn)單包裝。
      SizeTrackingVector最重要的功能:追蹤對(duì)象的大小,就是在SizeTracker特之中實(shí)現(xiàn)的。

      我大致說(shuō)一下這個(gè)特質(zhì)是如何實(shí)現(xiàn)對(duì)象大小跟蹤和估算的,代碼實(shí)現(xiàn)也并不復(fù)雜,感興趣的可以看一看,限于篇幅這里就不貼了。

      • 每插入一定數(shù)量的數(shù)據(jù)(姑且稱(chēng)之為周期),就會(huì)對(duì)當(dāng)前的對(duì)象進(jìn)行一次取樣,而這個(gè)取樣的周期會(huì)越來(lái)越長(zhǎng),以1.1倍的速率增長(zhǎng);
      • 取樣就是計(jì)算對(duì)象大小,并與前一次取樣作比較,而且只會(huì)保留最近兩次的取樣數(shù)據(jù);
      • 每次取樣其實(shí)就是獲取兩個(gè)數(shù)據(jù),當(dāng)前對(duì)象大小,當(dāng)前插入的數(shù)據(jù)條數(shù);
      • 這樣與上一次取樣一比較,就能夠計(jì)算出每條數(shù)據(jù)的大小了;
      • 最后,在返回整個(gè)對(duì)象大小時(shí),是拿最近一次取樣時(shí)記錄下的對(duì)象大小,以及根據(jù)最近的情況估算的每條數(shù)據(jù)的大小乘以自從上次取樣以來(lái)新插入的數(shù)據(jù)量,二者相加作為對(duì)象大小的估算值,

      可見(jiàn)這么做并不是什么精確,但是由于是抽樣,而且抽樣周期越往后面越長(zhǎng),所以對(duì)于數(shù)據(jù)插入的效率影響很小,而且這種不精確性其實(shí)在后續(xù)的內(nèi)存檢查過(guò)程中是有考慮到的。在所有數(shù)據(jù)插入完的收尾工作中,會(huì)對(duì)對(duì)象大小做一次精確計(jì)算。此外,熟悉spark內(nèi)存管理的同學(xué)應(yīng)該知道,其實(shí)spark一般會(huì)配置一個(gè)安全因子(一般是0.9),也就是說(shuō)只是用配置的內(nèi)存大小的90%,就是為了盡可能地減少這種不精確的內(nèi)存估算造成OOM的可能性。

      SerializedValuesHolder

      private class SerializedValuesHolder[T](
          blockId: BlockId,
          chunkSize: Int,
          classTag: ClassTag[T],
          memoryMode: MemoryMode,
          serializerManager: SerializerManager) extends ValuesHolder[T] {
        val allocator = memoryMode match {
          case MemoryMode.ON_HEAP => ByteBuffer.allocate _
            // 調(diào)用unsafe的本地方法申請(qǐng)直接內(nèi)存
            // 這個(gè)方法之所以沒(méi)有調(diào)用ByteBuffer.allocateDirect方法
            // 是因?yàn)檫@個(gè)方法分配的直接內(nèi)存大小收到參數(shù)MaxDirectMemorySize限制
            // 所以這里繞過(guò)ByteBuffer.allocateDirect方法,通過(guò)反射和unsafe類(lèi)創(chuàng)建直接內(nèi)存對(duì)象
          case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
        }
      
        val redirectableStream = new RedirectableOutputStream
        val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
        redirectableStream.setOutputStream(bbos)
        val serializationStream: SerializationStream = {
          val autoPick = !blockId.isInstanceOf[StreamBlockId]
          val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
          // 包裝壓縮流和序列化流
          ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
        }
      
        // 寫(xiě)入方法,寫(xiě)入的對(duì)象經(jīng)過(guò)序列化,壓縮,
        // 然后經(jīng)過(guò)ChunkedByteBufferOutputStream被分割成一個(gè)個(gè)的字節(jié)數(shù)組塊
        override def storeValue(value: T): Unit = {
          serializationStream.writeObject(value)(classTag)
        }
      
        override def estimatedSize(): Long = {
          bbos.size
        }
      
        override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
          // We successfully unrolled the entirety of this block
          serializationStream.close()
      
          override def preciseSize(): Long = bbos.size
      
          override def build(): MemoryEntry[T] =
            SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
        }
      }
      

      大概看一下,主要的邏輯很簡(jiǎn)單,這里面有幾個(gè)注意點(diǎn):

      • 對(duì)于直接內(nèi)存分配,spark并沒(méi)有使用jdk的高級(jí)api,而是反射配合unsafe類(lèi)分配直接內(nèi)存,這樣可以繞過(guò)jvm參數(shù)MaxDirectMemorySize的限制,這也體現(xiàn)了spark的作者盡可能的降低用戶(hù)使用難度
      • 另外,我們看到序列化流其實(shí)經(jīng)過(guò)了層層包裝(典型的裝飾器模式),序列化和壓縮以及分塊是比較重要的幾個(gè)點(diǎn),感興趣的話(huà)可以深究,序列化和壓縮如果深入了解都是很大的課題,所以這里也僅僅是蜻蜓點(diǎn)水,不深究了。

      總結(jié)

      MemoryStore.scala這個(gè)文件中乍看代碼有八百多行,但是其實(shí)很大部分代碼是一些輔助類(lèi),比較核心的寫(xiě)入邏輯也就是前面提到的幾個(gè)方法,再加上核心的兩個(gè)類(lèi)DeserializedValuesHolder和SerializedValuesHolder實(shí)現(xiàn)了以對(duì)象或字節(jié)數(shù)組的形式存儲(chǔ)數(shù)據(jù)。

        本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶(hù)發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買(mǎi)等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評(píng)論

        發(fā)表

        請(qǐng)遵守用戶(hù) 評(píng)論公約

        類(lèi)似文章 更多