Closed brandond closed 7 months ago
From @qwtsc in #250 (comment):
I believe I may have found the actual reason for the problem. This Kubernetes feature depends on the GetCurrentResourceVersionFromStorage function to retrieve the latest version of a specified resource prefix. The function issues an empty list request with a limit of 1 and uses the ListAccessor interface to extract the resource version from the returned metadata. However, it seems that Kine responds with the resourceVersion corresponding to the global maxRowId, rather than the one associated with the specified prefix.
Ah, interesting. I believe that should be fixable.
Just to be clear, this is a List against the current resource type prefix - so the etcd client is expecting to see the latest revision of any key in that prefix, NOT the latest revision of the keystore as a whole.
What is the expected behavior if there are no keys in that prefix? I guess I'll have to test.
Originally posted by @brandond in #250 (comment)
Apologies, I am wrong again. According to the following unit test in Kubernetes, Kine's behavior is indeed what Kubernetes expects.
// create a pod and make sure its RV is equal to the one maintained by etcd
pod := createPod(makePod("pod-1"))
currentStorageRV, err := storage.GetCurrentResourceVersionFromStorage(context.TODO(), etcdStorage, func() runtime.Object { return &example.PodList{} }, "/pods", "Pod")
require.NoError(t, err)
podRV, err := versioner.ParseResourceVersion(pod.ResourceVersion)
require.NoError(t, err)
require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV")
// now create a replicaset (new resource) and make sure the target function returns global etcd RV
rs := createReplicaSet(makeReplicaSet("replicaset-1"))
currentStorageRV, err = storage.GetCurrentResourceVersionFromStorage(context.TODO(), etcdStorage, func() runtime.Object { return &example.PodList{} }, "/pods", "Pod")
require.NoError(t, err)
rsRV, err := versioner.ParseResourceVersion(rs.ResourceVersion)
require.NoError(t, err)
require.Equal(t, currentStorageRV, rsRV, "expected the global etcd RV to be equal to replicaset's RV")
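The behavior this test asserts can be illustrated with a toy in-memory store. This is only a sketch: toyStore, put, list and currentRV are hypothetical names, not Kine or Kubernetes APIs. It assumes a keystore where every write bumps a single global revision and a list response always carries that global revision, whichever prefix was requested.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// toyStore is a hypothetical in-memory stand-in for an etcd-like keystore.
// Every write bumps one global revision, regardless of key prefix.
type toyStore struct {
	rev  int64            // global revision of the whole keystore
	mods map[string]int64 // key -> revision of its last write
}

func newToyStore() *toyStore {
	return &toyStore{mods: map[string]int64{}}
}

// put writes a key and returns the revision assigned to that write.
func (s *toyStore) put(key string) int64 {
	s.rev++
	s.mods[key] = s.rev
	return s.rev
}

// list returns up to limit keys under prefix plus the "header" revision.
// The header revision is the global one, even when no key matches.
func (s *toyStore) list(prefix string, limit int) ([]string, int64) {
	var keys []string
	for k := range s.mods {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	if limit > 0 && len(keys) > limit {
		keys = keys[:limit]
	}
	return keys, s.rev
}

// currentRV mimics the approach described in this thread:
// list with a limit of 1 and keep only the revision from the response.
func currentRV(s *toyStore, prefix string) int64 {
	_, rev := s.list(prefix, 1)
	return rev
}

func main() {
	s := newToyStore()
	podRV := s.put("/pods/default/pod-1")
	fmt.Println(currentRV(s, "/pods") == podRV) // true

	// A write under a different prefix still moves the revision reported for /pods.
	rsRV := s.put("/replicasets/default/replicaset-1")
	fmt.Println(currentRV(s, "/pods") == rsRV) // true
}
```

In this model an empty prefix also reports the global revision, which is one possible answer to the "no keys in that prefix" question raised earlier; whether a real backend behaves the same way would still need testing.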
Totally no idea now. It's possible that this Kubernetes feature is not yet stable enough for public use. I've decided to close this issue. Thanks all the same.
Ok, will close.
If you do have a public reproducer available, let me know, I am curious what is going on.
I implemented some of the Progress Notify stuff in a private fork for exactly the same reason, using the streaming WatchList in our informers. The progress notification in etcd can be triggered by two different flows:
1. Setting the ProgressNotify option to true when a new watch stream is created using a WatchRequest_CreateRequest message. This enables progress notification just for that stream, using a ticker, delivering a progress notification if no events have been sent for a while. I can see this already works in kine upstream, thanks @brandond!
2. Sending a WatchRequest_ProgressRequest message. In this case etcd waits for all of the watch streams for that client to sync (i.e. they haven't sent an event in some time and are up to date) and then sends a progress notification for all of the streams for that client.
It seems like K8s streaming WatchList uses both of these mechanisms, and there is also the Consistent List from Cache KEP which utilizes the second method explicitly for issuing bookmark events. I think implementing that would make streaming lists work.
We have been using WatchList with kine in production for a while and it seems to be working great for us.
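The two flows described in this comment can be modeled with a small toy server. This is a sketch under stated assumptions, not etcd or kine code: toyStream, toyWatchServer and their methods are hypothetical names, the ticker is replaced by an explicit idleTick call, and requestProgress returns false instead of waiting when a stream is still behind.

```go
package main

import "fmt"

// toyStream is a hypothetical model of one watch stream.
type toyStream struct {
	progressNotify bool    // opted in when the watch was created (first flow)
	sentRev        int64   // highest revision delivered on this stream
	progress       []int64 // revisions carried by progress notifications
}

// toyWatchServer is a hypothetical model of the server side for one client.
type toyWatchServer struct {
	rev     int64
	streams []*toyStream
}

// createWatch registers a stream that starts out caught up.
func (s *toyWatchServer) createWatch(progressNotify bool) *toyStream {
	st := &toyStream{progressNotify: progressNotify, sentRev: s.rev}
	s.streams = append(s.streams, st)
	return st
}

// put simulates a write; streams fall behind until deliver is called.
func (s *toyWatchServer) put() { s.rev++ }

// deliver simulates every stream catching up to the current revision.
func (s *toyWatchServer) deliver() {
	for _, st := range s.streams {
		st.sentRev = s.rev
	}
}

// idleTick models the first flow: on each tick, only streams that opted in
// and are caught up receive a progress notification.
func (s *toyWatchServer) idleTick() {
	for _, st := range s.streams {
		if st.progressNotify && st.sentRev == s.rev {
			st.progress = append(st.progress, s.rev)
		}
	}
}

// requestProgress models the second flow: once every stream of the client is
// caught up, all of them get a notification, whether they opted in or not.
// It reports false (instead of waiting) when some stream is still behind.
func (s *toyWatchServer) requestProgress() bool {
	for _, st := range s.streams {
		if st.sentRev < s.rev {
			return false
		}
	}
	for _, st := range s.streams {
		st.progress = append(st.progress, s.rev)
	}
	return true
}

func main() {
	srv := &toyWatchServer{}
	a := srv.createWatch(true)
	b := srv.createWatch(false)

	srv.idleTick()
	fmt.Println(len(a.progress), len(b.progress)) // 1 0

	srv.put()
	fmt.Println(srv.requestProgress()) // false: streams are behind
	srv.deliver()
	fmt.Println(srv.requestProgress()) // true
	fmt.Println(a.progress, b.progress) // [0 1] [1]
}
```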
@onprem can you link me to where/when the apiserver sends a ProgressRequest message? I can take a look at making sure that works.
Also, let's take this to a new issue so I have something to work against.
Interesting observation @onprem
@brandond you can check out the following code snippets.
func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
	ctx := wait.ContextForChannel(stopCh)
	go func() {
		defer utilruntime.HandleCrash()
		<-stopCh
		pr.mux.Lock()
		defer pr.mux.Unlock()
		pr.stopped = true
		pr.cond.Signal()
	}()
	ticker := pr.clock.NewTicker(progressRequestPeriod)
	defer ticker.Stop()
	for {
		stopped := func() bool {
			pr.mux.RLock()
			defer pr.mux.RUnlock()
			for pr.waiting == 0 && !pr.stopped {
				pr.cond.Wait()
			}
			return pr.stopped
		}()
		if stopped {
			return
		}
		select {
		case <-ticker.C():
			shouldRequest := func() bool {
				pr.mux.RLock()
				defer pr.mux.RUnlock()
				return pr.waiting > 0 && !pr.stopped
			}()
			if !shouldRequest {
				continue
			}
			err := pr.requestWatchProgress(ctx)
			if err != nil {
				klog.V(4).InfoS("Error requesting bookmark", "err", err)
			}
		case <-stopCh:
			return
		}
	}
}
// RequestWatchProgress requests the a watch stream progress status be sent in the
// watch response stream as soon as possible.
// Used for monitor watch progress even if watching resources with no changes.
//
// If watch is lagging, progress status might:
// * be pointing to stale resource version. Use etcd KV request to get linearizable resource version.
// * not be delivered at all. It's recommended to poll request progress periodically.
//
// Note: Only watches with matching context grpc metadata will be notified.
// https://github.com/kubernetes/kubernetes/blob/9325a57125e8502941d1b0c7379c4bb80a678d5c/vendor/go.etcd.io/etcd/client/v3/watch.go#L1037-L1042
//
// TODO: Remove when storage.Interface will be separate from etc3.store.
// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
RequestWatchProgress(ctx context.Context) error
func (s *store) RequestWatchProgress(ctx context.Context) error {
	// Use watchContext to match ctx metadata provided when creating the watch.
	// In best case scenario we would use the same context that watch was created, but there is no way access it from watchCache.
	return s.client.RequestProgress(s.watchContext(ctx))
}
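Stripped of the apiserver plumbing, the snippets above follow one pattern: ask for watch progress on every tick, but only while at least one caller is waiting. The following is a simplified sketch of that pattern using only the standard library. toyRequester and its methods are hypothetical names, the request field is a stand-in for RequestWatchProgress, and a plain channel replaces the ticker so the example is deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

// toyRequester is a hypothetical, simplified take on the quoted pattern:
// request watch progress on each tick, but only while someone is waiting.
type toyRequester struct {
	mu      sync.Mutex
	waiting int
	request func() error // stand-in for RequestWatchProgress
}

func (r *toyRequester) add()    { r.mu.Lock(); r.waiting++; r.mu.Unlock() }
func (r *toyRequester) remove() { r.mu.Lock(); r.waiting--; r.mu.Unlock() }

// tick issues one progress request if at least one caller is waiting.
func (r *toyRequester) tick() {
	r.mu.Lock()
	should := r.waiting > 0
	r.mu.Unlock()
	if !should {
		return
	}
	if err := r.request(); err != nil {
		fmt.Println("error requesting progress:", err)
	}
}

// run handles ticks until stop is closed.
func (r *toyRequester) run(ticks <-chan struct{}, stop <-chan struct{}) {
	for {
		select {
		case <-ticks:
			r.tick()
		case <-stop:
			return
		}
	}
}

func main() {
	count := 0
	r := &toyRequester{request: func() error { count++; return nil }}

	r.tick()           // nobody is waiting, so nothing is requested
	fmt.Println(count) // 0

	ticks, stop, done := make(chan struct{}), make(chan struct{}), make(chan struct{})
	go func() { r.run(ticks, stop); close(done) }()

	r.add() // one caller now waits for a fresh resource version
	ticks <- struct{}{}
	ticks <- struct{}{}
	close(stop)
	<-done
	fmt.Println(count) // 2
}
```

Unlike the quoted code, this sketch does not block on a condition variable while nobody is waiting; it simply ignores ticks in that state.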
@qwtsc assuming these are all enabled on the apiserver, do you have a reproducer that will make the correct client calls to trigger this on the server side? I see that there is a TODO/Deprecated note on the code in question in the apiserver, so I'm not quite sure how to hit that path with a client request.
@brandond I found a way to reproduce the watch-list request easily. First, use kubectl proxy to avoid dealing with authentication and certificates.
kubectl proxy &
Then use curl to send a watch-list request to the apiserver. Please make sure the WatchList feature is enabled on the k8s apiserver.
curl 'http://localhost:8001/api/v1/namespaces/default/pods?allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&timeoutSeconds=565&watch=true'
Now you can dig into what's going on under the hood.
@brandond I think this issue can be reopened, or a new one created to track the progress. I am still interested in enabling k8s's watch-list with kine.
Moved to another issue.