lyft / flinkk8soperator

Kubernetes operator that provides control plane for managing Apache Flink applications
Apache License 2.0

bad state after delete issued #119

Open andrewgdavis opened 5 years ago

andrewgdavis commented 5 years ago

similar issue to https://github.com/lyft/flinkk8soperator/issues/13

Basically, there was an access issue with the savepoint location (I was using S3). After seeing in the logs that something was wrong, I ran kubectl delete -f myFlinkApp.yaml

However, the jobs would not delete, and the operator logs the following error in an endless loop:

{"json":{"app_name":"davis-wordcount-operator-example","ns":"flink-operator","phase":"Deleting"},"level":"info","msg":"Logged Warning event: SavepointFailed: Failed to take savepoint {java.util.concurrent.CompletionException java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: Not all required tasks are currently running.\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$triggerSavepoint$0(LegacyScheduler.java:510)\n\tat java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)\n\tat java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)\n\tat java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)\n\tat org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: Not all required tasks are currently running.\n\tat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)\n\tat java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)\n\tat java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)\n\tat java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)\n\tat java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.triggerSavepoint(LegacyScheduler.java:504)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:638)\n\tat sun.reflect.GeneratedMethodAccessor113.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)\n\t... 22 more\nCaused by: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: Not all required tasks are currently running.\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepointInternal(CheckpointCoordinator.java:428)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:377)\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.triggerSavepoint(LegacyScheduler.java:503)\n\t... 28 more\n}","ts":"2019-10-10T21:16:34Z"}

It may be nice to have a pre-emptive test, or a configurable number of retries on failure.
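To make the retry suggestion concrete, here is a minimal sketch of a bounded savepoint retry. It is written in Python for brevity and is not the operator's code. The names `savepoint_with_retries`, `take_savepoint`, `max_attempts`, and `SavepointError` are all hypothetical.

```python
import time
from typing import Callable, Optional


class SavepointError(Exception):
    """Hypothetical error type raised when one savepoint attempt fails."""


def savepoint_with_retries(
    take_savepoint: Callable[[], str],
    max_attempts: int = 3,
    delay_seconds: float = 0.0,
) -> Optional[str]:
    """Call take_savepoint at most max_attempts times.

    Returns the savepoint location on success, or None once all attempts
    have failed, so the caller can give up instead of looping forever.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return take_savepoint()
        except SavepointError:
            # Wait between attempts, but not after the final one.
            if attempt < max_attempts and delay_seconds > 0:
                time.sleep(delay_seconds)
    return None
```

With a limit like this, a delete that cannot savepoint would surface a terminal failure after a fixed number of attempts rather than repeating the warning event indefinitely.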


After I manually issued kubectl delete deployment $nameOfFlinkAppDeployment, the operator got into a bad state trying to manage the jobs. Issuing kubectl delete flinkapplication $nameOfFlinkApp then results in a failure message in the operator:

{"json":{"app_name":"davis-wordcount-operator-example","ns":"flink-operator","phase":"Deleting"},"level":"warning","msg":"Failed to reconcile resource flink-operator/davis-wordcount-operator-example: GetJobs call failed with status FAILED and message []: Get http://davis-wordcount-operator-example-0a501337.flink-operator:8081/jobs: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)","ts":"2019-10-10T21:26:47Z"}

The workaround was to remove the finalizer using kubectl edit flinkapplication $nameOfFlinkApp
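For anyone who wants a non-interactive version of that workaround, the sketch below builds a `kubectl patch` command that clears the finalizers with a JSON merge patch. It is an illustration, not something the operator provides: the helper name `finalizer_removal_command` is hypothetical. Clearing finalizers lets Kubernetes delete the resource without waiting for the operator's cleanup, so treat it as a last resort.

```python
import json
from typing import List


def finalizer_removal_command(app_name: str, namespace: str) -> List[str]:
    """Build the argv for a kubectl patch that clears all finalizers.

    Setting metadata.finalizers to null in a JSON merge patch removes the
    field, which allows a pending delete of the resource to complete.
    """
    patch = json.dumps({"metadata": {"finalizers": None}})
    return [
        "kubectl", "patch", "flinkapplication", app_name,
        "-n", namespace,
        "--type=merge",  # merge patch; custom resources do not support strategic patches
        "-p", patch,
    ]


if __name__ == "__main__":
    # Print the command rather than running it, so it can be reviewed first.
    print(" ".join(finalizer_removal_command(
        "davis-wordcount-operator-example", "flink-operator")))
```

The command list can be passed to `subprocess.run` once you have confirmed the resource name and namespace.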

anandswaminathan commented 5 years ago

@andrewgdavis

This is the default setting. We try to savepoint the job before bringing the cluster down. Please set the deleteMode to None if you want deletion to go through without savepointing - https://github.com/lyft/flinkk8soperator/blob/master/docs/crd.md

andrewgdavis commented 5 years ago

Savepointing was desired; it just happened to fail, which in turn put the operator into a bad state.

anandswaminathan commented 5 years ago

@andrewgdavis I missed the last line in your issue.

We already have that. You can configure it through the setting here: https://github.com/lyft/flinkk8soperator/blob/master/pkg/controller/config/config.go#L26