kcp-dev / kcp

Kubernetes-like control planes for form-factors and use-cases beyond Kubernetes and container workloads.
https://kcp.io
Apache License 2.0

bug: Patched annotations by admission controller get removed by KCP #3182

Open entigo-mart-erlenheim opened 1 week ago

entigo-mart-erlenheim commented 1 week ago

Describe the bug

We have an admission controller that mutates our custom objects. It adds one annotation and one label via a JSON Patch. The label is added to the object successfully, but the annotation is not. I discovered that if I manually add the kcp.io/cluster annotation to the object before creating it, our custom annotation is also added successfully. I suspect that the process which adds the kcp.io/cluster annotation removes the annotation patched in by the webhook. Note that annotations added manually to the object are not affected by this bug.

Steps To Reproduce

  1. Create a mutating admission controller that returns annotation patches. For example:
    pt := admission.PatchTypeJSONPatch
    patch := []byte(`[{ "op": "add", "path": "/metadata/annotations/foo", "value": "bar" }]`)
    responseAdmissionReview := &admission.AdmissionReview{}
    ...
    responseAdmissionReview.Response = &admission.AdmissionResponse{Allowed: true, PatchType: &pt, Patch: patch}
    ...
  2. Register the controller with MutatingWebhookConfiguration.
  3. Create a custom object that matches the configured webhook rule.
  4. Observe that the only annotations for the object are:
    kcp.io/cluster: 2go2ya0dhfrlvb9f
    kubectl.kubernetes.io/last-applied-configuration: "..."
  5. Delete the created object.
  6. Manually add the kcp.io/cluster annotation to the object YAML; the value has to match the actual cluster:
    annotations:
      kcp.io/cluster: 2go2ya0dhfrlvb9f
  7. Re-create the object.
  8. Observe that the custom foo annotation got added successfully:
    foo: bar
    kcp.io/cluster: 2go2ya0dhfrlvb9f
    kubectl.kubernetes.io/last-applied-configuration: "..."
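
For anyone reproducing this: the patch in step 1 assumes that /metadata/annotations already exists, and RFC 6902 requires the parent object of an "add" target to exist. The following is a minimal sketch, not the controller's actual code, of building the patch with encoding/json so that the empty map is added first when needed and keys containing "/" are escaped per RFC 6901. The names patchOp, escapePointer and annotationPatch, and the key example.com/foo, are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// patchOp is one RFC 6902 JSON Patch operation.
type patchOp struct {
	Op    string      `json:"op"`
	Path  string      `json:"path"`
	Value interface{} `json:"value"`
}

// escapePointer escapes a key for use as an RFC 6901 JSON Pointer token:
// "~" becomes "~0" and "/" becomes "~1".
func escapePointer(key string) string {
	return strings.NewReplacer("~", "~0", "/", "~1").Replace(key)
}

// annotationPatch returns a JSON Patch that sets a single annotation.
// When the object has no annotations yet, it adds an empty map first.
func annotationPatch(existing map[string]string, key, value string) ([]byte, error) {
	var ops []patchOp
	if existing == nil {
		ops = append(ops, patchOp{Op: "add", Path: "/metadata/annotations", Value: map[string]string{}})
	}
	ops = append(ops, patchOp{Op: "add", Path: "/metadata/annotations/" + escapePointer(key), Value: value})
	return json.Marshal(ops)
}

func main() {
	// Hypothetical key with a "/" to show the escaping.
	patch, err := annotationPatch(nil, "example.com/foo", "bar")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch))
}
```

With a nil annotations map the patch contains two operations; with an existing map it contains only the one that sets the key. This rules out a malformed patch as the cause, but it does not explain why the annotation disappears in KCP.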

Expected Behaviour

Admission controller should be able to successfully add the custom annotation.

Additional Context

I'm not 100% sure that this is caused by KCP, but the admission controller works as expected in a local Kubernetes cluster without KCP. I have verified that the behaviour described above occurs at least in our KCP deployment.

KCP version: v0.26.0

embik commented 1 day ago

Hi @entigo-mart-erlenheim, thank you for your report! Do you happen to have a minimal implementation of said webhook that you can share with us so we can run it? If not, no problem, we'll be able to test for reproducibility, it would just save us some time.

entigo-mart-erlenheim commented 1 day ago

@embik Thank you for looking into this problem. I put together a minimal controller that mutates annotations and labels. Unfortunately, I don't currently have time to test it in a KCP cluster, but I validated the functionality in a regular cluster. The controller accepts the paths to the TLS key and certificate via flags or environment variables. Optionally, a small nginx proxy can be used to serve the HTTPS requests. Inside our KCP deployment we have a centralized workspace for the API export, and that is where we applied the webhook configuration. Make sure that the server or proxy is running on the port specified in the webhook configuration.

package main

import (
    "context"
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "io"
    "net/http"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"

    admission "k8s.io/api/admission/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/serializer"
    "k8s.io/klog/v2"
)

var (
    runtimeScheme = runtime.NewScheme()
    codecFactory  = serializer.NewCodecFactory(runtimeScheme)
    deserializer  = codecFactory.UniversalDeserializer()
)

// add kind AdmissionReview in scheme
func init() {
    _ = corev1.AddToScheme(runtimeScheme)
    _ = admission.AddToScheme(runtimeScheme)
}

// serve handles the http portion of a request prior to handing to an admit
// function
func serve(w http.ResponseWriter, r *http.Request) {
    var body []byte
    if r.Body != nil {
        if data, err := io.ReadAll(r.Body); err == nil {
            body = data
        }
    }

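    contentType := r.Header.Get("Content-Type")
    if contentType != "application/json" {
        // Report the problem to the caller instead of returning an empty 200.
        msg := fmt.Sprintf("contentType=%s, expect application/json", contentType)
        klog.Error(msg)
        http.Error(w, msg, http.StatusUnsupportedMediaType)
        return
    }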

    klog.Infof("handling request: %s", body)
    var responseObj runtime.Object
    if obj, gvk, err := deserializer.Decode(body, nil, nil); err != nil {
        msg := fmt.Sprintf("Request could not be decoded: %v", err)
        klog.Error(msg)
        http.Error(w, msg, http.StatusBadRequest)
        return
    } else {
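        requestedAdmissionReview, ok := obj.(*admission.AdmissionReview)
        if !ok || requestedAdmissionReview.Request == nil {
            // Guard against a nil Request before dereferencing it below.
            msg := fmt.Sprintf("Expected v1.AdmissionReview with a request but got: %T", obj)
            klog.Error(msg)
            http.Error(w, msg, http.StatusBadRequest)
            return
        }
        object := unstructured.Unstructured{}
        if _, _, err := deserializer.Decode(requestedAdmissionReview.Request.Object.Raw, nil, &object); err != nil {
            msg := fmt.Sprintf("Could not decode object: %v", err)
            klog.Error(msg)
            http.Error(w, msg, http.StatusBadRequest)
            return
        }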

        responseAdmissionReview := &admission.AdmissionReview{}
        responseAdmissionReview.SetGroupVersionKind(*gvk)
        pt := admission.PatchTypeJSONPatch
        patches := getPatches(object)
        responseAdmissionReview.Response = &admission.AdmissionResponse{Allowed: true, PatchType: &pt, Patch: patches}
        responseAdmissionReview.Response.UID = requestedAdmissionReview.Request.UID
        responseObj = responseAdmissionReview
    }
    klog.Infof("sending response: %v", responseObj)
    respBytes, err := json.Marshal(responseObj)
    if err != nil {
        klog.Error(err)
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "application/json")
    if _, err := w.Write(respBytes); err != nil {
        klog.Error(err)
    }
}

func getPatches(object unstructured.Unstructured) []byte {
    var patches []string
    if object.GetLabels() == nil {
        patches = append(patches, `{ "op": "add", "path": "/metadata/labels", "value": {} }`)
    }
    patches = append(patches, `{ "op": "add", "path": "/metadata/labels/foo", "value": "bar" }`)
    if object.GetAnnotations() == nil {
        patches = append(patches, `{ "op": "add", "path": "/metadata/annotations", "value": {} }`)
    }
    patches = append(patches, `{ "op": "add", "path": "/metadata/annotations/foo", "value": "bar" }`)
    return []byte(fmt.Sprintf(`[%s]`, strings.Join(patches, ",")))
}

func main() {
    terminated := make(chan os.Signal, 1)
    signal.Notify(terminated, os.Interrupt, syscall.SIGTERM)

    tlsKey := os.Getenv("TLS_KEY")
    tlsCert := os.Getenv("TLS_CERT")
    port := 8081
    flag.StringVar(&tlsKey, "tlsKey", tlsKey, "Path to the TLS key")
    flag.StringVar(&tlsCert, "tlsCert", tlsCert, "Path to the TLS certificate")
    flag.Parse()
    http.HandleFunc("/mutate", serve)
    http.HandleFunc("/readyz", func(w http.ResponseWriter, req *http.Request) { w.Write([]byte("ok")) })
    httpServer := &http.Server{Addr: fmt.Sprintf(":%d", port)}

    go func() {
        if tlsKey == "" && tlsCert == "" {
            klog.Infof("starting server on port %d\n", port)
            if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
                klog.Fatalf("Could not listen on %d: %v\n", port, err)
            }
        } else {
            klog.Infof("starting TLS server on port %d\n", port)
            if err := httpServer.ListenAndServeTLS(tlsKey, tlsCert); err != nil && !errors.Is(err, http.ErrServerClosed) {
                klog.Fatalf("Could not listen on %d: %v\n", port, err)
            }
        }
    }()

    <-terminated
    stopServer(httpServer)
}

func stopServer(srv *http.Server) {
    klog.Info("server shutting down")
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := srv.Shutdown(ctx); err != nil {
        klog.Fatalf("server Shutdown: %+v", err)
    }
}

Example nginx proxy configuration, with the config mounted at /etc/nginx/nginx.conf and the certificates at /etc/nginx/certs:

events {
    worker_connections 1024;
}

http {
    server {
        listen 8443 ssl;
        server_name devportal.localhost;

        ssl_certificate /etc/nginx/certs/web.crt;
        ssl_certificate_key /etc/nginx/certs/web.key;

        location / {
            proxy_pass http://host.docker.internal:8081/;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }
    }
}

Example webhook configuration that calls the controller service for configmaps:

apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: development-portal-mutation
webhooks:
  - name: "mutateannotation.default.svc"
    rules:
      - operations: [ "CREATE", "UPDATE" ]
        apiGroups: [ "" ]
        apiVersions: [ "v1" ]
        resources: [ "configmaps" ]
        scope: "*"
    clientConfig:
      service:
        namespace: default
        name: webhook-server
        path: "/mutate"
        port: 8443
      caBundle: ${ENCODED_CA}
    admissionReviewVersions: ["v1"]
    sideEffects: None

Please let me know if I should provide anything else.