shidenggui / blog

Reading makes thought sharper, writing makes thought clearer
https://shidenggui.com
50 stars 2 forks source link

LRUCache 的各种实现 #17

Open shidenggui opened 5 years ago

shidenggui commented 5 years ago

LRUCache 的实现

缘起

 刷 leetcode 的时候碰到的这道题。LRUCache 在现实中也经常用到:

要求

   Leetcode 题目要求如下

Design and implement a data structure for Least Recently Used (LRU) cache. It should support the following operations: get and put.

get(key) - Get the value (will always be positive) of the key if the key exists in the cache, otherwise return -1.
put(key, value) - Set or insert the value if the key is not already present. When the cache reached its capacity, it should invalidate the least recently used item before inserting a new item.

Follow up:
Could you do both operations in O(1) time complexity?

Example:

LRUCache cache = new LRUCache( 2 /* capacity */ );

cache.put(1, 1);
cache.put(2, 2);
cache.get(1);       // returns 1
cache.put(3, 3);    // evicts key 2
cache.get(2);       // returns -1 (not found)
cache.put(4, 4);    // evicts key 1
cache.get(1);       // returns -1 (not found)
cache.get(3);       // returns 3
cache.get(4);       // returns 4

  关键点在于 getput 操作的时间复杂度都需为 O(1)。

实现

初版

  为了实现 O(1) 的复杂度,使用哈希表来存储对应的元素。然后通过双向链表来实现 lru cache 相关的逻辑,

# 使用 双向链表和 hashmap 的组合来实现 get,put 都为 O(1) 的复杂度,空间复杂度为 O(N)
class Node:
    def __init__(self, key, value, prev=None, next=None):
        self.key = key
        self.value = value
        self.prev = prev
        self.next = next

class LRUCache:

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._map = {}
        # linked list with head
        self._list = Node(None, None)
        # 使用 last 指针来加快删除最后一个节点的速度
        # 在以下情况需要更新 last 指针
        # 1. 第一次插入数据时,需要将 last 指针指向新插入的节点
        # 2. 删除最后一个节点时,需要将 last 指针前移
        # 3. 当 get 或者 put 命中节点需要移动到链表顶端时。
        #    如果需要移动的节点正好是最后一个节点(这时候 last 指针才有可能发生变化)
        #    而且 last 指针之前不是头结点(如果是的话,移动后该节点还是尾节点,不需要变动 last 指针)
        self._last = self._list
        self.size = 0

    def get(self, key: int) -> int:
        if key not in self._map:
            return -1
        node = self._map[key]
        self._move_to_first(node)
        return node.value

    def _move_to_first(self, node):
        # 当移动的节点为最后一个节点,且该节点之前不为头结点
        # 将 last 指针前移    
        if node == self._last and node.prev != self._list:
            self._last = node.prev
        # 首先删除当前节点,要额外考虑该节点为最后一个节点,
        # 此时不需要调整之后节点的 prev 指针
        node.prev.next = node.next
        if node.next is not None:
            node.next.prev = node.prev
        self._insert_to_first(node)            

    def _insert_to_first(self, node):
        node.next = self._list.next
        node.prev = self._list
        self._list.next = node
        # 插入到头结点之后,如果该节点不是最后一个节点
        # 同样要调整之后节点的 prev 指针
        if node.next:
            node.next.prev = node

    def put(self, key: int, value: int) -> None:
        if key in self._map:
            node = self._map[key]
            node.value = value
            self._move_to_first(node)
            return
        self.size += 1
        node = Node(key, value)
        self._map[key] = node
        if self.size > self.capacity:
            # 直接根据 last 指针删除最后一个节点,然后前移 last 指针
            self._last.prev.next = None
            del self._map[self._last.key]
            self._last = self._last.prev

        # 如果插入时为空链表,设置 last 指针
        if self._list.next is None:
            self._last = node
        self._insert_to_first(node)        

Leetcode 经典实现

  初版实现的方法,需要不断判断删除,移动的是不是尾部指针,引入了很多不必要的 if 判断。而 Leetcode 讨论区里面提出了一个更好的方法。

  原本我们通过引入 Dummy Head 已经简化了头部相关的操作,这里额外再引入一个 Dummy tail ,这样的话在移动删除尾部节点的时候就不需要额外判断了。

# 使用 双向链表和 hashmap 的组合来实现 get,put 都为 O(1) 的复杂度,空间复杂度为 O(N)
class Node:
    def __init__(self, key, value, prev=None, next=None):
        self.key = key
        self.value = value
        self.prev = prev
        self.next = next

class LRUCache:

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._map = {}
        # linked list with dummy head and tail
        self._list = Node(None, None)
        self._last = Node(None, None)
        self._list.next = self._last
        self.size = 0

    def _delete_node(self, node):
        node.prev.next = node.next
        node.next.prev = node.prev

    def get(self, key: int) -> int:
        if key not in self._map:
            return -1
        node = self._map[key]
        self._move_to_first(node)
        return node.value

    def _move_to_first(self, node):
        self._delete_node(node)
        self._insert_to_first(node)            

    def _insert_to_first(self, node):
        node.next = self._list.next
        node.prev = self._list

        node.next.prev = node
        self._list.next = node

    def put(self, key: int, value: int) -> None:
        if key in self._map:
            node = self._map[key]
            node.value = value
            self._move_to_first(node)
            return
        self.size += 1
        node = Node(key, value)
        self._map[key] = node
        if self.size > self.capacity:
            del self._map[self._last.prev.key]
            self._delete_node(self._last.prev)

        self._insert_to_first(node)        

