Closed: tomasaschan closed this issue 2 years ago
Oops - thanks for reporting!
This should be a relatively simple fix - I forgot a mutex to guard lastRV. However, the reason I forgot that mutex is that I thought there would only be a single goroutine, but in fact there can be several if we are watching multiple types. And because of that, lastRV should also be keyed by kind (or each kind needs its own map).
I can send a fix in an hour or two... Adding a mutex is probably sufficient to avoid the crashes, but we/I should fix the map keying also.
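Roughly the shape I have in mind, as a sketch only (the lastRVCache type and helper names below are made up for illustration, not the actual dynamicWatch code):

package watch

import (
	"sync"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
)

// Sketch only: a mutex-guarded cache keyed by both kind and namespace/name,
// so the per-kind watch goroutines don't race on a shared map and two kinds
// with the same namespace/name don't collide.
type lastRVCache struct {
	mu     sync.Mutex
	byKind map[schema.GroupVersionKind]map[types.NamespacedName]string
}

// seen records rv for the given object and reports whether it was already the
// last reported resourceVersion (i.e. the event would be a duplicate).
func (c *lastRVCache) seen(gvk schema.GroupVersionKind, nn types.NamespacedName, rv string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.byKind == nil {
		c.byKind = make(map[schema.GroupVersionKind]map[types.NamespacedName]string)
	}
	byName := c.byKind[gvk]
	if byName == nil {
		byName = make(map[types.NamespacedName]string)
		c.byKind[gvk] = byName
	}
	if previous, found := byName[nn]; found && previous == rv {
		return true
	}
	byName[nn] = rv
	return false
}

// forget drops the cached entry on delete so the cache doesn't grow forever.
func (c *lastRVCache) forget(gvk schema.GroupVersionKind, nn types.NamespacedName) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.byKind[gvk], nn)
}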
Thanks!
We have a local fork of this repo where we've added a few other things (most notably a POC implementation of #191, but also #192 and a helper AddError on CommonObjects that adds an error string to the status if it's not already present; I'll probably submit PRs back here for most/all of those eventually 😄).
In there, I did this to just make the map thread-safe; do you think it will be sufficient for now? I think we won't have any resources with equal namespace/name but different kinds (yet).
diff --git a/pkg/patterns/declarative/pkg/watch/dynamic.go b/pkg/patterns/declarative/pkg/watch/dynamic.go
index 5f69575..537d618 100644
--- a/pkg/patterns/declarative/pkg/watch/dynamic.go
+++ b/pkg/patterns/declarative/pkg/watch/dynamic.go
@@ -19,6 +19,7 @@ package watch
import (
"context"
"fmt"
+ "sync"
"time"
"k8s.io/apimachinery/pkg/api/meta"
@@ -56,7 +57,7 @@ type dynamicWatch struct {
// lastRV caches the last reported resource version.
// This helps us avoid sending duplicate events (e.g. on a rewatch)
- lastRV map[types.NamespacedName]string
+ lastRV sync.Map
}
func (dw *dynamicWatch) newDynamicClient(gvk schema.GroupVersionKind, namespace string) (dynamic.ResourceInterface, error) {
@@ -79,7 +80,7 @@ func (dw *dynamicWatch) Add(trigger schema.GroupVersionKind, options metav1.List
return fmt.Errorf("creating client for (%s): %v", trigger.String(), err)
}
- dw.lastRV = make(map[types.NamespacedName]string)
+ dw.lastRV = sync.Map{}
go func() {
for {
@@ -140,14 +141,14 @@ func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigg
switch clientEvent.Type {
case watch.Deleted:
// stop lastRV growing indefinitely
- delete(dw.lastRV, key)
+ dw.lastRV.Delete(key)
// We always send the delete notification
case watch.Added, watch.Modified:
- if previousRV, found := dw.lastRV[key]; found && previousRV == rv {
+ if previousRV, found := dw.lastRV.Load(key); found && previousRV == rv {
// Don't send spurious invalidations
continue
}
- dw.lastRV[key] = rv
+ dw.lastRV.Store(key, rv)
}
log.WithValues("type", clientEvent.Type).WithValues("kind", trigger.String()).WithValues("name", key.Name, "namespace", key.Namespace).Info("broadcasting event")
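If the namespace/name collision across kinds ever does become a problem, I think we could widen the key to include the GVK without giving up sync.Map; a rough sketch (the rvKey type and isDuplicate helper are made-up names, not something in the repo):

package watch

import (
	"sync"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
)

// rvKey is a hypothetical composite key: the watched kind plus the object's
// namespace/name. Both schema.GroupVersionKind and types.NamespacedName are
// plain comparable structs, so the composite works as a sync.Map key.
type rvKey struct {
	gvk schema.GroupVersionKind
	nn  types.NamespacedName
}

// isDuplicate uses the same Load/Store pattern as the diff above, just keyed
// per kind. sync.Map values are untyped, so the stored string comes back as
// an interface{} and is recovered with a type assertion.
func isDuplicate(lastRV *sync.Map, gvk schema.GroupVersionKind, nn types.NamespacedName, rv string) bool {
	key := rvKey{gvk: gvk, nn: nn}
	if previous, found := lastRV.Load(key); found && previous.(string) == rv {
		return true
	}
	lastRV.Store(key, rv)
	return false
}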
Thanks - looking forward to those PRs :-)
And yes, that should be sufficient. This is "just" an optimization, and resourceVersions are (usually) globally unique, so as long as we're not crashing it's an improvement!
I'll file this one right away (it was easy enough to put in place!). The others will come once we've been using them for a little while, so we can still make breaking changes to the APIs etc. if we realize ways to make stuff better.
We're seeing a panic on dw.lastRV[key] = rv with the following stack trace:

It seems to happen every time our operator starts, but for different keys every time. I don't grok what goes on under the hood well enough to understand what different paths could be updating this simultaneously; what am I looking for in troubleshooting this? Or is it just a case of having too many resources? (This operator is looking at 1000+ instances of the CR, each of which creates a couple of RoleBindings and an IAMPartialPolicy in the namespace where the CR is, plus a RoleBinding in a common namespace.)

If there's anything I can help out with to contribute a workaround/fix here, please let me know.
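The crash is presumably Go's concurrent-map-writes check; a minimal standalone sketch (nothing in it is from the operator, it only borrows the map's key type) fails the same way when two goroutines write to one plain map:

package main

import (
	"sync"

	"k8s.io/apimachinery/pkg/types"
)

func main() {
	// A plain map, like the unguarded lastRV field.
	lastRV := map[types.NamespacedName]string{}

	var wg sync.WaitGroup
	for _, rv := range []string{"100", "200"} {
		rv := rv
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Two goroutines (think: one per watched kind) writing to the same
			// map with no lock; the runtime detects this and aborts with
			// "fatal error: concurrent map writes".
			for i := 0; i < 1_000_000; i++ {
				lastRV[types.NamespacedName{Namespace: "ns", Name: "cr"}] = rv
			}
		}()
	}
	wg.Wait()
}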