lyft / flinkk8soperator

Kubernetes operator that provides control plane for managing Apache Flink applications
Apache License 2.0

Application not starting or showing state #154

Open lucaspg96 opened 4 years ago

lucaspg96 commented 4 years ago

Hi! I'm trying to deploy a flink application using the newest operator image (lyft/flinkk8soperator:7fc2230b36ba1d9ee4f78622a56e183abce13be1). The operator deployment is the following:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flinkoperator
  namespace: test
  labels:
    app: flinkoperator
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flinkoperator
  template:
    metadata:
      labels:
        app: flinkoperator
        app.kubernetes.io/version: 0.2.0
    spec:
      serviceAccountName: flinkoperator
      volumes:
      - name: config-volume
        configMap:
          name: flink-operator-config
          items:
          - key: config
            path: config.yaml
      containers:
      - name: flinkoperator-gojson
        image: lyft/flinkk8soperator:7fc2230b36ba1d9ee4f78622a56e183abce13be1
        command:
        - flinkoperator
        args:
        - --logtostderr
        - --config
        - /etc/flinkoperator/config*/config.yaml
        env:
        - name: OPERATOR_NAME
          value: flinkk8soperator
        imagePullPolicy: IfNotPresent
        ports:
          - containerPort: 10254
        resources:
          requests:
            memory: "2Gi"
            cpu: "4"
          limits:
            memory: "8G"
            cpu: "8"
        volumeMounts:
        - name: config-volume
          mountPath: /etc/flinkoperator/config

I have a simple application that only reads the lines from a file and writes them to a Kafka topic. The application deployment is the following:

apiVersion: flink.k8s.io/v1beta1
kind: FlinkApplication
metadata:
  name: data-producer
  namespace: test
  annotations:
  labels:
    app: data-producer
    environment: test

spec:
  image: my.registry.com/flink-pipeline:0.2.0
  imagePullPolicy: Always
  imagePullSecrets:
    - name: "gitlab-registry"
  flinkConfig:
    taskmanager.heap.size: 200
    state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
    state.savepoints.dir: file:///checkpoints/flink/savepoints
    web.upload.dir: /opt/flink
  jobManagerConfig:
    resources:
      requests:
        memory: "2000Mi"
        cpu: "0.4"
    replicas: 1
    envConfig:
      envFrom:
        - configMapRef:
            name: flink-common
      envFrom:
        - configMapRef:
            name: data-producer
  taskManagerConfig:
    taskSlots: 4
    resources:
      requests:
        memory: "2000Mi"
        cpu: "0.4"
    envConfig:
      envFrom:
        - configMapRef:
            name: flink-common
      envFrom:
        - configMapRef:
            name: data-producer

  flinkVersion: "1.9"
  jarName: "flink-pipeline.jar"
  parallelism: 1
  entryClass: "my.package.DataProducerJob"
  deleteMode: None
  allowNonRestoredState: true
  programArgs: >
   --my-args args

If I run the command kubectl get flinkapplications.flink.k8s.io -A, I get the following output:

NAMESPACE   NAME                PHASE   CLUSTER HEALTH   JOB HEALTH   JOB RESTARTS   AGE
test        ocr-data-producer                                                        17h

Both the task manager and job manager logs look fine, and the Flink dashboard shows the correct number of task slots (but no job is running, completed, or failed).

PS: I had to change the flinkoperator role because I was getting a message saying that flinkoperator could not update the status of a flinkapplication, so I added that permission to the role.
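The exact rule that was added is not shown in this thread. As a minimal sketch of what such a permission might look like, assuming the CRD exposes a `status` subresource and the operator's Role (or ClusterRole) already covers the `flink.k8s.io` API group, a rule along these lines would allow status updates. The verbs listed are an assumption, not taken from the operator's shipped manifests:

```yaml
# Hypothetical rule to append to the operator's existing Role/ClusterRole.
# Grants write access to the status subresource of FlinkApplication objects.
rules:
  - apiGroups: ["flink.k8s.io"]
    resources: ["flinkapplications/status"]
    verbs: ["get", "update", "patch"]
```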

Weird fact: I started with the flink operator v0.3.0 image, as in the example. However, that version does not inject the RPC address into the flink-conf.yaml file, so I had to set the corresponding environment variable myself. With that version, everything was working. I had to change the image because the update lifecycle did not work with my RPC workaround.

anandswaminathan commented 4 years ago

@lucaspg96 I believe you are hitting the issue where we made a backward-incompatible change to the CRD. Can you run the latest release, v0.4.0?

Please make sure to update the CRD. This update should not be deployed to a cluster where there are active flinkapplication updates occurring; that is, all flinkapplications should be in a Running or DeployFailed state.

lucaspg96 commented 4 years ago

Hi! Thank you for replying. I changed the CRD and the operator's image tag. Indeed, it is deploying the application now! However, I still cannot update the application once it has started. I have two versions of the image, 0.1.0 and 0.2.0, both working. I can deploy either of them first but, when I change the version and run kubectl apply, the pods start and then I receive a DeployFailed. This is the log from the operator:

{"json":{"app_name":"ocr-data-producer","ns":"develop","phase":"Savepointing"},"level":"info","msg":"Handling state for application","ts":"2020-01-16T13:02:29Z"}
{"json":{"app_name":"ocr-data-producer","ns":"develop","phase":"Savepointing"},"level":"info","msg":"Logged Normal event: CancellingJob: Cancelling job e19cc109c6833d123f5435ea938ef7ab with a final savepoint","ts":"2020-01-16T13:02:30Z"}
{"json":{"app_name":"ocr-data-producer","ns":"develop","phase":"Savepointing"},"level":"info","msg":"Handling state for application","ts":"2020-01-16T13:02:30Z"}
{"json":{"app_name":"ocr-data-producer","ns":"develop","phase":"Savepointing"},"level":"info","msg":"Logged Warning event: SavepointFailed: Failed to take savepoint for job e19cc109c6833d123f5435ea938ef7ab: {java.util.concurrent.CompletionException java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: An Exception occurred while triggering the checkpoint.\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$triggerSavepoint$0(LegacyScheduler.java:510)\n\tat java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)\n\tat org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: An Exception occurred while triggering the checkpoint.\n\tat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)\n\tat java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)\n\tat java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)\n\tat java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)\n\tat java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.triggerSavepoint(LegacyScheduler.java:504)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:638)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)\n\t... 
22 more\nCaused by: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: An Exception occurred while triggering the checkpoint.\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepointInternal(CheckpointCoordinator.java:428)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:377)\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.triggerSavepoint(LegacyScheduler.java:503)\n\t... 29 more\n}","ts":"2020-01-16T13:02:30Z"}
{"json":{"app_name":"ocr-data-producer","ns":"develop","phase":"Savepointing"},"level":"info","msg":"no externalized checkpoint found","ts":"2020-01-16T13:02:30Z"}
{"json":{"app_name":"ocr-data-producer","ns":"develop","phase":"Savepointing"},"level":"info","msg":"Logged Warning event: RolledBackDeploy: Successfully rolled back deploy d9f1da2e","ts":"2020-01-16T13:02:30Z"}
{"json":{"app_name":"ocr-data-producer","ns":"develop","phase":"DeployFailed"},"level":"info","msg":"Logged Normal event: ToreDownCluster: Deleted old cluster with hash d9f1da2e","ts":"2020-01-16T13:02:30Z"}
anandswaminathan commented 4 years ago

@lucaspg96 At this point, everything looks good from the operator's side.

There is an issue with your application. Check this log entry:

{"json":{"app_name":"ocr-data-producer","ns":"develop","phase":"Savepointing"},"level":"info","msg":"Logged Warning event: SavepointFailed: Failed to take savepoint for job e19cc109c6833d123f5435ea938ef7ab: {java.util.concurrent.CompletionException java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: An Exception occurred while triggering the checkpoint.\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$triggerSavepoint$0(LegacyScheduler.java:510)\n\tat java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)\n\tat org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: An Exception occurred while triggering the checkpoint.\n\tat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)\n\tat java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)\n\tat java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)\n\tat java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)\n\tat java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.triggerSavepoint(LegacyScheduler.java:504)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:638)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)\n\t... 
22 more\nCaused by: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: An Exception occurred while triggering the checkpoint.\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepointInternal(CheckpointCoordinator.java:428)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:377)\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.triggerSavepoint(LegacyScheduler.java:503)\n\t... 29 more\n}","ts":"2020-01-16T13:02:30Z"}
lucaspg96 commented 4 years ago

@anandswaminathan I inspected the log that you pointed to, and those exceptions come from a savepoint. My code does not use savepoints, checkpoints, or similar features. It only reads from a file and sends each line to a Kafka topic. However, I'm using watermarks to simulate throughput in the stream. Do you think it is possible that flinkoperator has some compatibility problem with this feature, at least during the update cycle?

lucaspg96 commented 4 years ago

Hi, does anyone have any news about this issue? I've been stuck on this problem for days...

anandswaminathan commented 4 years ago

@lucaspg96 At the moment Flinkk8soperator will always try to savepoint during updates.
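Because a savepoint is attempted on every update, the savepoint target in the application's `flinkConfig` has to be usable at update time. As a hedged sketch only (nothing in this thread confirms it is the cause here): Flink expects the savepoint directory to be reachable by both the job manager and the task managers, so a `file://` path works only if it sits on a volume shared by all pods. The bucket name below is hypothetical, and an `s3://` path assumes the image ships a matching Flink filesystem plugin:

```yaml
# Sketch of a FlinkApplication spec fragment; the bucket is a placeholder.
spec:
  flinkConfig:
    # Must be reachable from the job manager and every task manager.
    state.savepoints.dir: s3://my-bucket/flink/savepoints
    state.checkpoints.dir: s3://my-bucket/flink/externalized-checkpoints
```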

lucaspg96 commented 4 years ago

@anandswaminathan Ok, but do you think that the savepointing problem is related to the features that I described? If so, do you have any advice about what I should investigate to solve this? (In any case, any advice will be very welcome ^^' )

anandswaminathan commented 4 years ago

@lucaspg96 The application resource status and the task manager/job manager logs should show why the savepoint failed or got stuck.