fabric8io / kubernetes-client

Java client for Kubernetes & OpenShift
http://fabric8.io
Apache License 2.0

Not receiving watcher events for Job sometimes #2323

Closed flavioschuindt closed 4 years ago

flavioschuindt commented 4 years ago

Hi, guys,

Sometimes I never receive the job events. In about 99% of cases I get the job created event, the job completed event, and so on, but in about 1% of cases I can see that the job and its pods have been created in the cluster, yet the watcher receives nothing: no job created event, no job completed event, etc. Has anyone had this issue before? I'm using 4.10.1.

The code for watcher is:

public class JobWatcher extends AbstractWatcher<Job> {
    @Override
    public void eventReceived(Action action, Job job) {
        final String jobName = job.getMetadata().getName();
        if (action == Action.ADDED) {
            log.info("Job {} started.", jobName);
        } else if (action == Action.MODIFIED) {
            log.info("Job {} modified.", jobName);
        }
    }
}

I removed most of the code in eventReceived because it contains IP. In any case, I would expect the line log.info("Job {} started.", jobName); to be executed every time. As I said, it is 99% of the time, but then it suddenly stops working. Once it stops, it never comes back unless I restart my app.

The code where I watch the job to be executed in the cluster:

client.batch()
      .jobs()
      .inNamespace(job.getMetadata().getNamespace())
      .withName(job.getMetadata().getName())
      .watch(new JobWatcher(taskMetadata, remoteService, mapper, callbackUrl));

Thank you!

rohanKanojia commented 4 years ago

Do you get any error when the watch stops working? Could this be due to #1529 (client not connecting when a 410 is received because the watch is outdated due to a too old resource version)?

flavioschuindt commented 4 years ago

No, @rohanKanojia. I don't see any error in my console output. Note that right after the watch code I shared, I have a piece of code that actually creates the job in the cluster, and this always works. So I don't think I am losing the connection to the cluster or anything like that. I also don't think I am getting a socket disconnection, because I don't even receive the simple job created event. In a socket disconnection scenario, I would expect to receive the basic events (job started, for example) and then suddenly see a stack trace from the fabric8 Java client reporting a socket timeout. I checked the other issues you mentioned, but couldn't understand what you mean by an old resource version.

rohanKanojia commented 4 years ago

By old resource version, I mean it's outdated in Kubernetes storage. You can also check the Kubernetes docs [0]:

A given Kubernetes server will only preserve a historical list of changes for a limited time. Clusters using etcd3 preserve changes in the last 5 minutes by default. When the requested watch operations fail because the historical version of that resource is not available, clients must handle the case by recognizing the status code 410 Gone, clearing their local cache, performing a list operation, and starting the watch from the resourceVersion returned by that new list operation

Could you please try out using informers instead of watch for watching jobs and see if it's able to resolve this issue?

[0] https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
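The recovery sequence in the quoted passage (410 Gone, clear the cache, list, resume the watch from the list's resourceVersion) can be sketched without any Kubernetes dependency. What follows is a minimal stdlib-only simulation, not the fabric8 implementation: FakeServer, GoneException, Client, and the retention window of 2 changes are all hypothetical names and values chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only simulation; every class here is hypothetical, not fabric8 API.
final class RelistOnGoneSketch {

    // Stands in for an HTTP 410 Gone response to a watch request.
    static final class GoneException extends Exception {
        GoneException(String message) { super(message); }
    }

    static final class Change {
        final long version; final String name; final String phase;
        Change(long version, String name, String phase) {
            this.version = version; this.name = name; this.phase = phase;
        }
    }

    // Keeps the current state plus only the last `retained` changes (a bounded history).
    static final class FakeServer {
        private final Map<String, String> state = new TreeMap<>();
        private final Deque<Change> history = new ArrayDeque<>();
        private final int retained;
        private long version;

        FakeServer(int retained) { this.retained = retained; }

        void apply(String name, String phase) {
            version++;
            state.put(name, phase);
            history.addLast(new Change(version, name, phase));
            if (history.size() > retained) history.removeFirst();
        }

        // List: copy the current state and return the version it corresponds to.
        long list(Map<String, String> into) {
            into.putAll(state);
            return version;
        }

        // Watch: replay changes newer than `since`, or fail if some were already dropped.
        List<Change> watchSince(long since) throws GoneException {
            if (since < version && history.peekFirst().version > since + 1) {
                throw new GoneException("too old resource version: " + since);
            }
            List<Change> out = new ArrayList<>();
            for (Change c : history) if (c.version > since) out.add(c);
            return out;
        }
    }

    static final class Client {
        final Map<String, String> cache = new TreeMap<>();
        long resourceVersion;
        int relists;

        void sync(FakeServer server) {
            try {
                for (Change c : server.watchSince(resourceVersion)) {
                    cache.put(c.name, c.phase);
                    resourceVersion = c.version;
                }
            } catch (GoneException e) {
                // Recovery from the quoted docs: clear cache, list, resume from that version.
                cache.clear();
                resourceVersion = server.list(cache);
                relists++;
            }
        }
    }

    static Client demo() {
        FakeServer server = new FakeServer(2);
        Client client = new Client();
        server.apply("job-a", "Created");
        client.sync(server);                // normal watch, resourceVersion becomes 1
        server.apply("job-a", "Running");   // version 2, later dropped from the history
        server.apply("job-b", "Created");   // version 3
        server.apply("job-a", "Complete");  // version 4
        client.sync(server);                // version 2 is gone, so the client re-lists
        return client;
    }

    public static void main(String[] args) {
        Client c = demo();
        System.out.println(c.cache + " rv=" + c.resourceVersion + " relists=" + c.relists);
    }
}
```

In this sketch the second sync cannot replay version 2, so the client falls back to a full list and ends up with the current state of both jobs. A watcher that does not perform this fallback would simply stop seeing changes.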

flavioschuindt commented 4 years ago

Ok, @rohanKanojia, couple of questions:

1) Maybe this is a stupid question, but as the SharedInformer is pretty new to me: in what sense does it differ from the watcher? What I understood is that it lists and watches. But isn't the Watcher doing the same? As far as I understood, the watcher keeps polling the API server.

2) I tried following the namespaced example for Job instead of Pod in a simple PoC here and I keep getting a NullPointerException. Essentially, the NullPointerException happens in the OperationSupport class, line 114, here:

return !Utils.isNullOrEmpty(this.apiGroupName) ? new URL(URLUtils.join(new String[]{this.config.getMasterUrl().toString(), "apis", this.apiGroupName, this.apiGroupVersion})) : new URL(URLUtils.join(new String[]{this.config.getMasterUrl().toString(), "api", this.apiGroupVersion}));

this.config is null and then the NullPointerException happens at getMasterUrl(). The client is the DefaultKubernetesClient() based on my kube config. My code:

client = new DefaultKubernetesClient();
InputStream resourceis = new ClassPathResource("test.yaml").getInputStream(); //test.yaml is a simple "Kind: Job" object in K8s
ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata, Boolean> resources = client.load(resourceis);
SharedInformerFactory sharedInformerFactory = client.informers();
SharedIndexInformer<Job> podInformer = sharedInformerFactory.sharedIndexInformerFor(
                Job.class,
                JobList.class,
                new OperationContext().withNamespace("test"),
                10 * 1000L);

podInformer.addEventHandler(new JobInformer());
sharedInformerFactory.startAllRegisteredInformers();
client.resourceList(modified).createOrReplace();

The JobInformer class:

public class JobInformer implements ResourceEventHandler<Job> {
    @Override
    public void onAdd(Job job) {
        System.out.println("Job " + job.getMetadata().getName() + " got added");
    }

    @Override
    public void onUpdate(Job oldJob, Job newJob) {
        System.out.println("Job " + oldJob.getMetadata().getName() + " got updated");
    }

    @Override
    public void onDelete(Job job, boolean deletedFinalStateUnknown) {
        System.out.println("Job " + job.getMetadata().getName() + " got deleted");
    }
}

Thank you for your help.

rohanKanojia commented 4 years ago

Strange, I tried out a simple JobInformer before suggesting it to you and it worked for me on Friday:

        try (KubernetesClient client = new DefaultKubernetesClient()) {
            // Get Informer Factory
            SharedInformerFactory sharedInformerFactory = client.informers();

            // Create an instance for Job Informer
            SharedIndexInformer<Job> jobSharedIndexInformer = sharedInformerFactory.sharedIndexInformerFor(Job.class, JobList.class,
                    30 * 1000L);
            logger.info("Informer factory initialized.");

            // Add Event Handler for actions on all Job events received
            jobSharedIndexInformer.addEventHandler(
                    new ResourceEventHandler<>() {
                        @Override
                        public void onAdd(Job job) {
                            logger.info("Job " + job.getMetadata().getName() + " got added");
                        }

                        @Override
                        public void onUpdate(Job oldJob, Job newJob) {
                            logger.info("Job " + oldJob.getMetadata().getName() + " got updated");
                        }

                        @Override
                        public void onDelete(Job job, boolean deletedFinalStateUnknown) {
                            logger.info("Job " + job.getMetadata().getName() + " got deleted");
                        }
                    }
            );

            logger.info("Starting all registered informers");
            sharedInformerFactory.startAllRegisteredInformers();

            // Wait for 1 minute
            Thread.sleep(60 * 1000L);
        }

Regarding the difference between watch and informer API, I've written a short blog about it. Maybe it can be helpful for you: https://medium.com/@rohaan/introduction-to-fabric8-kubernetes-java-client-informer-api-b945082d69af

flavioschuindt commented 4 years ago

Made some progress here, @rohanKanojia.

This works if I target Pods (which is not what I am interested in):

try (KubernetesClient client = new DefaultKubernetesClient()) {
    // Get Informer Factory
    SharedInformerFactory sharedInformerFactory = client.informers();

    // Create an instance for Job Informer
    SharedIndexInformer<Pod> jobSharedIndexInformer = sharedInformerFactory.sharedIndexInformerFor(Pod.class,
            PodList.class,
            30 * 1000L);
    System.out.println("Informer factory initialized.");

    // Add Event Handler for actions on all Job events received
    jobSharedIndexInformer.addEventHandler(
            new ResourceEventHandler<Pod>() {
                @Override
                public void onAdd(Pod pod) {
                    System.out.println("Pod " + pod.getMetadata().getName() + " got added");
                }

                @Override
                public void onUpdate(Pod oldPod, Pod newPod) {
                    System.out.println("Pod " + oldPod.getMetadata().getName() + " got updated");
                }

                @Override
                public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
                    System.out.println("Pod " + pod.getMetadata().getName() + " got deleted");
                }
            }
    );

    System.out.println("Starting all registered informers");
    sharedInformerFactory.startAllRegisteredInformers();

    // Wait for 1 minute
    Thread.sleep(60 * 1000L);
}

The above code fails if I remove the Thread.sleep:

java.util.concurrent.RejectedExecutionException: Task okhttp3.RealCall$AsyncCall@61c1272f rejected from java.util.concurrent.ThreadPoolExecutor@483cb2d0[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
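One plausible reading of this exception, stated here as an assumption rather than something confirmed in the thread: without the sleep, the try-with-resources block closes the client right away, its thread pool is shut down, and a request submitted afterwards is rejected. The underlying JDK behavior can be reproduced with a plain executor. ClosedPoolSketch is a hypothetical name and the sketch does not touch fabric8 or OkHttp.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Stdlib-only illustration: a pool that has been shut down rejects new tasks.
final class ClosedPoolSketch {

    static boolean submitAfterShutdownIsRejected() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.shutdown(); // comparable to closing the owner of the pool before work is queued
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected=" + submitAfterShutdownIsRejected());
    }
}
```

If that reading is right, the client has to stay open for as long as the informers are expected to run, which is what the sleep achieves in the PoC.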

For Job:

Failure executing: GET at: https://192.168.99.101:6443/api/v1/namespaces/test/jobs. Message: the server could not find the requested resource. Received status: Status(apiVersion=v1, code=404, details=StatusDetails(causes=[], group=null, kind=null, name=null, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=the server could not find the requested resource, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=NotFound, status=Failure, additionalProperties={})

Which Kubernetes version did you test with? I think the issue with the non-namespaced SharedInformer for Job is that the library is trying to call /jobs when it should go to the batch API (/batch), as I am using Kubernetes 1.13. For example, if I open port 8080 on my localhost using kubectl proxy to target the cluster, this works: curl http://localhost:8080/apis/batch/v1/namespaces/test/jobs, while http://localhost:8080/api/v1/namespaces/test/jobs returns the same 404 as in the library:

curl http://localhost:8080/api/v1/namespaces/test/jobs
{
  "kind": "Status",
  "apiVersion": "v1",
  "metadata": {

  },
  "status": "Failure",
  "message": "the server could not find the requested resource",
  "reason": "NotFound",
  "details": {

  },
  "code": 404
}

Could you confirm it please?

Btw, the blog gives cool information! Thanks for sharing.
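The two URLs compared in this comment differ only in the API root: resources in the core group live under /api/<version>, while resources in a named group such as batch live under /apis/<group>/<version>. A minimal sketch of that rule follows. ApiPathSketch and listPath are hypothetical names for illustration and are not the library's code, although the branch mirrors the OperationSupport line quoted earlier in the thread.

```java
// Stdlib-only sketch of how a namespaced list path depends on the API group.
final class ApiPathSketch {

    static String listPath(String apiGroup, String apiVersion, String namespace, String plural) {
        // An empty group means the core API, which is served under /api rather than /apis.
        String root = (apiGroup == null || apiGroup.isEmpty())
                ? "/api/" + apiVersion
                : "/apis/" + apiGroup + "/" + apiVersion;
        return root + "/namespaces/" + namespace + "/" + plural;
    }

    public static void main(String[] args) {
        System.out.println(listPath("", "v1", "test", "pods"));      // core group
        System.out.println(listPath("batch", "v1", "test", "jobs")); // batch group
        System.out.println(listPath("", "v1", "test", "jobs"));      // the path that returned 404
    }
}
```

With the group left empty, the Job path comes out as /api/v1/namespaces/test/jobs, which matches the failing request reported above.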

rohanKanojia commented 4 years ago

I tested it on minikube v1.6.1 with Kubernetes version v1.17.0

rohanKanojia commented 4 years ago

@flavioschuindt : Hi, are you still facing this issue? Could you please upgrade to 4.10.3 and see if it resolves your issue? I tried again and it seemed to work for me. Could you please see if you're able to run it?

https://github.com/rohanKanojia/kubernetes-client-demo/blob/master/src/main/java/io/fabric8/JobInformer.java

flavioschuindt commented 4 years ago

Hey @rohanKanojia, only had a chance to look at this today.

So, looks like we have some good progress here. As per your suggestion, I tried with 4.10.3 and I can successfully use a namespaced SharedInformer for Job. All events seem to be working so far. I was checking the 4.10.2 release notes and it looks like this PR is the one that fixes the namespaced issue.

So, from now on, it seems to be good, and now I need to port this small PoC to real code in our product and observe whether I face issues similar to the ones I was having with the Watcher. I'll let you know in any case.

Just one more question about the informers. What happens if, let's say, my informer crashes and restarts later? Does Fabric8 fetch all resourceVersion changes again? I am asking because I was implementing a custom operator using the Java Operator SDK, and there the resourceVersions are always fetched from the beginning, all the available ones, as they don't persist state. So I am just wondering what the design for the informers in fabric8 would be.

Thank you once again for the inputs!

rohanKanojia commented 4 years ago

Informers are based on list + watch underneath, with a resync after the resync period. I think with informers you should also get fresh resourceVersions from the cluster for everything that currently exists. I think if you don't get resourceVersions from the beginning, you might miss deletion events, since Kubernetes won't be able to tell you about resources which no longer exist in the cluster (since their resourceVersion has changed).

flavioschuindt commented 4 years ago

I have just done an exploratory analysis here.

So, with a simple Job that contains only one container that loops forever:

For Watcher:

For Informers:

So, looks like the resyncPeriod in Informer takes care of this. Indeed, better.

You can close this issue for now. If I face any issue with the informer losing track of events, I can reopen it. Thank you.

rktechie commented 2 years ago

@rohanKanojia @flavioschuindt

Related to the comment https://github.com/fabric8io/kubernetes-client/issues/2323#issuecomment-666761790, will informer also receive notifications of "delete" requests which happened when the informer was not running?

shawkins commented 2 years ago

If an informer watch goes down for a while, when the informer reestablishes it, it will first obtain a full list of the existing resources. Anything in the cache that no longer exists will get removed and a delete event is produced, with the final state unknown flag set to true.
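The behavior described here amounts to diffing the local cache against the fresh list. The following is a minimal stdlib-only sketch of that idea, not the fabric8 implementation: RelistDiffSketch, Handler, and reconcile are hypothetical names, resources are reduced to name and phase strings, and update events are left out for brevity.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only sketch of reconciling a stale cache with a fresh list after a watch outage.
final class RelistDiffSketch {

    interface Handler {
        void onAdd(String name);
        void onDelete(String name, boolean deletedFinalStateUnknown);
    }

    static void reconcile(Map<String, String> cache, Map<String, String> freshList, Handler handler) {
        // Anything cached but absent from the list was deleted while nobody was watching.
        Iterator<String> names = cache.keySet().iterator();
        while (names.hasNext()) {
            String name = names.next();
            if (!freshList.containsKey(name)) {
                names.remove();
                handler.onDelete(name, true); // last state before deletion was never observed
            }
        }
        // Bring the cache up to date; report entries that were not cached before.
        for (Map.Entry<String, String> e : freshList.entrySet()) {
            if (cache.put(e.getKey(), e.getValue()) == null) {
                handler.onAdd(e.getKey());
            }
        }
    }

    static List<String> demo() {
        Map<String, String> cache = new TreeMap<>();
        cache.put("job-a", "Running");
        cache.put("job-b", "Running");
        Map<String, String> freshList = new TreeMap<>();
        freshList.put("job-b", "Complete");
        freshList.put("job-c", "Running");

        final List<String> events = new ArrayList<>();
        reconcile(cache, freshList, new Handler() {
            @Override public void onAdd(String name) { events.add("ADD " + name); }
            @Override public void onDelete(String name, boolean unknown) {
                events.add("DELETE " + name + " finalStateUnknown=" + unknown);
            }
        });
        return events;
    }

    public static void main(String[] args) {
        for (String event : demo()) System.out.println(event);
    }
}
```

In this sketch job-a disappeared during the outage, so the handler receives a delete with the flag set to true, because the cached copy may not reflect the object's final state.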