microbean / microbean-kubernetes-controller

A toolset for writing Kubernetes controllers, or operators, in Java.
https://microbean.github.io/microbean-kubernetes-controller/
Apache License 2.0

Addition events for pre-existing resources #15

Open jkebinger opened 5 years ago

jkebinger commented 5 years ago

Thanks for writing this and the accompanying 11-part blog post! I'm new to Kubernetes, so this was a good primer. I was confused by the addition events I got on starting up a test controller for pods that already existed before the controller was started. From the code it looks like that's expected? If so, what would be the canonical way to ignore or filter those out in an event consumer? I spent some time seeing whether there's a way to provide a non-empty known-objects map, but I don't think that's what you intended here, right?

Here's my messy throwaway code

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.*;
import org.microbean.kubernetes.controller.*;

// LOG is an SLF4J Logger defined on the enclosing class.
public static void main(String... args) throws InterruptedException, IOException {
    Config config = new ConfigBuilder().withMasterUrl("https://192.168.99.100:8443").build();
    KubernetesClient client = new DefaultKubernetesClient(config);
    LOG.info("current pods list {}", client.pods().list());
    // The cache the Controller keeps synchronized with the cluster.
    Map<Object, Pod> knownPods = new HashMap<>();
    ResourceTrackingEventQueueConsumer<Pod> testConsumer =
        new ResourceTrackingEventQueueConsumer<Pod>(knownPods) {
          @Override
          protected void accept(AbstractEvent<? extends Pod> event) {
            LOG.info("Accepting {}", event);
          }
        };
    Controller<Pod> podController =
        new Controller<>(client.pods().inNamespace("default"), knownPods, testConsumer);
    podController.start();
    LOG.info("Controller started");
    Thread.sleep(TimeUnit.DAYS.toMillis(1));
}

and I get output like the following when calling start():

2019-02-06 16:54:50.192 [pool-1-thread-1] INFO  c.h.j.k.o.service.TestLauncher - Accepting ADDITION: Pod(apiVersion=v1, kind=Pod, metadata=ObjectMeta(creationTimestamp=2019-02-06T21:04:44Z, generateName=hello-minikube-6fd785d459-, labels={pod-template-hash=6fd785d459, run=hello-minikube}, name=hello-minikube-6fd785d459-trqq5, namespace=default, ownerReferences=[OwnerReference(apiVersion=apps/v1, kind=ReplicaSet, name=hello-minikube-6fd785d459, ...)], resourceVersion=700, uid=d405fb60-2a52-11e9-af67-0800272da21b, ...), spec=PodSpec(containers=[Container(image=k8s.gcr.io/echoserver:1.10, name=hello-minikube, ...)], nodeName=minikube, restartPolicy=Always, ...), status=PodStatus(phase=Running, podIP=172.17.0.4, startTime=2019-02-06T21:04:44Z, ...))

Does there need to be a way to get a controller "up to speed" with the state of the world before it starts processing net-new events as deltas upon that state?

ljnelson commented 5 years ago

I believe (if I remember right!) that you want to test if the AbstractEvent is an instance of SynchronizationEvent. If it is, then it's an event that is getting the controller "up to speed". If it's not, then it's just an Event, and it is a true event.
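Something like this, reusing your consumer (SynchronizationEvent should be in the same package as AbstractEvent, if I remember right):

ResourceTrackingEventQueueConsumer<Pod> testConsumer =
    new ResourceTrackingEventQueueConsumer<Pod>(knownPods) {
      @Override
      protected void accept(AbstractEvent<? extends Pod> event) {
        if (event instanceof SynchronizationEvent) {
          // Cache-priming traffic: the resource existed before the
          // controller started, or this is a periodic resync. Skip it.
          return;
        }
        LOG.info("True event: {}", event);
      }
    };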

For various reasons you don't want to be too reliant on the presence of Events to signal that something has happened, if you can help it. You want to inspect the state of the cache (the knownObjects map) instead. Having said that, for most situations you can probably react to the event itself directly.
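In sketch form, using the knownPods map from your snippet (getKey() is shorthand for however the cache key is derived for an event's resource; double-check the actual accessor):

@Override
protected void accept(AbstractEvent<? extends Pod> event) {
  // Treat the event as a hint that something changed...
  Object key = event.getKey(); // assumed accessor for the resource's cache key
  // ...then read the authoritative current state from the cache.
  Pod current = knownPods.get(key);
  if (current == null) {
    // Gone from the cluster: reconcile toward absence.
  } else {
    // Reconcile toward current, not toward the event's snapshot.
  }
}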

The analogous Go code refers to this state as cache sync. I seem to recall I actually fire a JavaBeans property-change event when this happens, but it's unclear exactly how you'd make use of that here, to be fair.
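If that property change is there, the standard java.beans machinery would pick it up; the property name and the registration point below are guesses on my part, not documented API:

import java.beans.PropertyChangeListener;

// Hypothetical: listen for a cache-sync property change, if one is fired.
PropertyChangeListener syncListener = evt -> {
  if ("synchronized".equals(evt.getPropertyName()) // assumed property name
      && Boolean.TRUE.equals(evt.getNewValue())) {
    LOG.info("Event cache is now synchronized with the cluster");
  }
};
// podController.addPropertyChangeListener(syncListener); // assumed registration point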

jkebinger commented 5 years ago

Thank you for the answer; checking for SynchronizationEvent will help filter out some of the noise there.

When you say

you don't want to be too reliant on the presence of Events to signal that something has happened

is the preferred pattern to just poll the cache entries periodically? If the presence of Events isn't reliable, why would the cache, which is updated from those same events if I understand correctly, be more reliable?

I also just discovered the awaitEventCacheSynchronization method, which I hadn't noticed before and which is handy. An analogous callback on ResourceTrackingEventQueueConsumer might be nice, so that object could manage both time-based and event-based updates itself?
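For concreteness, here's roughly how I imagine slotting it into the main method above (assuming it's reachable from the Controller and blocks until the initial list has landed in knownPods; I haven't checked its exact signature):

podController.start();
// Hypothetical placement: block until the cache has been primed, so that
// everything observed after this point is a net-new change.
podController.awaitEventCacheSynchronization();
LOG.info("Cache synchronized; {} pods known", knownPods.size());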

ljnelson commented 5 years ago

(Bear in mind the Go code is the authoritative answer; I've just tried to translate it into Java idiomatically.)

Remember that if you set up a synchronization interval, then every so often the cache gets repopulated in its entirety via a list operation.
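Schematically (the overload below, taking the interval at construction time the way the underlying Reflector does, is an assumption on my part; check the actual Controller constructors):

import java.time.Duration;

// Hypothetical overload: re-list everything every ten minutes so the cache
// is rebuilt even if a watch event was dropped along the way.
Controller<Pod> podController =
    new Controller<>(client.pods().inNamespace("default"),
                     knownPods,
                     Duration.ofMinutes(10), // assumed synchronization-interval parameter
                     testConsumer);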

I think the salient point here is that when an event comes in, you don't just grab the data from the event; you ask the cache for the relevant data.

Regarding awaitEventCacheSynchronization, I'll look to see if it can be exposed elsewhere.

xguerin commented 5 years ago

I was dealing with this by checking timestamps, but using the SynchronizationEvent type is probably better. I'll check that out.

jkebinger commented 5 years ago

Might be cool to get events posted when the timed synchronization has happened, too?

ljnelson commented 5 years ago

For background on the patterns involved here, have a look at this: https://stackoverflow.com/questions/31041766/what-does-edge-based-and-level-based-mean

So Kubernetes in general is biased towards level-based behavior. That is, rather than relying on the reception of an event (in general), Kubernetes internals always check the "level" of the system to verify that it matches what any given event or stopwatch expiration might claim it is supposed to be.
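To make the distinction concrete (reconcile, desiredState, and observedState below are placeholders, not framework API):

// Edge-triggered: trust each event as the unit of work.
void onPodEvent(AbstractEvent<? extends Pod> event) {
  reconcile(event.getResource()); // assumed accessor; if an event is missed, state drifts
}

// Level-triggered: treat the event as a wake-up call, then compare levels.
void onPodEventLevelBased() {
  Pod observed = observedState(); // e.g. read from the knownObjects cache
  Pod desired = desiredState();   // what the spec says should exist
  if (!java.util.Objects.equals(observed, desired)) {
    reconcile(desired);           // converge no matter which event woke us
  }
}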

I am sure there is more work to do in this area in my framework.

For example, one of the things I'd like to do in the abstract is to tell the event-broadcasting mechanism not to fire at all until awaitEventCacheSynchronization has completed. This is perhaps less important in this project than it is in the microbean-kubernetes-controller-cdi derivative.