GoogleCloudPlatform / flink-on-k8s-operator

[DEPRECATED] Kubernetes operator for managing the lifecycle of Apache Flink and Beam applications.
Apache License 2.0

autoSavepointSeconds is not having any effect #344

vinaykw closed this issue 3 years ago

vinaykw commented 3 years ago

I am deploying a Kubernetes Flink job cluster. Below is the YAML for it.

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  image:
    name: flink:1.11.2
    pullPolicy: Always
  jobManager:
    accessScope: Cluster
    ports:
      ui: 8081
    resources:
      limits:
        memory: "1024Mi"
        cpu: "200m"
    volumes:
     - name: flink-check
       persistentVolumeClaim:
                claimName: flink-check-pv-claim  
    volumeMounts:
      - mountPath: /cdep/flink-checkpoints
        name: flink-check  

  taskManager:
    replicas: 2
    resources:
      limits:
        memory: "1024Mi"
        cpu: "200m"
    volumes:
      - name: cache-volume
        emptyDir: {}
      - name: flink-check
        persistentVolumeClaim:
                claimName: flink-check-pv-claim  
    volumeMounts:
      - mountPath: /cache
        name: cache-volume
      - mountPath: /cdep/flink-checkpoints
        name: flink-check 
    sidecars:
      - name: sidecar
        image: alpine
        command:
          - "sleep"
          - "10000"

  job:
    autoSavePointSeconds: 300
    savepointsDir: /cdep/flink-checkpoints
    restartPolicy: FromSavepointOnFailure
    jarFile: /cdep/flink-checkpoints/flink-assembly-0.1-SNAPSHOT.jar
    className: com.radisys.reporting.jobs.CouchbaseEventTimeJob
    args: ["--input", "./README.txt"]
    parallelism: 2
    volumes:
      - name: cache-volume
        emptyDir: {}
      - name: flink-check
        persistentVolumeClaim:
                claimName: flink-check-pv-claim  
    volumeMounts:
      - mountPath: /cache
        name: cache-volume
      - mountPath: /cdep/flink-checkpoints
        name: flink-check  

  envVars:
    - name: FOO
      value: bar
  flinkProperties:
    taskmanager.numberOfTaskSlots: "1"
    taskmanager.memory.flink.size: "768m"
    restart-strategy.fixed-delay.attempts: "3"
    restart-strategy.fixed-delay.delay: "10s"
  logConfig:
    "log4j-console.properties": |
        rootLogger.level = INFO
        rootLogger.appenderRef.console.ref = LogConsole
        appender.console.name = LogConsole
        appender.console.type = CONSOLE
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n      

But autoSavePointSeconds has no effect: no savepoints are triggered. The restartPolicy is also not applied when the job fails.

When I describe the FlinkCluster, the job spec contains only the following:


 Job:
    Allow Non Restored State:  false
    Args:
      --input
      ./README.txt
    Class Name:  com.radisys.reporting.jobs.CouchbaseEventTimeJob
    Cleanup Policy:
      After Job Cancelled:  DeleteCluster
      After Job Fails:      KeepCluster
      After Job Succeeds:   DeleteCluster
    Jar File:               /cdep/flink-checkpoints/flink-assembly-0.1-SNAPSHOT.jar
    No Logging To Stdout:   false
    Parallelism:            2
    Resources:
    Restart Policy:  FromSavepointOnFailure
    Savepoints Dir:  /cdep/flink-checkpoints
    Volume Mounts:
      Mount Path:  /cache
      Name:        cache-volume
      Mount Path:  /cdep/flink-checkpoints
      Name:        flink-check
    Volumes:
      Empty Dir:
      Name:  cache-volume
      Name:  flink-check
      Persistent Volume Claim:
        Claim Name:  flink-check-pv-claim

There is no autoSavePointSeconds in the description above.

guruprasathT commented 3 years ago

Hello, I guess you looked at the job specification with a command like the one below.

kubectl get job -n {namespace} {job-name} -o yaml

If it is a Flink job cluster, can you try the following commands and look for autoSavepointSeconds?

kubectl get flinkcluster -n {namespace} {flink-job-cluster-name} -o yaml
kubectl describe flinkcluster -n {namespace} {flink-job-cluster-name}

Please share the result.

vinaykw commented 3 years ago

I pasted only the job part of the kubectl describe flinkcluster output.

This is the result of kubectl get flinkcluster -n {namespace} {flink-job-cluster-name} -o yaml:


apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"flinkoperator.k8s.io/v1beta1","kind":"FlinkCluster","metadata":{"annotations":{},"name":"flinkjobcluster-sample","namespace":"default"},"spec":{"envVars":[{"name":"FOO","value":"bar"}],"flinkProperties":{"restart-strategy.fixed-delay.attempts":"3","restart-strategy.fixed-delay.delay":"10s","taskmanager.memory.flink.size":"768m","taskmanager.numberOfTaskSlots":"1"},"image":{"name":"flink:1.11.2","pullPolicy":"Always"},"job":{"args":["--input","./README.txt"],"autoSavePointSeconds":300,"className":"com.radisys.reporting.jobs.CouchbaseEventTimeJob","jarFile":"/cdep/flink-checkpoints/flink-assembly-0.1-SNAPSHOT.jar","parallelism":2,"restartPolicy":"FromSavepointOnFailure","savepointsDir":"/cdep/flink-checkpoints","volumeMounts":[{"mountPath":"/cache","name":"cache-volume"},{"mountPath":"/cdep/flink-checkpoints","name":"flink-check"}],"volumes":[{"emptyDir":{},"name":"cache-volume"},{"name":"flink-check","persistentVolumeClaim":{"claimName":"flink-check-pv-claim"}}]},"jobManager":{"accessScope":"Cluster","ports":{"ui":8081},"resources":{"limits":{"cpu":"200m","memory":"1024Mi"}},"volumeMounts":[{"mountPath":"/cdep/flink-checkpoints","name":"flink-check"}],"volumes":[{"name":"flink-check","persistentVolumeClaim":{"claimName":"flink-check-pv-claim"}}]},"logConfig":{"log4j-console.properties":"rootLogger.level = INFO\nrootLogger.appenderRef.console.ref = LogConsole\nappender.console.name = LogConsole\nappender.console.type = CONSOLE\nappender.console.layout.type = PatternLayout\nappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n      
\n"},"taskManager":{"replicas":2,"resources":{"limits":{"cpu":"200m","memory":"1024Mi"}},"sidecars":[{"command":["sleep","10000"],"image":"alpine","name":"sidecar"}],"volumeMounts":[{"mountPath":"/cache","name":"cache-volume"},{"mountPath":"/cdep/flink-checkpoints","name":"flink-check"}],"volumes":[{"emptyDir":{},"name":"cache-volume"},{"name":"flink-check","persistentVolumeClaim":{"claimName":"flink-check-pv-claim"}}]}}}
  creationTimestamp: "2020-10-13T14:55:44Z"
  generation: 1
  name: flinkjobcluster-sample
  namespace: default
  resourceVersion: "26378695"
  selfLink: /apis/flinkoperator.k8s.io/v1beta1/namespaces/default/flinkclusters/flinkjobcluster-sample
  uid: ad186f11-9310-4de2-b7e4-6eba334e7960
spec:
  envVars:
  - name: FOO
    value: bar
  flinkProperties:
    restart-strategy.fixed-delay.attempts: "3"
    restart-strategy.fixed-delay.delay: 10s
    taskmanager.memory.flink.size: 768m
    taskmanager.numberOfTaskSlots: "1"
  image:
    name: flink:1.11.2
    pullPolicy: Always
  job:
    allowNonRestoredState: false
    args:
    - --input
    - ./README.txt
    className: com.radisys.reporting.jobs.CouchbaseEventTimeJob
    cleanupPolicy:
      afterJobCancelled: DeleteCluster
      afterJobFails: KeepCluster
      afterJobSucceeds: DeleteCluster
    jarFile: /cdep/flink-checkpoints/flink-assembly-0.1-SNAPSHOT.jar
    noLoggingToStdout: false
    parallelism: 2
    resources: {}
    restartPolicy: FromSavepointOnFailure
    savepointsDir: /cdep/flink-checkpoints
    volumeMounts:
    - mountPath: /cache
      name: cache-volume
    - mountPath: /cdep/flink-checkpoints
      name: flink-check
    volumes:
    - emptyDir: {}
      name: cache-volume
    - name: flink-check
      persistentVolumeClaim:
        claimName: flink-check-pv-claim
  jobManager:
    accessScope: Cluster
    memoryOffHeapMin: 600M
    memoryOffHeapRatio: 25
    ports:
      blob: 6124
      query: 6125
      rpc: 6123
      ui: 8081
    replicas: 1
    resources:
      limits:
        cpu: 200m
        memory: 1Gi
    volumeMounts:
    - mountPath: /cdep/flink-checkpoints
      name: flink-check
    volumes:
    - name: flink-check
      persistentVolumeClaim:
        claimName: flink-check-pv-claim
  logConfig:
    log4j-console.properties: "rootLogger.level = INFO\nrootLogger.appenderRef.console.ref
      = LogConsole\nappender.console.name = LogConsole\nappender.console.type = CONSOLE\nappender.console.layout.type
      = PatternLayout\nappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS}
      %-5p %-60c %x - %m%n      \n"
  taskManager:
    memoryOffHeapMin: 600M
    memoryOffHeapRatio: 25
    ports:
      data: 6121
      query: 6125
      rpc: 6122
    replicas: 2
    resources:
      limits:
        cpu: 200m
        memory: 1Gi
    sidecars:
    - command:
      - sleep
      - "10000"
      image: alpine
      name: sidecar
      resources: {}
    volumeMounts:
    - mountPath: /cache
      name: cache-volume
    - mountPath: /cdep/flink-checkpoints
      name: flink-check
    volumes:
    - emptyDir: {}
      name: cache-volume
    - name: flink-check
      persistentVolumeClaim:
        claimName: flink-check-pv-claim
status:
  components:
    configMap:
      name: flinkjobcluster-sample-configmap
      state: Ready
    job:
      id: f7f8816108b599d5225e842154d4aa82
      name: flinkjobcluster-sample-job
      state: Failed
    jobManagerDeployment:
      name: flinkjobcluster-sample-jobmanager
      state: Ready
    jobManagerService:
      name: flinkjobcluster-sample-jobmanager
      state: Ready
    taskManagerDeployment:
      name: flinkjobcluster-sample-taskmanager
      state: Ready
  currentRevision: flinkjobcluster-sample-6dd96ffdfc-1
  lastUpdateTime: "2020-10-14T04:50:28Z"
  nextRevision: flinkjobcluster-sample-6dd96ffdfc-1
  state: Running

and this is the result of

kubectl describe flinkcluster

Name:         flinkjobcluster-sample
Namespace:    default
Labels:       <none>
Annotations:  API Version:  flinkoperator.k8s.io/v1beta1
Kind:         FlinkCluster
Metadata:
  Creation Timestamp:  2020-10-14T05:52:54Z
  Generation:          1
  Resource Version:    26393632
  Self Link:           /apis/flinkoperator.k8s.io/v1beta1/namespaces/default/flinkclusters/flinkjobcluster-sample
  UID:                 1dacfb6c-c7f2-4ece-8a71-a175399f2e29
Spec:
  Env Vars:
    Name:   FOO
    Value:  bar
  Flink Properties:
    restart-strategy.fixed-delay.attempts:  3
    restart-strategy.fixed-delay.delay:     10s
    taskmanager.memory.flink.size:          768m
    taskmanager.numberOfTaskSlots:          1
  Image:
    Name:         flink:1.11.2
    Pull Policy:  Always
  Job:
    Allow Non Restored State:  false
    Args:
      --input
      ./README.txt
    Class Name:  com.radisys.reporting.jobs.CouchbaseEventTimeJob
    Cleanup Policy:
      After Job Cancelled:  DeleteCluster
      After Job Fails:      KeepCluster
      After Job Succeeds:   DeleteCluster
    Jar File:               /cdep/flink-checkpoints/flink-assembly-0.1-SNAPSHOT.jar
    No Logging To Stdout:   false
    Parallelism:            2
    Resources:
    Restart Policy:  FromSavepointOnFailure
    Savepoints Dir:  /cdep/flink-checkpoints
    Volume Mounts:
      Mount Path:  /cache
      Name:        cache-volume
      Mount Path:  /cdep/flink-checkpoints
      Name:        flink-check
    Volumes:
      Empty Dir:
      Name:  cache-volume
      Name:  flink-check
      Persistent Volume Claim:
        Claim Name:  flink-check-pv-claim
  Job Manager:
    Access Scope:           Cluster
    Memory Off Heap Min:    600M
    Memory Off Heap Ratio:  25
    Ports:
      Blob:    6124
      Query:   6125
      Rpc:     6123
      Ui:      8081
    Replicas:  1
    Resources:
      Limits:
        Cpu:     200m
        Memory:  1Gi
    Volume Mounts:
      Mount Path:  /cdep/flink-checkpoints
      Name:        flink-check
    Volumes:
      Name:  flink-check
      Persistent Volume Claim:
        Claim Name:  flink-check-pv-claim
  Log Config:
    log4j-console.properties:  rootLogger.level = INFO
rootLogger.appenderRef.console.ref = LogConsole
appender.console.name = LogConsole
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

  Task Manager:
    Memory Off Heap Min:    600M
    Memory Off Heap Ratio:  25
    Ports:
      Data:    6121
      Query:   6125
      Rpc:     6122
    Replicas:  2
    Resources:
      Limits:
        Cpu:     200m
        Memory:  1Gi
    Sidecars:
      Command:
        sleep
        10000
      Image:  alpine
      Name:   sidecar
      Resources:
    Volume Mounts:
      Mount Path:  /cache
      Name:        cache-volume
      Mount Path:  /cdep/flink-checkpoints
      Name:        flink-check
    Volumes:
      Empty Dir:
      Name:  cache-volume
      Name:  flink-check
      Persistent Volume Claim:
        Claim Name:  flink-check-pv-claim
Status:
  Components:
    Config Map:
      Name:   flinkjobcluster-sample-configmap
      State:  Ready
    Job:
      Id:     b361d1140b86dd05d05ed6b5c9308d48
      Name:   flinkjobcluster-sample-job
      State:  Running
    Job Manager Deployment:
      Name:   flinkjobcluster-sample-jobmanager
      State:  Ready
    Job Manager Service:
      Name:   flinkjobcluster-sample-jobmanager
      State:  Ready
    Task Manager Deployment:
      Name:          flinkjobcluster-sample-taskmanager
      State:         Ready
  Current Revision:  flinkjobcluster-sample-6dd96ffdfc-1
  Last Update Time:  2020-10-14T05:53:57Z
  Next Revision:     flinkjobcluster-sample-6dd96ffdfc-1
  State:             Running
Events:
  Type    Reason        Age                From           Message
  ----    ------        ----               ----           -------
  Normal  StatusUpdate  60s                FlinkOperator  Cluster status: Creating
  Normal  StatusUpdate  60s                FlinkOperator  JobManager deployment status: NotReady
  Normal  StatusUpdate  60s                FlinkOperator  ConfigMap status: Ready
  Normal  StatusUpdate  60s                FlinkOperator  JobManager service status: Ready
  Normal  StatusUpdate  60s                FlinkOperator  TaskManager deployment status: NotReady
  Normal  StatusUpdate  31s                FlinkOperator  JobManager deployment status changed: NotReady -> Ready
  Normal  StatusUpdate  20s                FlinkOperator  TaskManager deployment status changed: NotReady -> Ready
  Normal  StatusUpdate  20s                FlinkOperator  Cluster status changed: Creating -> Running
  Normal  StatusUpdate  18s (x2 over 18s)  FlinkOperator  Job status: Pending
  Normal  StatusUpdate  <invalid>          FlinkOperator  (combined from similar events): Job status changed: Pending -> Running

guruprasathT commented 3 years ago

job:
  autoSavePointSeconds: 300
  savepointsDir: /cdep/flink-checkpoints

Ah, I see: the field is autoSavepointSeconds, not autoSavePointSeconds.
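
A mismatch like this can be spotted by comparing the keys that were submitted (the job object inside the last-applied-configuration annotation) with the keys that remain under spec.job in the stored resource. The Go sketch below is only an illustration: droppedKeys is a hypothetical helper, not part of the operator, and the two maps are abbreviated by hand from the output above.

```go
package main

import (
	"fmt"
	"sort"
)

// droppedKeys returns, in sorted order, the keys that appear in submitted
// but are absent from stored. Hypothetical helper for illustration only.
func droppedKeys(submitted, stored map[string]interface{}) []string {
	var missing []string
	for k := range submitted {
		if _, ok := stored[k]; !ok {
			missing = append(missing, k)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	// Abbreviated from the "job" object in last-applied-configuration.
	submitted := map[string]interface{}{
		"autoSavePointSeconds": 300,
		"savepointsDir":        "/cdep/flink-checkpoints",
		"restartPolicy":        "FromSavepointOnFailure",
		"parallelism":          2,
	}
	// Abbreviated from spec.job as returned by kubectl get -o yaml.
	stored := map[string]interface{}{
		"allowNonRestoredState": false,
		"savepointsDir":         "/cdep/flink-checkpoints",
		"restartPolicy":         "FromSavepointOnFailure",
		"parallelism":           2,
	}
	fmt.Println(droppedKeys(submitted, stored)) // prints [autoSavePointSeconds]
}
```

Keys that exist only in the stored spec, such as allowNonRestoredState, are defaults added on the server side and are deliberately ignored by this comparison.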

vinaykw commented 3 years ago

I referred to this document: https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/savepoints_guide.md

job:
  autoSavePointSeconds: 300
  savepointsDir: gs://my-bucket/savepoints/

Is this a documentation bug?

functicons commented 3 years ago

Thanks for the feedback. autoSavepointSeconds is correct; the doc is wrong. flinkcluster_types.go is the source of truth.