shownb / shownb.github.com


BigCache code analysis #41

Open shownb opened 5 years ago

shownb commented 5 years ago

Since the release of Go 1.9, the performance of sync.Map is no longer bad.

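Sharding

Split the whole cache into N independent shards, then use hash(key) & (N-1) to decide which shard an entry goes into. When N is a power of two, this is the same as hash(key) % N. This way every shard can have its own RWMutex. The sharding-related code can be simplified as follows: https://github.com/allegro/bigcache/blob/master/bigcache.go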

type BigCache struct {
    shards       []*cacheShard // every cacheShard has its own lock
    lifeWindow   uint64
    clock        clock
    hash         Hasher // Hasher is an interface; perhaps so that a faster hash function can be swapped in later?
    config       Config
    shardMask    uint64 // len(shards) - 1
    maxShardSize uint32
    close        chan struct{}
}

func (c *BigCache) Get(key string) ([]byte, error) {
    hashedKey := c.hash.Sum64(key) //string -> uint64
    shard := c.getShard(hashedKey) // next, fetch the wanted value from this shard
    return shard.get(key, hashedKey)
}

// Set saves entry under the key
func (c *BigCache) Set(key string, entry []byte) error {
    hashedKey := c.hash.Sum64(key)
    shard := c.getShard(hashedKey)
    return shard.set(key, hashedKey, entry)
}

// Delete removes the key
func (c *BigCache) Delete(key string) error {
    hashedKey := c.hash.Sum64(key)
    shard := c.getShard(hashedKey)
    return shard.del(key, hashedKey)
}

func (c *BigCache) getShard(hashedKey uint64) (shard *cacheShard) {
    return c.shards[hashedKey&c.shardMask]
}
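
The equivalence between the bit mask and the modulo can be checked with a minimal sketch. It assumes the shard count is a power of two; `shardIndex` and `isPowerOfTwo` are hypothetical helper names for illustration, not BigCache functions.

```go
package main

import "fmt"

// shardIndex picks a shard with a bit mask.
// It is only equivalent to hashedKey % shardCount when shardCount is a power of two.
func shardIndex(hashedKey, shardCount uint64) uint64 {
	return hashedKey & (shardCount - 1)
}

// isPowerOfTwo reports whether n is a non-zero power of two.
func isPowerOfTwo(n uint64) bool {
	return n != 0 && n&(n-1) == 0
}

func main() {
	const shards = 16
	fmt.Println("power of two:", isPowerOfTwo(shards))
	for _, h := range []uint64{0, 15, 16, 12345, 1<<63 + 7} {
		// The two columns printed after the hash should always match.
		fmt.Println(h, shardIndex(h, shards), h%shards)
	}
}
```

The mask replaces a division with a single AND, which is presumably why the shard count is expected to be a power of two.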

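Storing keys in a map

After such a long detour, we still end up using a map? Well, it is not quite the same.

The motivation comes from an optimization introduced in Go 1.5: if neither the key type nor the value type of a map contains pointers, the GC skips scanning the map's contents. The O(1) lookup of a map is still very attractive, so as long as neither K nor V contains pointers, a map is well worth using.

However, only types such as numbers and bool are pointer-free (string and []byte both contain a pointer). Should this cache then support only int -> int pairs? That is not realistic. So the author took an indirect route: all values are stored in a bytes queue (described below), and the map keeps only the index where the value's header starts. Values are accessed through that index, which is of course an integer. A string key becomes an integer once it is hashed. This way both key and value are converted to integer types.

The data kept in the map is also very small, so it takes far less space than storing the whole KV pair.

In summary, every cache shard has a map[uint64]uint32 that holds the "hash(key) -> valueIndex" relation, and every shard has a bytes queue that stores the values. The relevant source code, simplified: https://github.com/allegro/bigcache/blob/master/shard.go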

type cacheShard struct {
    hashmap map[uint64]uint32 // key -> value index
    entries queue.BytesQueue  // bytes queue
    // ...
}
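
To make the indirection concrete, here is a minimal sketch of a shard that keeps a pointer-free index next to one byte slice. `tinyShard` and its methods are hypothetical names invented for this example. It has no locking, no eviction and no collision check, so it is not how BigCache itself is written.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// tinyShard is a hypothetical, simplified shard.
type tinyShard struct {
	index map[uint64]uint32 // hash(key) -> offset into data; no pointers in K or V
	data  []byte            // length-prefixed values, appended back to back
}

func newTinyShard() *tinyShard {
	// Offset 0 is reserved so that a zero map value can mean "missing".
	return &tinyShard{index: make(map[uint64]uint32), data: make([]byte, 1)}
}

// set appends a 4-byte length header plus the value and records the offset.
func (s *tinyShard) set(hashedKey uint64, value []byte) {
	offset := uint32(len(s.data))
	var header [4]byte
	binary.LittleEndian.PutUint32(header[:], uint32(len(value)))
	s.data = append(s.data, header[:]...)
	s.data = append(s.data, value...)
	s.index[hashedKey] = offset
}

// get reads the length header at the recorded offset and slices out the value.
func (s *tinyShard) get(hashedKey uint64) ([]byte, bool) {
	offset := s.index[hashedKey]
	if offset == 0 {
		return nil, false
	}
	size := binary.LittleEndian.Uint32(s.data[offset : offset+4])
	return s.data[offset+4 : offset+4+size], true
}

func main() {
	s := newTinyShard()
	s.set(42, []byte("hello"))
	v, ok := s.get(42)
	fmt.Println(string(v), ok)
}
```

The map only ever holds two integers per entry, which is what lets the GC skip it regardless of how large the values are.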

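Note that Get/Set do not operate on the raw value directly. They operate on an "entry" packed from the key, the value, and the current timestamp.

The packed layout is simple: a header is prepended to the original value. Conceptually the header carries the current timestamp and the length of the value, so the whole thing can be pictured as fmt.Sprintf("%d%d%s", currentTimestamp, len(value), value). The real code does not do it this way for efficiency reasons; it uses the binary package to write into a []byte directly. In the encoding.go quoted later in this thread, the header actually holds the timestamp, the hashed key, and the key length, followed by the key and then the value, while the bytes queue records each entry's length in its own 4-byte prefix.

The purpose is clear: record the timestamp at which the value was stored, and its length. The timestamp implements the "expire" feature; the length is used to locate entries by index inside the byte queue.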
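
As a rough illustration of how a stored timestamp enables expiry, the sketch below prepends only an 8-byte timestamp to a value and checks it against a life window. `wrap` and `expired` are hypothetical helpers; the layout is deliberately simpler than the one in BigCache's encoding.go.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const timestampSize = 8 // bytes reserved for the timestamp in this sketch

// wrap prepends an 8-byte little-endian timestamp to value.
func wrap(timestamp uint64, value []byte) []byte {
	blob := make([]byte, timestampSize+len(value))
	binary.LittleEndian.PutUint64(blob, timestamp)
	copy(blob[timestampSize:], value)
	return blob
}

// expired reports whether the entry is older than lifeWindow seconds at time now.
// The now > stored guard avoids unsigned underflow if the clock moved backwards.
func expired(entry []byte, now, lifeWindow uint64) bool {
	stored := binary.LittleEndian.Uint64(entry)
	return now > stored && now-stored > lifeWindow
}

func main() {
	entry := wrap(100, []byte("value"))
	fmt.Println(expired(entry, 105, 10)) // 5 seconds old, window is 10
	fmt.Println(expired(entry, 111, 10)) // 11 seconds old, window is 10
}
```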

Bytes queue

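As mentioned several times above, all values are stored in a bytes queue.

The core idea is to store the packed values in a []byte and maintain head, tail, capacity, count and similar fields. There is not much to say about the implementation: it is the standard queue taught in any data-structures course, with the basic operations a queue needs, such as Reset, Push, Pop, Peek, Capacity and Len.

When a bytes queue is initialized it is given a maximum size, maxCapacity. The array that actually stores the data is not allocated at that maximum size; it starts at a smaller value and grows dynamically during use. Each growth doubles the previous capacity (until the maximum size is reached). The code related to growth can be found at: https://github.com/allegro/bigcache/blob/master/queue/bytes_queue.go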
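
The doubling policy described above can be sketched in a few lines. `nextCapacity` and `grow` are hypothetical names, and treating a non-positive maximum as "unlimited" is an assumption of this sketch. The real queue also has to re-lay out wrapped-around data, which is omitted here.

```go
package main

import "fmt"

// nextCapacity doubles current and caps the result at maxCapacity.
// Treating maxCapacity <= 0 as "unlimited" is an assumption of this sketch.
func nextCapacity(current, maxCapacity int) int {
	next := current * 2
	if maxCapacity > 0 && next > maxCapacity {
		return maxCapacity
	}
	return next
}

// grow allocates a larger buffer and copies the old contents into it.
func grow(buf []byte, maxCapacity int) []byte {
	bigger := make([]byte, nextCapacity(len(buf), maxCapacity))
	copy(bigger, buf)
	return bigger
}

func main() {
	buf := make([]byte, 64)
	for len(buf) < 1000 {
		buf = grow(buf, 1000)
		fmt.Println("capacity:", len(buf))
	}
}
```

Doubling keeps the number of reallocations logarithmic in the final size, at the cost of copying the whole buffer each time it grows.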

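Miscellaneous

When BigCache computes hash(key), it does not use common algorithms such as md5 or sha1. It uses FNV (the FNV-1a variant). This algorithm is faster and uses less memory, which suits the job of generating hash keys. It is also very simple to implement; the 64-bit implementation in the source is as follows: https://github.com/allegro/bigcache/blob/master/fnv.go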

type fnv64a struct{}

const (
    // offset64 FNVa offset basis. See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash
    offset64 = 14695981039346656037
    // prime64 FNVa prime value. See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash
    prime64 = 1099511628211
)

// Sum64 gets the string and returns its uint64 hash value.
func (f fnv64a) Sum64(key string) uint64 {
    var hash uint64 = offset64
    for i := 0; i < len(key); i++ {
        hash ^= uint64(key[i])
        hash *= prime64
    }

    return hash
}
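
As a sanity check, the loop above should produce the same values as the FNV-1a implementation in Go's standard library (`hash/fnv`). The sketch below compares the two; `sum64` and `stdSum64` are names chosen for this example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

const (
	offset64 = 14695981039346656037 // FNV-1a 64-bit offset basis
	prime64  = 1099511628211        // FNV 64-bit prime
)

// sum64 is the same FNV-1a loop as above: XOR the byte in, then multiply.
func sum64(key string) uint64 {
	var hash uint64 = offset64
	for i := 0; i < len(key); i++ {
		hash ^= uint64(key[i])
		hash *= prime64
	}
	return hash
}

// stdSum64 computes the same hash with the standard library.
func stdSum64(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum64()
}

func main() {
	for _, k := range []string{"", "a", "bigcache"} {
		fmt.Printf("%q equal=%v\n", k, sum64(k) == stdSum64(k))
	}
}
```

The hand-written loop works directly on the string, so it avoids creating a hasher object and converting the key to []byte on every call.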

Reference: http://45.63.122.37/2016/11/18/用go实现一个very-fast-k-v-缓存/

shownb commented 5 years ago

https://github.com/allegro/bigcache/blob/d3696373dd1331cdc65e20cea65d351bee988a8e/bigcache.go

package bigcache

import (
    "log"
    "sync"

    "github.com/allegro/bigcache/queue"
)

const (
    minimumEntriesInShard = 10 // minimum number of entries in a single shard
)

type BigCache struct {
    shards     []*cacheShard
    lifeWindow uint64
    clock      clock
    hash       hash
    config     Config
}

type cacheShard struct {
    hashmap map[uint64]uint32
    entries queue.BytesQueue
    lock    sync.RWMutex
}

// NewBigCache initializes a new BigCache instance
func NewBigCache(config Config) *BigCache {
    return newBigCache(config, &systemClock{})
}

func newBigCache(config Config, clock clock) *BigCache {
    cache := &BigCache{
        shards:     make([]*cacheShard, config.Shards),
        lifeWindow: uint64(config.LifeWindow.Seconds()),
        clock:      clock,
        hash:       fnv64a{},
        config:     config,
    }
    // config.MaxEntriesInWindow is the maximum number of entries in the life window.
    // It is used to size each shard up front; if it is set properly, the cache will not allocate additional memory.
    shardSize := max(config.MaxEntriesInWindow/config.Shards, minimumEntriesInShard)
    for i := 0; i < config.Shards; i++ {
        cache.shards[i] = &cacheShard{
            hashmap: make(map[uint64]uint32, shardSize),
            entries: *queue.NewBytesQueue(shardSize*config.MaxEntrySize, config.Verbose),
        }
    }
    return cache
}

// Get reads entry for the key
func (c *BigCache) Get(key string) ([]byte, error) {
    hashedKey := c.hash.sum(key)
    shard := c.getShard(hashedKey)
    shard.lock.RLock()
    defer shard.lock.RUnlock()

    itemIndex := shard.hashmap[hashedKey]

    if itemIndex == 0 {
        return nil, notFound(key)
    }

    wrappedEntry, err := shard.entries.Get(int(itemIndex))
    if err != nil {
        return nil, err
    }
    if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey {
        if c.config.Verbose {
            log.Printf("Collision detected. Both %q and %q has same hash %x", key, entryKey, hashedKey)
        }
        return nil, notFound(key)
    }
    return readEntry(wrappedEntry), nil
}

// Set saves entry under the key
func (c *BigCache) Set(key string, entry []byte) {
    hashedKey := c.hash.sum(key)
    shard := c.getShard(hashedKey)
    shard.lock.Lock()
    defer shard.lock.Unlock()

    currentTimestamp := uint64(c.clock.epoch())

    if previousIndex := shard.hashmap[hashedKey]; previousIndex != 0 {
        if previousEntry, err := shard.entries.Get(int(previousIndex)); err == nil {
            resetKeyFromEntry(previousEntry)
        }
    }

    if oldestEntry, err := shard.entries.Peek(); err == nil {
        c.onEvict(oldestEntry, currentTimestamp, func() {
            shard.entries.Pop()
            hash := readHashFromEntry(oldestEntry)
            delete(shard.hashmap, hash)
        })
    }

    w := wrapEntry(currentTimestamp, hashedKey, key, entry)
    index := shard.entries.Push(w)
    shard.hashmap[hashedKey] = uint32(index)
}

func (c *BigCache) onEvict(oldestEntry []byte, currentTimestamp uint64, evict func()) {
    oldestTimestamp := readTimestampFromEntry(oldestEntry)
    if currentTimestamp-oldestTimestamp > c.lifeWindow {
        evict()
    }
}

func (c *BigCache) getShard(hashedKey uint64) (shard *cacheShard) {
    shardKey := hashedKey % uint64(len(c.shards))
    return c.shards[shardKey]
}

func max(a, b int) int {
    if a > b {
        return a
    }
    return b
}
shownb commented 5 years ago

https://github.com/allegro/bigcache/blob/d3696373dd1331cdc65e20cea65d351bee988a8e/queue/bytes_queue.go

package queue

import (
    "encoding/binary"
    "log"
    "time"
)

const (
    headerEntrySize = 4 // size of the entry header in bytes; the header records the entry's length
    leftMarginIndex = 1 // Bytes before left margin are not used. Zero index means element does not exist in queue, useful while reading slice from index
)

// BytesQueue is a non-thread-safe FIFO queue type backed by a byte array.
// For every push operation index of entry is returned. It can be used to read the entry later
type BytesQueue struct {
    array        []byte
    capacity     int
    head         int
    tail         int
    count        int
    rightMargin  int
    headerBuffer []byte
    verbose      bool
}

type queueError struct {
    message string
}

// NewBytesQueue initialize new bytes queue.
// Initial capacity is used in bytes array allocation
// When verbose flag is set then information about memory allocation are printed
func NewBytesQueue(initialCapacity int, verbose bool) *BytesQueue {
    return &BytesQueue{
        array:        make([]byte, initialCapacity),
        capacity:     initialCapacity,
        headerBuffer: make([]byte, headerEntrySize),
        tail:         leftMarginIndex,
        head:         leftMarginIndex,
        verbose:      verbose,
    }
}

// Push copies entry at the end of queue and moves tail pointer. Allocates more space if needed.
// Returns index for pushed data
func (q *BytesQueue) Push(data []byte) int {
    dataLen := len(data)

    if q.availableSpaceAfterTail() < dataLen+headerEntrySize {
        if q.availableSpaceBeforeHead() >= dataLen+headerEntrySize {
            q.tail = leftMarginIndex
        } else {
            q.allocateAdditionalMemory()
        }
    }

    index := q.tail

    q.push(data, dataLen)

    return index
}

func (q *BytesQueue) allocateAdditionalMemory() {
    start := time.Now()
    q.capacity = q.capacity * 2
    newArray := make([]byte, q.capacity)

    copy(newArray[leftMarginIndex:], q.array[q.head:q.rightMargin])
    newTail := q.rightMargin - q.head + leftMarginIndex
    if q.tail <= q.head {
        copy(newArray[newTail:], q.array[leftMarginIndex:q.tail])
        newTail += q.tail - leftMarginIndex
    }

    if q.verbose {
        log.Printf("Allocated new queue. Took: %dms, Capacity: %d \n", time.Since(start)/time.Millisecond, q.capacity)
    }

    q.array = newArray
    q.head = leftMarginIndex
    q.tail = newTail
    q.rightMargin = newTail
}

func (q *BytesQueue) push(data []byte, len int) {
    binary.LittleEndian.PutUint32(q.headerBuffer, uint32(len))
    q.copy(q.headerBuffer, headerEntrySize)

    q.copy(data, len)

    if q.tail > q.head {
        q.rightMargin = q.tail
    }

    q.count++
}

func (q *BytesQueue) copy(data []byte, len int) {
    q.tail += copy(q.array[q.tail:], data[:len])
}

// Pop reads the oldest entry from queue and moves head pointer to the next one
func (q *BytesQueue) Pop() ([]byte, error) {
    if q.count == 0 {
        return nil, &queueError{"Empty queue"}
    }

    data, size := q.peek(q.head)

    q.head += headerEntrySize + size
    q.count--

    if q.head == q.rightMargin {
        q.head = leftMarginIndex
        if q.tail == q.rightMargin {
            q.tail = leftMarginIndex
        }
        q.rightMargin = q.tail
    }

    return data, nil
}

// Peek reads the oldest entry from list without moving head pointer
func (q *BytesQueue) Peek() ([]byte, error) {
    if q.count == 0 {
        return nil, &queueError{"Empty queue"}
    }

    data, _ := q.peek(q.head)

    return data, nil
}

// Get reads entry from index
func (q *BytesQueue) Get(index int) ([]byte, error) {
    if index <= 0 {
        return nil, &queueError{"Index must be grater than zero. Invalid index."}
    }

    data, _ := q.peek(index)
    return data, nil
}

// Capacity returns number of allocated bytes for queue
func (q *BytesQueue) Capacity() int {
    return q.capacity
}

// Len returns number of entries kept in queue
func (q *BytesQueue) Len() int {
    return q.count
}

// Error returns error message
func (e *queueError) Error() string {
    return e.message
}

func (q *BytesQueue) peek(index int) ([]byte, int) {
    blockSize := int(binary.LittleEndian.Uint32(q.array[index : index+headerEntrySize]))
    return q.array[index+headerEntrySize : index+headerEntrySize+blockSize], blockSize
}

func (q *BytesQueue) availableSpaceAfterTail() int {
    if q.tail >= q.head {
        return q.capacity - q.tail
    }
    return q.head - q.tail
}

func (q *BytesQueue) availableSpaceBeforeHead() int {
    if q.tail >= q.head {
        return q.head - leftMarginIndex
    }
    return q.head - q.tail
}
shownb commented 5 years ago

https://github.com/allegro/bigcache/blob/d3696373dd1331cdc65e20cea65d351bee988a8e/encoding.go

package bigcache

import (
    "encoding/binary"
)

const (
    timestampSizeInBytes = 8                                                       // Number of bytes used for timestamp
    hashSizeInBytes      = 8                                                       // Number of bytes used for hash
    keySizeInBytes       = 2                                                       // Number of bytes used for size of entry key
    headersSizeInBytes   = timestampSizeInBytes + hashSizeInBytes + keySizeInBytes // Number of bytes used for all headers
)

func wrapEntry(timestamp uint64, hash uint64, key string, entry []byte) []byte {
    var blob []byte
    keyLength := len(key)
    blob = make([]byte, len(entry)+headersSizeInBytes+keyLength)
    binary.LittleEndian.PutUint64(blob, timestamp)
    binary.LittleEndian.PutUint64(blob[timestampSizeInBytes:], hash)
    binary.LittleEndian.PutUint16(blob[timestampSizeInBytes+hashSizeInBytes:], uint16(keyLength))
    copy(blob[headersSizeInBytes:], []byte(key))
    copy(blob[headersSizeInBytes+keyLength:], entry)

    return blob
}

func readEntry(data []byte) []byte {
    length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:])
    return data[headersSizeInBytes+length:]
}

func readTimestampFromEntry(data []byte) uint64 {
    return binary.LittleEndian.Uint64(data)
}

func readKeyFromEntry(data []byte) string {
    length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:])
    return string(data[headersSizeInBytes : headersSizeInBytes+length])
}

func readHashFromEntry(data []byte) uint64 {
    return binary.LittleEndian.Uint64(data[timestampSizeInBytes:])
}

func resetKeyFromEntry(data []byte) {
    binary.LittleEndian.PutUint64(data[timestampSizeInBytes:], 0)
}
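
One detail that is easy to miss: Set calls resetKeyFromEntry on the previous entry before pushing the new one. A plausible reading is that this stops a later eviction of the stale entry from deleting the map slot that now points at the fresh entry. The sketch below reproduces that situation with a simplified layout (timestamp, hash, value). All helper names are hypothetical, and a plain slice stands in for the bytes queue.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	timestampSize = 8
	hashSize      = 8
)

// wrap builds a simplified entry: 8-byte timestamp, 8-byte hash, then the value.
func wrap(timestamp, hash uint64, value []byte) []byte {
	blob := make([]byte, timestampSize+hashSize+len(value))
	binary.LittleEndian.PutUint64(blob, timestamp)
	binary.LittleEndian.PutUint64(blob[timestampSize:], hash)
	copy(blob[timestampSize+hashSize:], value)
	return blob
}

// readHash returns the hash stored in the entry header.
func readHash(entry []byte) uint64 {
	return binary.LittleEndian.Uint64(entry[timestampSize:])
}

// resetHash zeroes the stored hash, marking the entry as superseded.
func resetHash(entry []byte) {
	binary.LittleEndian.PutUint64(entry[timestampSize:], 0)
}

// reachableAfterEvict overwrites one key, evicts the oldest entry, and reports
// whether the key can still be found in the index afterwards.
func reachableAfterEvict(reset bool) bool {
	const hashedKey = 99
	index := map[uint64]int{} // hash -> position in entries
	var entries [][]byte

	entries = append(entries, wrap(1, hashedKey, []byte("old")))
	index[hashedKey] = 0

	if reset {
		resetHash(entries[index[hashedKey]])
	}
	entries = append(entries, wrap(2, hashedKey, []byte("new")))
	index[hashedKey] = 1

	// Evict the oldest entry the way Set does: delete by the hash stored in it.
	delete(index, readHash(entries[0]))

	_, ok := index[hashedKey]
	return ok
}

func main() {
	fmt.Println("with reset:   ", reachableAfterEvict(true))
	fmt.Println("without reset:", reachableAfterEvict(false))
}
```

Without the reset, evicting the stale entry removes the index slot of the newer value, so the fresh value would become unreachable even though it is still in the queue.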