# Your LRUCache object will be instantiated and called as such:
# obj = LRUCache(capacity)
# param_1 = obj.get(key)
# obj.put(key,value)

Python 内置的 LRUCache 实现

  但是上面的方式还有一个问题,就是在 lru_cache 满了的时候,此时新增一个节点,会导致需要从链表中删除一个尾部的旧节点,然后同时在头部插入一个新节点。

  有没有办法直接使用旧的删除的节点来代替新增的节点呢?这样在 LRUCache 满了的时候,put 新元素的性能会获得很大的提升。

  而 Python 内部实现正是考虑了这一点,利用了带头结点root的循环双向链表,避免了该问题。

下面是 Pythonlru_cache 的实现,因为原实现是装饰器,这里略作修改为类的实现:

# 使用 双向链表和 hashmap 的组合来实现 get,put 都为 O(1) 的复杂度,空间复杂度为 O(N)
PREV, NEXT, KEY, RESULT = 0, 1, 2, 3

class LRUCache:

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._cache = {}
        # circular queue with root
        self.root = []
        self.root[:] = [self.root, self.root, None, None]
        self.size = 0

    def get(self, key: int) -> int:
        if key not in self._cache:
            return -1
        node = self._cache[key]
        self._move_to_front(node)

        return node[RESULT]

    def _delete_node(self, node):
        node[PREV][NEXT] = node[NEXT]
        node[NEXT][PREV] = node[PREV]

    def _insert_to_front(self, node):
        node[NEXT] = self.root
        node[PREV] = self.root[PREV]
        node[PREV][NEXT] = node[NEXT][PREV] = node

    def _move_to_front(self, node):
        self._delete_node(node)
        self._insert_to_front(node)            

    def put(self, key: int, value: int) -> None:
        if key in self._cache:
            node = self._cache[key]
            node[RESULT] = value
            self._move_to_front(node)
            return
        self.size += 1
        if self.size > self.capacity:
            # 直接使用 root 节点作为新节点,然后通过将 root[NEXT] 的待删除节点设为新的 root 节点,避免了删除和分配新节点的消耗
            self.root[KEY] = key
            self.root[RESULT] = value
            self._cache[key] = self.root

            self.root = self.root[NEXT]
            del self._cache[self.root[KEY]]
            return

        node = [None, None, key, value]
        self._cache[key] = node
        self._insert_to_front(node)        

redis 的 LRU 淘汰算法的实现

  因为想到 redis 也实现了 lru cache,就抽空看了下源码,发现跟想象中非常不一样,并不是常规的实现方式。

  当 redis 达到设置的 maxmemory,会从所有key 中随机抽样 5 个值,然后计算它们的 idle time,插入一个长度为 16 的待淘汰数组中,数组中的 entry 根据 idle time 升序排列,最右侧的就是接下来第一个被淘汰的。淘汰后如果内存还是不满足需要,则继续随机抽取 key 并循环以上过程。

