Closed. mthenw closed this issue 7 years ago.
Thanks for trying this out. What kind of events didn't trigger a notification from the Watch (create/update/delete/ etc.)?
I may have missed something about how to use watches with a prefix properly, so some specific events may not be triggering a notification from the Watch.
The List that happens before is just a peculiarity of libkv: we return the current list of keys before watching for changes (which I'm starting to question now). I'm unsure whether this is related.
Thanks for trying this out. What kind of events didn't trigger a notification from the Watch (create/update/delete/ etc.)?
In my case, it's create. It happens randomly, and when I debugged it down to the libkv level it looked like some keys are missed between List and Watch.
The List that happens before is just a peculiarity of libkv: we return the current list of keys before watching for changes (which I'm starting to question now). I'm unsure whether this is related.
Yup, this is how I understand it.
I will try to investigate that more in a few hours and let you know.
@mthenw I baked this small example to debug (found #16 in the process) and I wasn't able to reproduce:
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/docker/libkv"
	"github.com/docker/libkv/store"
	"github.com/docker/libkv/store/etcd/v3"
)

func init() {
	etcdv3.Register()
}

type Store struct {
	store store.Store
}

// put writes value at key and exits the program on failure.
func (s *Store) put(key string, value string) {
	err := s.store.Put(key, []byte(value), nil)
	if err != nil {
		fmt.Printf("Error trying to put value at key: %v\n", key)
		os.Exit(1)
	}
}

// delete removes key and exits the program on failure.
func (s *Store) delete(key string) {
	err := s.store.Delete(key)
	if err != nil {
		fmt.Printf("Error trying to delete key: %v\n", key)
		os.Exit(1)
	}
}

func main() {
	client := "localhost:4001"
	s := &Store{}
	var err error

	// Initialize a new etcd v3 store
	s.store, err = libkv.NewStore(
		store.ETCDV3, // or "consul"
		[]string{client},
		&store.Config{
			ConnectionTimeout: 10 * time.Second,
		},
	)
	if err != nil {
		fmt.Println("Cannot create store")
		os.Exit(1)
	}

	s.put("foo/one", "bar")
	s.put("foo/two", "bar")
	s.put("foo/three", "bar")

	stopCh := make(<-chan struct{})
	events, err := s.store.WatchTree("foo", stopCh)
	if err != nil {
		fmt.Printf("Couldn't start watch at key %v, error: %v\n", "foo", err)
		os.Exit(1)
	}

	go func() {
		time.Sleep(1 * time.Second)
		s.put("foo/four", "fourbar") // Create a key under the prefix "foo"
		s.put("foo/five", "fivebar") // Create
		s.put("foo/six", "sixbar")   // Create
		s.delete("foo/three")        // Delete a key under the prefix "foo"
		s.put("foo/two", "newbar")   // Update "foo/two" with new value
	}()

	for {
		select {
		case pairs := <-events:
			// Do something with events
			for _, pair := range pairs {
				fmt.Printf("even on key %v / new value=%s\n", pair.Key, string(pair.Value))
			}
		}
	}
}
Maybe I'm doing this wrong. Would you mind giving me an example program in which you can reproduce the error, so I can try it on my side? Thanks!
I slightly changed your example:
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/docker/libkv"
	"github.com/docker/libkv/store"
	"github.com/docker/libkv/store/etcd/v3"
)

func init() {
	etcdv3.Register()
}

type Store struct {
	store store.Store
}

func (s *Store) put(key string, value string) {
	err := s.store.Put(key, []byte(value), nil)
	if err != nil {
		fmt.Printf("Error trying to put value at key: %v", key)
		os.Exit(1)
	}
}

func (s *Store) delete(key string) {
	err := s.store.Delete(key)
	if err != nil {
		fmt.Printf("Error trying to delete key: %v", key)
		os.Exit(1)
	}
}

func main() {
	client := "localhost:2379"
	s := &Store{}
	var err error

	// Initialize a new etcd v3 store
	s.store, err = libkv.NewStore(
		store.ETCDV3, // or "consul"
		[]string{client},
		&store.Config{
			ConnectionTimeout: 1 * time.Second,
		},
	)
	if err != nil {
		fmt.Printf("Cannot create store: %s", err.Error())
		os.Exit(1)
	}

	s.put("foo/one", "bar")
	s.put("foo/two", "bar")
	s.put("foo/three", "bar")

	stopCh := make(<-chan struct{})
	events, err := s.store.WatchTree("foo", stopCh)

	// No sleep here: the writes race with the start of the watch.
	go func() {
		s.put("foo/four", "fourbar") // Create a key under the prefix "foo"
		s.put("foo/five", "fivebar") // Create
		s.put("foo/six", "sixbar")   // Create
		s.put("foo/two", "newbar")   // Update "foo/two" with new value
	}()

	if err != nil {
		fmt.Printf("Couldn't start watch at key %v, error: %v", "foo", err)
	}

	for {
		select {
		case pairs := <-events:
			// Do something with events
			for _, pair := range pairs {
				fmt.Printf("even on key %v / new value=%s\n", pair.Key, string(pair.Value))
			}
		}
	}
}
Output:
even on key foo/two / new value=bar
even on key foo/three / new value=bar
even on key foo/one / new value=bar
even on key foo/five / new value=fivebar
even on key foo/six / new value=sixbar
even on key foo/two / new value=newbar
foo/four is missing
etcd version:
$ etcd --version
etcd Version: 3.2.5
Git SHA: GitNotFound
Go Version: go1.8.3
Go OS/Arch: darwin/amd64
I think I found the issue. Submitting a PR in a few minutes.
Awesome 😄. I forgot to remove the sleep where it's racing. I can indeed reproduce it now.
I have an issue with WatchTree in etcd v3. It looks like WatchTree misses some updates and I want to confirm that. I'm not an etcd expert so I might not understand it correctly.
Right now the algorithm looks like this: list all values, then start watching.
If a value is added between listing all values and starting the watch, it is missed. Shouldn't Watch (https://github.com/abronan/libkv/blob/master/store/etcd/v3/etcd.go#L256) be called with a specified revision number?
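To make the revision idea concrete, here is a minimal stdlib-only sketch of the race and of the "watch from the list revision" approach. It is a toy model, not libkv or etcd code: `toyStore`, `Put`, `List` and `EventsSince` are hypothetical names invented for illustration, and prefix filtering is left out. With the real etcd v3 Go client, the corresponding pieces would presumably be the revision carried in the Get response header and the `clientv3.WithRev` watch option.

```go
package main

import (
	"fmt"
	"sync"
)

// event is one change recorded by the toy store.
type event struct {
	Rev   int64
	Key   string
	Value string
}

// toyStore is a hypothetical in-memory stand-in for a revisioned KV store.
// Every write bumps a global revision, loosely mirroring etcd v3 semantics.
type toyStore struct {
	mu   sync.Mutex
	rev  int64
	data map[string]string
	log  []event // log[i] holds the change made at revision i+1
}

func newToyStore() *toyStore {
	return &toyStore{data: map[string]string{}}
}

// Put writes a key and records the change under a new revision.
func (s *toyStore) Put(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rev++
	s.data[key] = value
	s.log = append(s.log, event{Rev: s.rev, Key: key, Value: value})
}

// List returns a snapshot of all keys plus the revision it was taken at.
func (s *toyStore) List() (map[string]string, int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	snap := make(map[string]string, len(s.data))
	for k, v := range s.data {
		snap[k] = v
	}
	return snap, s.rev
}

// EventsSince returns every recorded change with revision >= from.
func (s *toyStore) EventsSince(from int64) []event {
	s.mu.Lock()
	defer s.mu.Unlock()
	if from < 1 {
		from = 1
	}
	if from > s.rev {
		return nil
	}
	return append([]event(nil), s.log[from-1:]...)
}

func main() {
	s := newToyStore()
	s.Put("foo/one", "bar")

	_, rev := s.List() // snapshot taken at revision 1

	// This write lands between the list and the start of the watch.
	s.Put("foo/four", "fourbar")

	// A watch that starts "from now" would only see revisions after 2
	// and miss foo/four. Starting from rev+1 replays it instead.
	for _, ev := range s.EventsSince(rev + 1) {
		fmt.Printf("rev=%d key=%s value=%s\n", ev.Rev, ev.Key, ev.Value)
	}
}
```

In this model the write to `foo/four` is delivered because the watch start point is tied to the snapshot's revision rather than to whenever the watch happens to be registered.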