numaproj / numaplane

Control Plane for Numaproj
Apache License 2.0

Not using Informer cache for Pipelines and ISBServices #95

Open juliev0 opened 1 week ago

juliev0 commented 1 week ago

Describe the bug

We decided that Numaplane does not need to define the Pipeline/ISBService specs, so we use the Kubernetes dynamic client to create, update, and get these resources.

It looks like we currently do not use an Informer cache and instead make repeated GET calls directly to the Kubernetes API.

Assuming that we stick with the decision to use the dynamic client (which may first be worth a deep-dive analysis), we can incorporate a dynamic client Informer as described here.

See slack thread.

Expected behavior

When calling Get() for a given resource multiple times in a row when the resource hasn't changed, I shouldn't hit the K8S API every time. Instead, my resource should be updated asynchronously into my local cache.
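To make the expected behavior concrete, here is a minimal standard-library sketch of the idea, not the client-go Informer itself. A background loop applies watch events to an in-memory store, and Get() reads only from that store. All names here (`Resource`, `watchEvent`, `localCache`) are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Resource is a hypothetical stand-in for a Pipeline or ISBService.
type Resource struct {
	Name            string
	ResourceVersion string
}

// watchEvent mimics an add/update/delete notification from a watch stream.
type watchEvent struct {
	obj     Resource
	deleted bool
}

// localCache is the store that the watch loop keeps in sync in the background.
type localCache struct {
	mu    sync.RWMutex
	items map[string]Resource
}

func newLocalCache() *localCache {
	return &localCache{items: make(map[string]Resource)}
}

// run applies events until the channel is closed, then signals done.
func (c *localCache) run(events <-chan watchEvent, done chan<- struct{}) {
	for ev := range events {
		c.mu.Lock()
		if ev.deleted {
			delete(c.items, ev.obj.Name)
		} else {
			c.items[ev.obj.Name] = ev.obj
		}
		c.mu.Unlock()
	}
	close(done)
}

// Get reads from memory only; it never contacts the API server.
func (c *localCache) Get(name string) (Resource, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	r, ok := c.items[name]
	return r, ok
}

func main() {
	c := newLocalCache()
	events := make(chan watchEvent)
	done := make(chan struct{})
	go c.run(events, done)

	// The watch delivers changes asynchronously as they happen.
	events <- watchEvent{obj: Resource{Name: "my-pipeline", ResourceVersion: "1"}}
	events <- watchEvent{obj: Resource{Name: "my-pipeline", ResourceVersion: "2"}}
	close(events)
	<-done // wait for the background loop to drain before reading

	// Repeated reads are served from the cache with no API round trip.
	for i := 0; i < 3; i++ {
		r, ok := c.Get("my-pipeline")
		fmt.Println(r.ResourceVersion, ok)
	}
}
```

In a real controller the event stream would come from a Kubernetes watch and the store would be the Informer's indexer; the sketch only shows why repeated reads of an unchanged resource cost nothing against the API.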


juliev0 commented 1 week ago

So, I've been trying to figure out two possible ways to handle this:

  1. See if there's a way to make this work using the controller-runtime framework.
  2. Imitate what Argo Workflows does with its wfInformer (Workflow informer), in which the event handlers (AddFunc, UpdateFunc, and DeleteFunc described in the article) basically add a Workflow to a queue.
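For option 2, the handler-to-queue pattern can be sketched roughly as follows. This is a standard-library illustration only, not Argo Workflows' code and not the client-go workqueue. The `keyQueue`, `handlers`, and `enqueueHandlers` names are hypothetical, and the queue is deliberately single-goroutine to keep the sketch short.

```go
package main

import "fmt"

// keyQueue is a tiny deduplicating FIFO: a key that is already pending is not
// added a second time, so bursts of events collapse into one reconcile.
// It is NOT safe for concurrent use; a real controller queue would be.
type keyQueue struct {
	order   []string
	pending map[string]bool
}

func newKeyQueue() *keyQueue {
	return &keyQueue{pending: make(map[string]bool)}
}

// Add enqueues key unless it is already waiting to be processed.
func (q *keyQueue) Add(key string) {
	if q.pending[key] {
		return
	}
	q.pending[key] = true
	q.order = append(q.order, key)
}

// Pop removes and returns the oldest key, or false if the queue is empty.
func (q *keyQueue) Pop() (string, bool) {
	if len(q.order) == 0 {
		return "", false
	}
	key := q.order[0]
	q.order = q.order[1:]
	delete(q.pending, key)
	return key, true
}

// handlers groups the three informer-style callbacks.
type handlers struct {
	AddFunc    func(key string)
	UpdateFunc func(oldKey, newKey string)
	DeleteFunc func(key string)
}

// enqueueHandlers returns callbacks that do nothing except enqueue the key.
func enqueueHandlers(q *keyQueue) handlers {
	return handlers{
		AddFunc:    func(key string) { q.Add(key) },
		UpdateFunc: func(_, newKey string) { q.Add(newKey) },
		DeleteFunc: func(key string) { q.Add(key) },
	}
}

func main() {
	q := newKeyQueue()
	h := enqueueHandlers(q)

	h.AddFunc("ns/pipeline-a")
	h.UpdateFunc("ns/pipeline-a", "ns/pipeline-a") // coalesced with the add
	h.AddFunc("ns/pipeline-b")
	h.DeleteFunc("ns/pipeline-b") // coalesced with the add

	// A worker drains the queue and reconciles each key once.
	for {
		key, ok := q.Pop()
		if !ok {
			break
		}
		fmt.Println("reconcile", key)
	}
}
```

The point of the design is that the handlers stay trivial: they record which object changed, and the worker looks up the current state from the cache when it processes the key.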

I thought it would be nice if we could do 1 if it's easy. After seeing the comment on watching unstructured objects here, I think it may work to change the existing Watch for Pipelines (as an example) to:

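    // Watch Pipelines as unstructured objects so that the Pipeline Go types are not required.
    u := &unstructured.Unstructured{}
    u.SetGroupVersionKind(schema.GroupVersionKind{
        Kind:    "Pipeline",
        Group:   "numaflow.numaproj.io",
        Version: "v1alpha1",
    })
    // source.Kind is the exported constructor for internal.Kind, which cannot be imported
    // from outside controller-runtime (two-argument form, assuming v0.15 to v0.17).
    if err := controller.Watch(source.Kind(mgr.GetCache(), u),
        handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &apiv1.PipelineRollout{}, handler.OnlyControllerOwner()),
        predicate.ResourceVersionChangedPredicate{}); err != nil {
        return err
    }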

But if that does work, we still need to use this Informer cache in the actual calls that Get our child resource. Would it work to call r.client.Get() and pass in an unstructured.Unstructured{} instead?