Is Spark's memory eviction strategy LRU or FIFO?


Spark's memory eviction strategy is how an Executor decides which occupied blocks to displace when its storage memory runs short. Some sources say Spark uses LRU: when storage memory is insufficient, the least recently used blocks are evicted from memory first (see, for example, Spark Misconceptions and SPARK-14289).

However, after reading the source code, I found that the two relevant classes, MemoryStore and BlockManager, contain no LRU logic:

1. First, MemoryStore keeps a record of every stored block in an entries field of type LinkedHashMap:

// Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
// acquiring or releasing unroll memory, must be synchronized on `memoryManager`!

private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
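
One detail worth flagging: the third constructor argument is java.util.LinkedHashMap's accessOrder flag. With accessOrder = true, the map iterates in access order (least recently accessed first) rather than insertion order, and a successful get() counts as an access that moves the entry to the tail. A minimal standalone sketch of this JDK behavior (not Spark code):

import java.util.LinkedHashMap

object AccessOrderDemo extends App {
  // Same constructor arguments as MemoryStore's entries map:
  // initial capacity 32, load factor 0.75f, accessOrder = true.
  val m = new LinkedHashMap[String, Int](32, 0.75f, true)
  m.put("a", 1)
  m.put("b", 2)
  m.put("c", 3)
  m.get("a") // get() is an "access": "a" moves to the tail of the iteration order
  println(m.keySet()) // prints [b, c, a] -- the head is the least recently accessed entry
}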

2. When a block is read through MemoryStore, for example via the getValues method, nothing explicitly moves the accessed block to the front of the LinkedHashMap. Yet I would expect the simplest LRU implementation to move each accessed element to one end of the queue.

def getValues(blockId: BlockId): Option[Iterator[_]] = {
  val entry = entries.synchronized { entries.get(blockId) }
  entry match {
    case null => None
    case e: SerializedMemoryEntry[_] =>
      throw new IllegalArgumentException("should only call getValues on deserialized blocks")
    case DeserializedMemoryEntry(values, _, _) =>
      val x = Some(values)
      x.map(_.iterator)
  }
}
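
For comparison, the explicit move I had in mind would look something like this hypothetical helper (not Spark code): detach the accessed entry and re-insert it at the tail, the end farthest from a head-first eviction scan.

// Hypothetical explicit LRU "touch" on an insertion-ordered LinkedHashMap
// (accessOrder = false). Not from Spark; just the pattern point 2 expects.
def touch[K, V <: AnyRef](map: java.util.LinkedHashMap[K, V], key: K): Option[V] = {
  Option(map.remove(key)).map { value => // detach the entry, if present
    map.put(key, value)                  // re-insert at the tail (most recently used end)
    value
  }
}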

3. When evicting blocks to free space, the code simply walks entries in iteration order and drops blocks as it goes; it never explicitly picks the least recently used block, so this looks like a first-in-first-out (FIFO) policy:

private[spark] def evictBlocksToFreeSpace(
    blockId: Option[BlockId],
    space: Long,
    memoryMode: MemoryMode): Long = {
  assert(space > 0)
  memoryManager.synchronized {
    var freedMemory = 0L
    val rddToAdd = blockId.flatMap(getRddId)
    val selectedBlocks = new ArrayBuffer[BlockId]
    def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
      entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
    }
    // This is synchronized to ensure that the set of entries is not changed
    // (because of getValue or getBytes) while traversing the iterator, as that
    // can lead to exceptions.
    entries.synchronized {
      val iterator = entries.entrySet().iterator()
      while (freedMemory < space && iterator.hasNext) {
        val pair = iterator.next()
        val blockId = pair.getKey
        val entry = pair.getValue
        if (blockIsEvictable(blockId, entry)) {
          // We don't want to evict blocks which are currently being read, so we need to obtain
          // an exclusive write lock on blocks which are candidates for eviction. We perform a
          // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
          if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
            selectedBlocks += blockId
            freedMemory += pair.getValue.size
          }
        }
      }
    }
    ...
    if (freedMemory >= space) {
      logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
        s"(${Utils.bytesToString(freedMemory)} bytes)")
      for (blockId <- selectedBlocks) {
        val entry = entries.synchronized { entries.get(blockId) }
        // This should never be null as only one task should be dropping
        // blocks and removing entries. However the check is still here for
        // future safety.
        if (entry != null) {
          dropBlock(blockId, entry)
        }
      }
      ...
    }
  }
}
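
Whether that head-first scan amounts to FIFO or LRU therefore hinges entirely on the iteration order of entries, which is exactly what the accessOrder flag from point 1 controls. Here is a small standalone sketch (plain Scala with made-up block names, not Spark code) contrasting the two settings:

import java.util.LinkedHashMap

object EvictionOrderDemo extends App {
  // Insert three blocks, re-read the oldest one, then ask which two blocks a
  // head-first scan (like evictBlocksToFreeSpace's iterator) would pick first.
  def firstTwoVictims(accessOrder: Boolean): (String, String) = {
    val entries = new LinkedHashMap[String, Long](32, 0.75f, accessOrder)
    entries.put("rdd_0_0", 100L)
    entries.put("rdd_0_1", 100L)
    entries.put("rdd_0_2", 100L)
    entries.get("rdd_0_0") // re-access the oldest-inserted block
    val it = entries.keySet().iterator()
    (it.next(), it.next())
  }
  println(firstTwoVictims(accessOrder = false)) // rdd_0_0 then rdd_0_1: insertion order -> FIFO
  println(firstTwoVictims(accessOrder = true))  // rdd_0_1 then rdd_0_2: access order -> LRU
}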

So, is the official explanation wrong? Is Spark's memory eviction strategy LRU or FIFO after all?
I have also asked this question on Stack Overflow; feel free to answer it there:
What's the current eviction strategy of Spark? FIFO or LRU?
