
Implementing a Distributed Cache from Scratch in Go - GeeCache #58


Cache eviction policies and an LRU implementation

Storage is finite: when the cache fills up, which entry gets evicted?

LFU (Least Frequently Used)

Tracks access counts and evicts the entry accessed least often.

LRU (Least Recently Used)

LRU assumes that data accessed recently is more likely to be accessed again in the future.

The LRU implementation


The overall logic is not complicated. Each list element stores an entry wrapping both key and value, so that when we evict from the back of the list we can also remove the key from the map:

delete(c.cache, kv.key)

package lru

import "container/list"

type Value interface {
    Len() int
}

type Cache struct {
    maxBytes int64
    nbytes   int64
    // doubly linked list; front = most recently used
    ll *list.List
    // map from key to its list element, for O(1) lookup
    cache     map[string]*list.Element
    OnEvicted func(key string, value Value)
}

func New(maxBytes int64, onEvicted func(key string, value Value)) *Cache {
    return &Cache{
        maxBytes:  maxBytes,
        ll:        list.New(),
        cache:     make(map[string]*list.Element),
        OnEvicted: onEvicted,
    }
}

type entry struct {
    key   string
    value Value
}

func (c *Cache) Add(key string, value Value) {
    if ele, ok := c.cache[key]; ok {
        // LRU: move the element to the front; the front holds the most
        // recently used entry, the back the least recently used.
        c.ll.MoveToFront(ele)
        kv := ele.Value.(*entry)
        // Adjust the byte count by the difference between new and old values.
        c.nbytes += int64(value.Len()) - int64(kv.value.Len())
        kv.value = value
    } else {
        // Insert a new entry at the front.
        ele := c.ll.PushFront(&entry{key, value})
        c.cache[key] = ele
        c.nbytes += int64(len(key)) + int64(value.Len())
    }
    for c.maxBytes != 0 && c.maxBytes < c.nbytes {
        c.RemoveOldest()
    }
}

func (c *Cache) RemoveOldest() {
    ele := c.ll.Back()

    if ele == nil {
        return
    }

    c.ll.Remove(ele)
    kv := ele.Value.(*entry)
    delete(c.cache, kv.key)
    c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len())
    if c.OnEvicted != nil {
        c.OnEvicted(kv.key, kv.value)
    }
}

func (c *Cache) Get(key string) (value Value, ok bool) {
    if ele, ok := c.cache[key]; ok {
        c.ll.MoveToFront(ele)
        kv := ele.Value.(*entry)
        return kv.value, true
    }
    return
}

func (c *Cache) Len() int {
    return c.ll.Len()
}
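
A quick sanity check helps before moving on. The sketch below is an assumed test file in package lru (not from the original post); String is a hypothetical minimal Value whose Len is its byte length.

package lru

import "testing"

type String string

func (s String) Len() int { return len(s) }

func TestCache(t *testing.T) {
    // maxBytes == 0 means no limit, so nothing is evicted here.
    lru := New(0, nil)
    lru.Add("key1", String("1234"))
    if v, ok := lru.Get("key1"); !ok || string(v.(String)) != "1234" {
        t.Fatalf("cache hit key1=1234 failed")
    }
    if _, ok := lru.Get("key2"); ok {
        t.Fatalf("cache miss key2 failed")
    }

    // With a budget of exactly two entries, adding a third evicts the
    // least recently used one ("k1", added first and never touched again).
    capBytes := int64(len("k1"+"v1") + len("k2"+"v2"))
    small := New(capBytes, nil)
    small.Add("k1", String("v1"))
    small.Add("k2", String("v2"))
    small.Add("k3", String("v3"))
    if _, ok := small.Get("k1"); ok || small.Len() != 2 {
        t.Fatalf("eviction of k1 failed")
    }
}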

Single-machine concurrent cache

Next, give the existing Cache safe concurrent access.

Start with the usage layer:

func TestGet(t *testing.T) {
    loadCounts := make(map[string]int, len(db))
    gee := NewGroup("scores", 2<<10, GetterFunc(
        func(key string) ([]byte, error) {
            log.Println("[SlowDB] search key", key)
            if v, ok := db[key]; ok {
                if _, ok := loadCounts[key]; !ok {
                    loadCounts[key] = 0
                }
                loadCounts[key] += 1
                return []byte(v), nil
            }
            return nil, fmt.Errorf("%s not exist", key)
        }))

    for k, v := range db {
        if view, err := gee.Get(k); err != nil || view.String() != v {
            t.Fatal("failed to get value of Tom")
        } // load from callback function
        if _, err := gee.Get(k); err != nil || loadCounts[k] > 1 {
            t.Fatalf("cache %s miss", k)
        } // cache hit
    }

    if view, err := gee.Get("unknown"); err == nil {
        t.Fatalf("the value of unknow should be empty, but %s got", view)
    }
}

NewGroup creates and returns gee, the cache instance. Its third argument is the fetch logic invoked on a cache miss; when the fetch succeeds, the result is added to the cache.

The core Get implementation behind NewGroup:

package geecache

import (
    "fmt"
    "log"
)

// A Getter loads data for a key.
type Getter interface {
    Get(key string) ([]byte, error)
}

// A GetterFunc implements Getter with a function.
type GetterFunc func(key string) ([]byte, error)

// Get implements Getter interface function
func (f GetterFunc) Get(key string) ([]byte, error) {
    return f(key)
}

// A Group is a cache namespace and associated data loaded spread over
// a group of 1 or more machines.
type Group struct {
    name      string
    getter    Getter
    mainCache cache
}

// Get value for a key from cache
func (g *Group) Get(key string) (ByteView, error) {
    if key == "" {
        return ByteView{}, fmt.Errorf("key is required")
    }

    if v, ok := g.mainCache.get(key); ok {
        log.Println("[GeeCache] hit")
        return v, nil
    }

    return g.load(key)
}

func (g *Group) load(key string) (value ByteView, err error) {
    return g.getLocally(key)
}

func (g *Group) getLocally(key string) (ByteView, error) {
    bytes, err := g.getter.Get(key)
    if err != nil {
        return ByteView{}, err
    }
    value := ByteView{b: cloneBytes(bytes)}
    g.populateCache(key, value)
    return value, nil
}

func (g *Group) populateCache(key string, value ByteView) {
    g.mainCache.add(key, value)
}

The interface-function pattern used here for Getter is a design worth noting; a sketch of it follows.
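
As a quick illustration (a hypothetical snippet, not project code), the adapter lets a plain function be used wherever a Getter is expected, the same trick the standard library plays with http.HandlerFunc:

func fromSlowDB(key string) ([]byte, error) {
    // pretend this hits a database
    return []byte("value-for-" + key), nil
}

func demo() {
    var g Getter = GetterFunc(fromSlowDB) // a plain function satisfies the interface
    if b, err := g.Get("Tom"); err == nil {
        fmt.Println(string(b)) // value-for-Tom
    }
}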

The cache implementation:

package geecache

import (
    "geecache/lru"
    "sync"
)

type cache struct {
    mu         sync.Mutex
    lru        *lru.Cache
    cacheBytes int64
}

func (c *cache) add(key string, value ByteView) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.lru == nil {
        c.lru = lru.New(c.cacheBytes, nil)
    }
    c.lru.Add(key, value)
}

func (c *cache) get(key string) (value ByteView, ok bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.lru == nil {
        return
    }

    if v, ok := c.lru.Get(key); ok {
        return v.(ByteView), ok
    }

    return
}

This wraps the lru operations in a mutex.

[!NOTE] When multiple goroutines read and write the same variable concurrently, conflicts occur under load. Ensuring that only one goroutine at a time can access the variable avoids the conflict; this is called mutual exclusion, and a mutex solves it.

sync.Mutex is the mutex provided by the Go standard library. Once a goroutine acquires the lock, any other goroutine calling Lock() blocks until Unlock() releases it.
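
A minimal sketch of the pattern (hypothetical, not project code): without the lock, concurrent calls to inc could race on n.

var (
    mu sync.Mutex
    n  int
)

func inc() {
    mu.Lock()         // blocks while another goroutine holds the lock
    defer mu.Unlock() // released on return
    n++
}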

[]byte is a very general way to store data; ByteView adds a thin abstraction over it that satisfies our Value interface:

// Value use Len to count how many bytes it takes
type Value interface {
    Len() int
}

package geecache

// A ByteView holds an immutable view of bytes.
type ByteView struct {
    b []byte
}

// Len returns the view's length
func (v ByteView) Len() int {
    return len(v.b)
}

// ByteSlice returns a copy of the data as a byte slice.
func (v ByteView) ByteSlice() []byte {
    return cloneBytes(v.b)
}

// String returns the data as a string, making a copy if necessary.
func (v ByteView) String() string {
    return string(v.b)
}

func cloneBytes(b []byte) []byte {
    c := make([]byte, len(b))
    copy(c, b)
    return c
}

The overall structure so far:

geecache/
    |--lru/
        |--lru.go  // LRU cache eviction policy
    |--byteview.go // abstraction and encapsulation of cached values
    |--cache.go    // concurrency control
    |--geecache.go // external interface; drives the main flow of storing and fetching

That completes this chapter's single-machine concurrent cache.

HTTP server

[!NOTE] A distributed cache needs node-to-node communication, and HTTP is a common, simple basis for it: once a node runs an HTTP service, other nodes can reach it. This section builds the HTTP server for a single node.

The goal is to expose the geecache built above over HTTP.

The usage layer:

func main() {
    geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
        func(key string) ([]byte, error) {
            log.Println("[SlowDB] search key", key)
            if v, ok := db[key]; ok {
                return []byte(v), nil
            }
            return nil, fmt.Errorf("%s not exist", key)
        }))

    addr := "localhost:9999"
    peers := geecache.NewHTTPPool(addr)
    log.Println("geecache is running at", addr)
    log.Fatal(http.ListenAndServe(addr, peers))
}

Once started, it can be queried over HTTP:

$ curl http://localhost:9999/_geecache/scores/Tom
630

$ curl http://localhost:9999/_geecache/scores/kkk
kkk not exist

The URL format is /<basepath>/<groupname>/<key>: the groupname locates the group instance, and group.Get(key) then retrieves the cached data.

peers := geecache.NewHTTPPool(addr)
http.ListenAndServe(addr, peers)

peers must satisfy the Handler interface that ListenAndServe expects:

func ListenAndServe(addr string, handler Handler) error {
    server := &Server{Addr: addr, Handler: handler}
    return server.ListenAndServe()
}

type Handler interface {
    ServeHTTP(ResponseWriter, *Request)
}

So it suffices to build an HTTPPool that implements ServeHTTP:

package geecache

import (
    "fmt"
    "log"
    "net/http"
    "strings"
)

const defaultBasePath = "/_geecache/"

// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
    // this peer's base URL, e.g. "https://example.net:8000"
    self     string
    basePath string
}

// NewHTTPPool initializes an HTTP pool of peers.
func NewHTTPPool(self string) *HTTPPool {
    return &HTTPPool{
        self:     self,
        basePath: defaultBasePath,
    }
}

// Log info with server name
func (p *HTTPPool) Log(format string, v ...interface{}) {
    log.Printf("[Server %s] %s", p.self, fmt.Sprintf(format, v...))
}

// ServeHTTP handle all http requests
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if !strings.HasPrefix(r.URL.Path, p.basePath) {
        panic("HTTPPool serving unexpected path: " + r.URL.Path)
    }
    p.Log("%s %s", r.Method, r.URL.Path)
    // /<basepath>/<groupname>/<key> required
    parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2)
    if len(parts) != 2 {
        http.Error(w, "bad request", http.StatusBadRequest)
        return
    }

    groupName := parts[0]
    key := parts[1]

    group := GetGroup(groupName)
    if group == nil {
        http.Error(w, "no such group: "+groupName, http.StatusNotFound)
        return
    }

    view, err := group.Get(key)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "application/octet-stream")
    w.Write(view.ByteSlice())
}

Next, run the main function and repeat the curl requests above.


GeeCache's log output:

2020/02/11 23:28:39 geecache is running at localhost:9999
2020/02/11 23:29:08 [Server localhost:9999] GET /_geecache/scores/Tom
2020/02/11 23:29:08 [SlowDB] search key Tom
2020/02/11 23:29:16 [Server localhost:9999] GET /_geecache/scores/kkk
2020/02/11 23:29:16 [SlowDB] search key kkk

Consistent hashing

Scenario

With multiple cache servers, which server should a request for a given key go to? The placement scheme needs to satisfy two conditions:

  1. a high cache hit rate
  2. easy scaling when servers are added or removed


Approaches

Approach 1: simple hashing, e.g. hash(key) mod the number of servers. The hit rate is high and the algorithm is simple, but the moment the number of cache servers changes, nearly every existing cache entry maps to a different server and becomes useless.

Approach 2: consistent hashing, which places both servers and keys on the same hash ring.

To fix the uneven distribution that a few real nodes produce on the ring, each real node is mapped to multiple virtual nodes.

Implementation

package consistenthash

import (
    "hash/crc32"
    "sort"
    "strconv"
)

// Hash maps bytes to uint32
type Hash func(data []byte) uint32

// Map contains all hashed keys
type Map struct {
    hash     Hash
    replicas int
    keys     []int // Sorted
    hashMap  map[int]string
}

// New creates a Map instance
func New(replicas int, fn Hash) *Map {
    m := &Map{
        replicas: replicas,
        hash:     fn,
        hashMap:  make(map[int]string),
    }
    if m.hash == nil {
        m.hash = crc32.ChecksumIEEE
    }
    return m
}

// Add adds some keys to the hash.
func (m *Map) Add(keys ...string) {
    for _, key := range keys {
        for i := 0; i < m.replicas; i++ {
            hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
            m.keys = append(m.keys, hash)
            m.hashMap[hash] = key
        }
    }
    sort.Ints(m.keys)
}

// Get gets the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
    if len(m.keys) == 0 {
        return ""
    }

    hash := int(m.hash([]byte(key)))
    // Binary search for appropriate replica.
    idx := sort.Search(len(m.keys), func(i int) bool {
        return m.keys[i] >= hash
    })

    return m.hashMap[m.keys[idx%len(m.keys)]]
}

The last line, m.keys[idx%len(m.keys)], takes a moment to understand.

[!NOTE] If idx == len(m.keys), the correct choice is m.keys[0]: m.keys is logically a ring, so taking the remainder handles the wrap-around.
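
The ring behavior is easiest to see with a custom hash. This sketch (an assumed test, with replicas=3 and a hash that just parses the key as an integer) makes the placement predictable and exercises the wrap-around case:

package consistenthash

import (
    "strconv"
    "testing"
)

func TestHashing(t *testing.T) {
    hash := New(3, func(key []byte) uint32 {
        i, _ := strconv.Atoi(string(key))
        return uint32(i)
    })
    // Adding "6", "4", "2" creates virtual nodes 06/16/26, 04/14/24, 02/12/22,
    // i.e. the sorted ring 2, 4, 6, 12, 14, 16, 22, 24, 26.
    hash.Add("6", "4", "2")

    testCases := map[string]string{
        "2":  "2", // exact hit on point 2
        "11": "2", // first point >= 11 is 12, owned by "2"
        "23": "4", // first point >= 23 is 24, owned by "4"
        "27": "2", // past 26: idx == len(m.keys), wraps to keys[0] = 2
    }
    for k, v := range testCases {
        if got := hash.Get(k); got != v {
            t.Errorf("asking for %s: got %s, want %s", k, got, v)
        }
    }
}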

Further reading

小林 coding: What is consistent hashing?

Consistent Hashing: what is it, and why does it come up so often in system design interviews? A 10-minute explainer | System Design EP1

Distributed nodes

Current flow

The flow so far:

                               yes
receive key --> is it cached? -----> return the cached value (1)
                |  no                                       yes
                |-----> should it come from a remote node? -----> fetch from the remote node --> return the value (2)
                            |  no
                            |-----> invoke the `callback`, fetch the value and add it to the cache --> return the value (3)

(1) and (3) were implemented in the single-machine concurrent cache section; this section tackles (2):

pick a node via consistent hashing      yes                                       yes
    |-----> is it a remote node? -----> HTTP client fetches from that node --> success? -----> return the remote value
                    |  no                                                          ↓  no
                    |----------------------------> fall back to local processing

First, the usage layer:

import (
    "flag"
    "fmt"
    "geecache"
    "log"
    "net/http"
    "strings"
)

var db = map[string]string{
    "Tom":  "630",
    "Jack": "589",
    "Sam":  "567",
}

func createGroup() *geecache.Group {
    return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
        func(key string) ([]byte, error) {
            log.Println("[SlowDB] search key", key)
            if v, ok := db[key]; ok {
                return []byte(v), nil
            }
            return nil, fmt.Errorf("%s not exist", key)
        }))
}

func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
    peers := geecache.NewHTTPPool(addr)
    peers.Set(addrs...)
    gee.RegisterPeers(peers)
    log.Println("geecache is running at", addr)
    log.Fatal(http.ListenAndServe(strings.TrimPrefix(addr, "http://"), peers))
}

func startAPIServer(apiAddr string, gee *geecache.Group) {
    http.Handle("/api", http.HandlerFunc(
        func(w http.ResponseWriter, r *http.Request) {
            key := r.URL.Query().Get("key")
            view, err := gee.Get(key)
            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }
            w.Header().Set("Content-Type", "application/octet-stream")
            w.Write(view.ByteSlice())

        }))
    log.Println("fontend server is running at", apiAddr)
    log.Fatal(http.ListenAndServe(strings.TrimPrefix(apiAddr, "http://"), nil))

}

func main() {
    var port int
    var api bool
    flag.IntVar(&port, "port", 8001, "Geecache server port")
    flag.BoolVar(&api, "api", false, "Start a api server?")
    flag.Parse()

    apiAddr := "http://0.0.0.0:9999"
    addrMap := map[int]string{
        8001: "http://0.0.0.0:8001",
        8002: "http://0.0.0.0:8002",
        8003: "http://0.0.0.0:8003",
    }

    var addrs []string
    for _, v := range addrMap {
        addrs = append(addrs, v)
    }

    gee := createGroup()
    if api {
        go startAPIServer(apiAddr, gee)
    }
    startCacheServer(addrMap[port], addrs, gee)
}
And a shell script to build and start everything:

#!/bin/bash
trap "rm server;kill 0" EXIT

go build -o server
./server -port=8001 &
./server -port=8002 &
./server -port=8003 -api=1 &

sleep 2
echo ">>> start test"
curl "http://127.0.0.1:9999/api?key=Tom" &
curl "http://127.0.0.1:9999/api?key=Tom" &
curl "http://127.0.0.1:9999/api?key=Tom" &

wait

This starts three cache servers on ports 8001, 8002, and 8003. One of the processes also runs the externally exposed API service, on port 9999.

The output:

2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8001
2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8002
2023/11/22 09:29:09 frontend server is running at http://0.0.0.0:9999
2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8003
>>> start test
2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002
2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002
2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002
2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom
2023/11/22 09:29:11 [SlowDB] search key Tom
2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom
2023/11/22 09:29:11 [GeeCache] hit
2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom
2023/11/22 09:29:11 [GeeCache] hit
630630630

The whole flow

The two flow diagrams above, read together, show how consistent hashing is used across the distributed nodes.

Miscellaneous

var _ PeerPicker = (*HTTPPool)(nil) is a way to verify that the *HTTPPool type implements the PeerPicker interface.

It converts a typed nil to the interface, forcing a compile-time check that HTTPPool satisfies PeerPicker, so the IDE and the compiler catch a mismatch immediately instead of at the point of use.
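
The same check in isolation, with hypothetical names: if *T ever stops implementing I, the var line no longer compiles.

type I interface{ M() }

type T struct{}

func (T) M() {}

// Compile-time assertion: the typed nil (*T)(nil) is converted to I,
// which type-checks only if *T implements I. Nothing happens at runtime.
var _ I = (*T)(nil)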

See here for a reference.

Preventing cache breakdown

Background

Cache avalanche: the whole cache becomes invalid at the same moment, so the DB is hit with a sudden spike of requests, an avalanche. It is typically caused by a cache server going down or by many keys being given the same expiration time.

Cache breakdown: at the instant an existing, popular key expires, a burst of concurrent requests for it all break through to the DB, again causing a sudden spike.

Cache penetration: querying data that does not exist; since nothing is ever written to the cache for it, every request goes to the DB, and a large enough burst can take the DB down.

The distributed-nodes section ended with this behavior: three concurrent API requests produced

2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002
2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002
2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002

The API was queried three times, and 8003 sent three separate requests to 8002 for the same key.

This section implements a package called singleflight to solve that: no matter how many concurrent callers arrive, the remote peer is asked only once.

Approach

In implementation terms, the remote-fetch path is wrapped so that concurrent requests for the same key collapse into one:

func (g *Group) load(key string) (value ByteView, err error) {
    // each key is only fetched once (either locally or remotely)
    // regardless of the number of concurrent callers.
    viewi, err := g.loader.Do(key, func() (interface{}, error) {
        if g.peers != nil {
            if peer, ok := g.peers.PickPeer(key); ok {
                if value, err = g.getFromPeer(peer, key); err == nil {
                    return value, nil
                }
                log.Println("[GeeCache] Failed to get from peer", err)
            }
        }

        return g.getLocally(key)
    })

    if err == nil {
        return viewi.(ByteView), nil
    }
    return
}

The implementation:

package singleflight

import "sync"

// call is an in-flight or completed Do call
type call struct {
    wg  sync.WaitGroup
    val interface{}
    err error
}

// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized
}

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        g.mu.Unlock()
        c.wg.Wait()
        return c.val, c.err
    }
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    c.val, c.err = fn()
    c.wg.Done()

    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()

    return c.val, c.err
}

Two synchronization primitives are at work here: g.mu protects the map m and is held only briefly, while each call's sync.WaitGroup makes duplicate callers block until the in-flight call completes and then share its result.
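
A sketch of the dedup behavior (an assumed test, not from the original post; the sleep keeps the first call in flight so the duplicates should coalesce onto it rather than start their own):

package singleflight

import (
    "sync"
    "sync/atomic"
    "testing"
    "time"
)

func TestDo(t *testing.T) {
    var g Group
    var calls int32
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            v, err := g.Do("key", func() (interface{}, error) {
                atomic.AddInt32(&calls, 1)
                time.Sleep(10 * time.Millisecond) // stay in flight
                return "bar", nil
            })
            if err != nil || v.(string) != "bar" {
                t.Errorf("Do: got %v, %v; want bar, nil", v, err)
            }
        }()
    }
    wg.Wait()
    if got := atomic.LoadInt32(&calls); got != 1 {
        t.Errorf("fn executed %d times; want 1", got)
    }
}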

References

Original article