Is Spark's memory eviction strategy LRU or FIFO?
Spark's memory eviction strategy determines which already-stored blocks are evicted when an executor's storage memory runs out of space. Some articles state that Spark uses LRU: when storage memory is insufficient, the least recently used blocks in memory are evicted first. (See, for example, "Spark Misconceptions" and SPARK-14289.)
However, after reading the source code I could not find any LRU logic in the two relevant classes, MemoryStore and BlockManager:
1. First, MemoryStore uses a field named entries, of type LinkedHashMap, to track every block held in storage memory (a short demo of what the third constructor argument does follows the snippet):
// Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
// acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
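As a side note, the following is standard java.util.LinkedHashMap behavior rather than anything Spark-specific: the third constructor argument selects the ordering mode (false for insertion order, true for access order), and in access-order mode a get() moves the accessed entry to the end of the iteration order. A minimal sketch, with made-up names, just to illustrate the flag:

import java.util.{LinkedHashMap => JLinkedHashMap}

object AccessOrderDemo {
  def main(args: Array[String]): Unit = {
    // Same constructor shape as the entries field above: capacity, load factor, accessOrder.
    val m = new JLinkedHashMap[String, Int](32, 0.75f, true)
    m.put("a", 1)
    m.put("b", 2)
    m.put("c", 3)

    // With accessOrder = true, get() counts as an access and moves "a"
    // to the end of the iteration order.
    m.get("a")

    val it = m.keySet().iterator()
    while (it.hasNext) {
      print(it.next() + " ")  // prints: b c a
    }
    println()
  }
}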
2. When a block is read through MemoryStore, for example in the getValues method, the accessed block is not explicitly moved to the head of the LinkedHashMap. My understanding is that the simplest way to implement LRU would be to move each accessed element to the head of the queue (see the hypothetical sketch after the method below):
def getValues(blockId: BlockId): Option[Iterator[_]] = {
  val entry = entries.synchronized { entries.get(blockId) }
  entry match {
    case null => None
    case e: SerializedMemoryEntry[_] =>
      throw new IllegalArgumentException("should only call getValues on deserialized blocks")
    case DeserializedMemoryEntry(values, _, _) =>
      val x = Some(values)
      x.map(_.iterator)
  }
}
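For comparison, here is the kind of explicit reordering I had expected to find for LRU. This is purely hypothetical, not Spark code: the helper name touch is made up, and it assumes the entries field shown in point 1. On every read it re-inserts the entry, so in an insertion-ordered LinkedHashMap the least recently used block would end up at the head, where an eviction scan starts:

// Hypothetical helper, not part of MemoryStore: remove and re-insert the entry so
// that a read moves the block to the tail of the map, leaving the least recently
// used block at the head of the iteration order.
private def touch(blockId: BlockId): Unit = entries.synchronized {
  val entry = entries.remove(blockId)
  if (entry != null) {
    entries.put(blockId, entry)
  }
}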
3. When blocks are evicted to free up space, the code simply traverses entries in its iteration order and drops blocks one by one; it never explicitly picks the least recently used block, so to me this looks like first-in-first-out (FIFO). (A simplified illustration of this selection loop follows the excerpt below.)
private[spark] def evictBlocksToFreeSpace(
    blockId: Option[BlockId],
    space: Long,
    memoryMode: MemoryMode): Long = {
  assert(space > 0)
  memoryManager.synchronized {
    var freedMemory = 0L
    val rddToAdd = blockId.flatMap(getRddId)
    val selectedBlocks = new ArrayBuffer[BlockId]
    def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
      entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
    }
    // This is synchronized to ensure that the set of entries is not changed
    // (because of getValue or getBytes) while traversing the iterator, as that
    // can lead to exceptions.
    entries.synchronized {
      val iterator = entries.entrySet().iterator()
      while (freedMemory < space && iterator.hasNext) {
        val pair = iterator.next()
        val blockId = pair.getKey
        val entry = pair.getValue
        if (blockIsEvictable(blockId, entry)) {
          // We don't want to evict blocks which are currently being read, so we need to obtain
          // an exclusive write lock on blocks which are candidates for eviction. We perform a
          // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
          if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
            selectedBlocks += blockId
            freedMemory += pair.getValue.size
          }
        }
      }
    }
    ...
    if (freedMemory >= space) {
      logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
        s"(${Utils.bytesToString(freedMemory)} bytes)")
      for (blockId <- selectedBlocks) {
        val entry = entries.synchronized { entries.get(blockId) }
        // This should never be null as only one task should be dropping
        // blocks and removing entries. However the check is still here for
        // future safety.
        if (entry != null) {
          dropBlock(blockId, entry)
        }
      }
      ...
    }
  }
}
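To make the selection behavior concrete, here is a small standalone sketch that mimics the loop above. It is a simplified model, not Spark code, and the block names and sizes are invented: it walks the map in its iteration order and collects blocks until the requested amount of space is covered, so with an insertion-ordered map the oldest blocks are picked first.

import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ArrayBuffer

object EvictionOrderDemo {
  def main(args: Array[String]): Unit = {
    // Simplified model of MemoryStore.entries: block id -> size in bytes.
    val entries = new JLinkedHashMap[String, Long]()
    entries.put("rdd_0_0", 100L)   // inserted first
    entries.put("rdd_0_1", 200L)
    entries.put("rdd_1_0", 300L)   // inserted last

    val space = 250L               // amount of storage memory we need to free
    var freedMemory = 0L
    val selectedBlocks = ArrayBuffer[String]()

    // Walk the map in its iteration order and stop once enough space is found,
    // mirroring the while loop in evictBlocksToFreeSpace.
    val iterator = entries.entrySet().iterator()
    while (freedMemory < space && iterator.hasNext) {
      val pair = iterator.next()
      selectedBlocks += pair.getKey
      freedMemory += pair.getValue
    }

    // With insertion order, rdd_0_0 and rdd_0_1 (the oldest entries) are selected.
    println(s"selected = ${selectedBlocks.mkString(", ")}, freed = $freedMemory bytes")
  }
}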
So, is the official explanation wrong? Is Spark's memory eviction strategy actually LRU or FIFO?
I also asked this question on Stack Overflow; if you are interested, feel free to answer it there:
What's the current eviction strategy of Spark? FIFO or LRU?