因为是随机抽样,所以分为以下情况

  关键实现逻辑如下:

    while (mem_freed < mem_tofree) {
          sds bestkey = NULL;
          struct evictionPoolEntry *pool = EvictionPoolLRU;
           while(bestkey == NULL) {  
                evictionPoolPopulate(i, dict, db->dict, pool);

              /* Go backward from best to worst element to evict. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                                        if (pool[k].key == NULL) continue;
                                            de = dictFind(server.db[pool[k].dbid].dict,
                            pool[k].key);

                                        /* Remove the entry from the pool. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                     bestkey = dictGetKey(de);
                        break;
                }
           }

优缺点

优点

  1. LRU 实现简单,getput 时间复杂度都为 O(1)
  2. 利用了 locality ,即最近使用的数据很可能再次被使用
  3. 能更快的对短期行为作出反应

缺点

  1. long scan 的时候,会导致 lru 不断发生 evcit 行为。(数据库操作,从磁盘加载文件等,LFU 避免了该行为)

  2. 只利用了到了一部分的 locality,没有利用 最经常使用的数据很可能再次被使用(LFU 做到了,但是更慢,Log(N))

B 站源码解析之 LRUCache 实现

  在这篇文章还没完稿的时候,看到了 B 站的 LRUCache 的源码实现,下面就顺便来分析一下。下面是对应的源码:

package lrucache

// 这里是 Node 节点的定义,中规中矩
// Element - node to store cache item
type Element struct {
    prev, next *Element
    Key        interface{}
    Value      interface{}
}

// Next - fetch older element
func (e *Element) Next() *Element {
    return e.next
}

// Prev - fetch newer element
func (e *Element) Prev() *Element {
    return e.prev
}

// 通过这个结构体,猜测使用的是跟 Leetcode 经典实现类似的带头尾伪节点的链表实现。
// 后面发现实际上并不是
// LRUCache - a data structure that is efficient to insert/fetch/delete cache items [both O(1) time complexity]
type LRUCache struct {
    cache    map[interface{}]*Element
    head     *Element
    tail     *Element
    capacity int
}

// New - create a new lru cache object
func New(capacity int) *LRUCache {
    // 可以看到初始化 LRUCache 时,并没有初始化 Dummy head 和 Dummy tail
    // 不过这样岂不是在操作过程中需要进行很多 if 判断,后面的代码也验证了这点。
    return &LRUCache{make(map[interface{}]*Element), nil, nil, capacity}
}

// Put - put a cache item into lru cache
func (lc *LRUCache) Put(key interface{}, value interface{}) {
    if e, ok := lc.cache[key]; ok {
        e.Value = value
        // 等同之前的 _insert_to_front
        lc.refresh(e)
        return
    }

    if lc.capacity == 0 {
        return
    } else if len(lc.cache) >= lc.capacity {
        // evict the oldest item
        delete(lc.cache, lc.tail.Key)
        // 等同之前的 _delete_node
        lc.remove(lc.tail)
    }

    e := &Element{nil, lc.head, key, value}
    lc.cache[key] = e
    // 因为没有 Dummy head 和 Dummy tail 而带来的不必要的 if 判断
    if len(lc.cache) != 1 {
        lc.head.prev = e
    } else {
        lc.tail = e
    }
    lc.head = e
}

// Get - get value of key from lru cache with result
func (lc *LRUCache) Get(key interface{}) (interface{}, bool) {
    if e, ok := lc.cache[key]; ok {
        lc.refresh(e)
        return e.Value, ok
    }
    return nil, false
}

func (lc *LRUCache) refresh(e *Element) {
    // 可以看到这里很多的 if 判断,严重干扰了代码的阅读逻辑。
    if e.prev != nil {
        e.prev.next = e.next
        if e.next == nil {
            lc.tail = e.prev
        } else {
            e.next.prev = e.prev
        }
        e.prev = nil
        e.next = lc.head
        lc.head.prev = e
        lc.head = e
    }
}

func (lc *LRUCache) remove(e *Element) {
    // 可以看到这里很多的 if 判断,严重干扰了代码的阅读逻辑。
    if e.prev == nil {
        lc.head = e.next
    } else {
        e.prev.next = e.next
    }
    if e.next == nil {
        lc.tail = e.prev
    } else {
        e.next.prev = e.prev
    }
}

  怎么说呢,有点失望。一开始在看到结构定义了 head 、tail时以为是使用了 Dummy head 和 Dummy tail 的经典实现,但是在初始化时发现没有初始化对应的 head、tail,以为是使用了一种未知的新方法,但是一看 refresh 和 remove 的逻辑,发现是通过大量的 if、else 来判断 corner cases。

  而大量使用 if 会严重干扰代码的可读性和可维护性,具体可见 Applying the Linus Torvalds “ Good Taste ” Coding Requirement 这篇文章。

Golang/groupcache LRUCache 实现

  跟网友的讨论中,有人又贴出了 google 的一版实现,在 Golang/groupcache 项目下。就顺便看了下对应的源码。

// Package lru implements an LRU cache.
package lru

import "container/list"

// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
    // MaxEntries is the maximum number of cache entries before
    // an item is evicted. Zero means no limit.
    MaxEntries int

    // OnEvicted optionally specifies a callback function to be
    // executed when an entry is purged from the cache.
    OnEvicted func(key Key, value interface{})

    ll    *list.List
    cache map[interface{}]*list.Element
}

// Add adds a value to the cache.
func (c *Cache) Add(key Key, value interface{}) {
    if c.cache == nil {
        c.cache = make(map[interface{}]*list.Element)
        c.ll = list.New()
    }
    if ee, ok := c.cache[key]; ok {
        c.ll.MoveToFront(ee)
        ee.Value.(*entry).value = value
        return
    }
    ele := c.ll.PushFront(&entry{key, value})
    c.cache[key] = ele
    if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
        c.RemoveOldest()
    }
}

// Get looks up a key's value from the cache.
func (c *Cache) Get(key Key) (value interface{}, ok bool) {
    if c.cache == nil {
        return
    }
    if ele, hit := c.cache[key]; hit {
        c.ll.MoveToFront(ele)
        return ele.Value.(*entry).value, true
    }
    return
}

   关键点在于 container/list ,这是一个带头结点的循环双向链表,但是并没有暴露 root 节点,所以 google 的实现同 Leetcode 经典实现是一致的。   我还发了一个 issue 去询问为什么不采用类似 Python 的实现。官方的回答是目前够用,如果需要变更的话,需要 benchmark 的支持。理论上 Python 的实现在不断读取新数值的时候性能会好很多。

  综合下来 Python 内置库 functools.lru_cache 的带头结点的双向循环队列的实现是最优雅的

yihong0618 commented 5 years ago

学到了,感谢作者。