mailgun / groupcache

Clone of golang/groupcache with TTL and Item Removal support
Apache License 2.0
495 stars 73 forks source link

Groupcache fails to distribute across peers #33

Closed jon-whit closed 2 years ago

jon-whit commented 3 years ago

I have the following application:

package main

import (...)

// store is the in-memory backing map for this example: the /set handler
// writes to it and the group's getter reads from it.
// NOTE(review): no locking around this map is visible in this snippet.
var store = map[string]string{}

// group is the groupcache group created with the name "cache1" and a size
// argument of 64<<20. When a lookup reaches this getter, the value is read
// from the package-level store map.
var group = groupcache.NewGroup("cache1", 64<<20, groupcache.GetterFunc(
    func(ctx groupcache.Context, key string, dest groupcache.Sink) error {

        // Keys absent from store produce an error rather than an empty value.
        v, ok := store[key]
        if !ok {
            return fmt.Errorf("key not set")
        } else {
            // Hand the value to the sink with an expiry of now + 10 minutes;
            // a SetBytes failure is logged and returned to the caller.
            if err := dest.SetBytes([]byte(v), time.Now().Add(10*time.Minute)); err != nil {
                log.Printf("Failed to set cache value for key '%s' - %v\n", key, err)
                return err
            }
        }

        return nil
    },
))

// main parses the -addr and -pool flags, builds the groupcache HTTP pool,
// registers the /set and /cache handlers, starts an HTTP server on -addr, and
// then blocks until SIGINT or SIGTERM is received.
func main() {
    addr := flag.String("addr", ":8080", "server address")
    peers := flag.String("pool", "http://localhost:8080", "server pool list")
    flag.Parse()

    // -pool is a comma-separated list of peer URLs.
    p := strings.Split(*peers, ",")
    // NOTE(review): the first argument here is the raw -addr value (e.g.
    // ":8080"), while the entries passed to Set are full URLs; confirm
    // whether the library needs these to match.
    // NOTE(review): pool is not handed to any server or mux in this
    // function; confirm how peer requests are expected to reach it.
    pool := groupcache.NewHTTPPoolOpts(*addr, &groupcache.HTTPPoolOptions{})
    pool.Set(p...)

    // /set stores the "value" form field under the "key" form field in store.
    http.HandleFunc("/set", func(w http.ResponseWriter, r *http.Request) {
        key := r.FormValue("key")
        value := r.FormValue("value")
        store[key] = value
    })

    // /cache looks the key up through group with a 2-second timeout and writes
    // the value followed by a newline; an error from group.Get is reported
    // with status 404.
    http.HandleFunc("/cache", func(w http.ResponseWriter, r *http.Request) {
        key := r.FormValue("key")

        fmt.Printf("Fetching value for key '%s'\n", key)

        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()

        var b []byte
        err := group.Get(ctx, key, groupcache.AllocatingByteSliceSink(&b))
        if err != nil {
            http.Error(w, err.Error(), http.StatusNotFound)
            return
        }
        w.Write(b)
        w.Write([]byte{'\n'})
    })

    // Serve in the background on -addr with a nil handler argument; an error
    // from ListenAndServe terminates the process via log.Fatalf.
    go func() {
        if err := http.ListenAndServe(*addr, nil); err != nil {
            log.Fatalf("Failed to start HTTP server - %v", err)
        }
    }()

    // Block until SIGINT or SIGTERM arrives.
    // NOTE(review): termChan is unbuffered here; the os/signal documentation
    // recommends a buffered channel for Notify.
    termChan := make(chan os.Signal)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
    <-termChan
}

I run two instances of this application:

go run main.go -addr=:8081 -pool=http://127.0.0.1:8081,http://127.0.0.1:8082
go run main.go -addr=:8082 -pool=http://127.0.0.1:8081,http://127.0.0.1:8082

Can you explain why I see this behavior?

curl -X GET "localhost:8081/set?key=key1&value=val1"
curl -X GET "localhost:8081/cache?key=key1"
> val1
curl -X GET "localhost:8082/cache?key=key1"
> error "key not set"

but, if I reverse the operations and issue them against the second server I get

curl -X GET "localhost:8082/set?key=key1&value=val1"
curl -X GET "localhost:8082/cache?key=key1"
> val1
curl -X GET "localhost:8081/cache?key=key1"
> val1

Why is this? Shouldn't the group cache return the value previously read by any peer (i.e. the cache is shared by the cache group), so long as no error happened with the dest.SetBytes(..) call?

jeffreydwalter commented 3 years ago

@jon-whit I know this won't help and I don't have the answer, but I did try your code against the original groupcache package (https://github.com/golang/groupcache) that this one is based on, and I am able to reproduce the issue there as well.

jon-whit commented 3 years ago

Yeah, it's like the package literally doesn't do what it's meant to do 😂

tymeshifter commented 2 years ago

@jon-whit your example has a bug because you never expose the pool over HTTP. The pool itself is an http.Handler, and in this example it is only instantiated and never passed to the server to be used as the handler for requests. So the scenario you described worked as expected because there was no connection between peers. Also, the interpretation of the "reverse" scenario is wrong, because you need to restart the processes to wipe out the data from the first scenario. If you do that, you will get `error "key not set"` for both scenarios.

jonwhitty commented 2 years ago

@tymeshifter I changed the code as follows:

package main

import (...)

// store is the in-memory backing map for this example: the /set handler
// writes to it and the group's getter reads from it.
// NOTE(review): no locking around this map is visible in this snippet.
var store = map[string]string{}

// group is the groupcache group created with the name "cache1" and a size
// argument of 64<<20. When a lookup reaches this getter, the value is read
// from the package-level store map.
var group = groupcache.NewGroup("cache1", 64<<20, groupcache.GetterFunc(
    func(ctx context.Context, key string, dest groupcache.Sink) error {

        // Keys absent from store produce an error rather than an empty value.
        v, ok := store[key]
        if !ok {
            return fmt.Errorf("key not set")
        } else {
            // Hand the value to the sink with an expiry of now + 10 minutes;
            // a SetBytes failure is logged and returned to the caller.
            if err := dest.SetBytes([]byte(v), time.Now().Add(10*time.Minute)); err != nil {
                log.Printf("Failed to set cache value for key '%s' - %v\n", key, err)
                return err
            }
        }

        return nil
    },
))

// main parses the -addr, -api-addr and -pool flags, builds the groupcache
// HTTP pool, and runs two HTTP servers in the background: one on -addr whose
// handler is the pool, and one on -api-addr (nil handler argument) for the
// /set and /cache handlers registered with http.HandleFunc. It then blocks
// until SIGINT or SIGTERM is received.
func main() {
    addr := flag.String("addr", ":8080", "server address")
    addr2 := flag.String("api-addr", ":8081", "api server address")
    peers := flag.String("pool", "http://localhost:8080", "server pool list")
    flag.Parse()

    // -pool is a comma-separated list of peer URLs.
    p := strings.Split(*peers, ",")
    // NOTE(review): the first argument here is the raw -addr value (e.g.
    // ":8080"), while the entries passed to Set are full URLs; confirm
    // whether the library needs these to match.
    pool := groupcache.NewHTTPPoolOpts(*addr, &groupcache.HTTPPoolOptions{})
    pool.Set(p...)

    // /set stores the "value" form field under the "key" form field in store.
    http.HandleFunc("/set", func(w http.ResponseWriter, r *http.Request) {
        key := r.FormValue("key")
        value := r.FormValue("value")
        store[key] = value
    })

    // /cache looks the key up through group with a 2-second timeout and writes
    // the value followed by a newline; an error from group.Get is reported
    // with status 404.
    http.HandleFunc("/cache", func(w http.ResponseWriter, r *http.Request) {
        key := r.FormValue("key")

        fmt.Printf("Fetching value for key '%s'\n", key)

        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()

        var b []byte
        err := group.Get(ctx, key, groupcache.AllocatingByteSliceSink(&b))
        if err != nil {
            http.Error(w, err.Error(), http.StatusNotFound)
            return
        }
        w.Write(b)
        w.Write([]byte{'\n'})
    })

    // Dedicated server on -addr whose only handler is the pool.
    server := http.Server{
        Addr:    *addr,
        Handler: pool,
    }

    // Run the pool server in the background; an error from ListenAndServe
    // terminates the process via log.Fatalf.
    go func() {
        if err := server.ListenAndServe(); err != nil {
            log.Fatalf("Failed to start HTTP server - %v", err)
        }
    }()

    // Run the API server on -api-addr with a nil handler argument; an error
    // from ListenAndServe likewise terminates the process.
    go func() {
        if err := http.ListenAndServe(*addr2, nil); err != nil {
            log.Fatalf("Failed to start API HTTP server - %v", err)
        }
    }()

    // Block until SIGINT or SIGTERM arrives on the buffered channel.
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
    <-termChan
}

And I get the same issues I mentioned above. So I'm not so sure that it's an issue of the pool not being exposed over HTTP. In the old example it is exposed over the DefaultServeMux.

Maybe you can provide an example that exhibits the behavior I'm expecting to see? Because I'm not able to get it to work at all like I'd expect.

thrawn01 commented 2 years ago

I don't follow the issue here, I'm not sure what you are trying to accomplish. The README contains a working example of how the library is to be used. The code you posted doesn't appear to call the pool handler at all, so it's not going to work in the way the library was designed.

jon-whit commented 2 years ago

@thrawn01 how do you figure? I passed the pool into the http.Server just like the README does, and implemented some handlers that leverage the group, and it doesn't behave as described.

Could you clarify the rationale behind your comment?

thrawn01 commented 2 years ago

I understand what you are trying to do now! Amazing what a good night's sleep will do. 😄

You have a few things wrong, and maybe also some misunderstanding of what the library does and how it works.

  1. The groupcache.NewHTTPPoolOpts() call is incorrect: you have to tell groupcache who the current instance is, which is the first arg in that call. So in your code it should be pool := groupcache.NewHTTPPoolOpts(fmt.Sprintf("http://%s", *addr), &groupcache.HTTPPoolOptions{}), so that the instance passed to pool.Set(), like http://127.0.0.1:8081, will match the self arg we passed to NewHTTPPoolOpts().
  2. You need to pass in the full interface address to which you want to bind. Passing in :8081 will most likely bind to IPv6, although some systems will bind to both IPv6 and IPv4 interfaces when you do this. To be clear, and to allow groupcache to match peers correctly, you should call your app like this: go run main.go -api-addr=127.0.0.1:9081 -addr=127.0.0.1:8081 -pool=http://127.0.0.1:8081,http://127.0.0.1:8082
  3. Groupcache doesn't distribute the value to all peers in the cluster. If a peer doesn't have the value, it will figure out which peer has the value and ask that peer for it. So in your example, asking one peer for a value you set on an arbitrary peer might not work; it will only work if you set the value on the peer that the consistent hash algorithm has determined "owns" the value.

I hope this makes more sense.

I'll add an additional comment in the example to explicitly call out that you MUST identify the current instance by passing the argument self such that the pool can find self in the list passed to pool.Set(). Any other thoughts on making the documentation clear would be appreciated.

thrawn01 commented 2 years ago

See https://github.com/mailgun/groupcache/commit/ece2929696358d603feadcf128fa705cdd7d6650