GoogleCloudPlatform / flink-on-k8s-operator

[DEPRECATED] Kubernetes operator for managing the lifecycle of Apache Flink and Beam applications.
Apache License 2.0
658 stars 266 forks source link

`takeSavepointOnUpgrade` field is not created in `flinkcluster` #448

Open heojay opened 3 years ago

heojay commented 3 years ago

Hello.

I deployed the operator to my Kubernetes cluster (v1.20.4) by following the User Guide. The only change was to fix the controller-gen version to 0.2.4 mentioned on #266.

Then I deployed flinkcluster as below, and despite specifying the spec.job.takeSavepointOnUpgrade field, the flinkcluster created did not have that field.

Has anyone ever experienced the same issue?


wordcount.yaml

apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: wordcount
spec:
  image:
    name: flink:1.9.2
  jobManager:
    ports:
      ui: 8081
    resources:
      limits:
        memory: "1024Mi"
        cpu: "200m"
  taskManager:
    replicas: 1
    resources:
      limits:
        memory: "1024Mi"
        cpu: "200m"
  job:
    jarFile: /cache/flink-app.jar
    className: org.apache.flink.streaming.examples.wordcount.WordCount
    args: ["--input", "./README.txt"]
    parallelism: 1
    savepointsDir: /cache/savepoints
    initContainers:
      - name: downloader
        image: curlimages/curl
        env:
          - name: JAR_URL
            value: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.9.2/flink-examples-streaming_2.12-1.9.2-WordCount.jar
          - name: DEST_PATH
            value: /cache/flink-app.jar
        command: ['sh', '-c', 'curl -o ${DEST_PATH} ${JAR_URL}']
    takeSavepointOnUpgrade: true
    volumes:
      - name: cache
        emptyDir: {}
    volumeMounts:
      - mountPath: /cache
        name: cache
  flinkProperties:
    taskmanager.numberOfTaskSlots: "1"

$ kubectl apply -f wordcount.yaml
$ kubectl get flinkcluster wordcount -o yaml
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"flinkoperator.k8s.io/v1beta1","kind":"FlinkCluster","metadata":{"annotations":{},"name":"wordcount","namespace":"flink-test"},"spec":{"flinkProperties":{"taskmanager.numberOfTaskSlots":"1"},"image":{"name":"flink:1.9.2"},"job":{"args":["--input","./README.txt"],"className":"org.apache.flink.streaming.examples.wordcount.WordCount","initContainers":[{"command":["sh","-c","curl -o ${DEST_PATH} ${JAR_URL}"],"env":[{"name":"JAR_URL","value":"https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.9.2/flink-examples-streaming_2.12-1.9.2-WordCount.jar"},{"name":"DEST_PATH","value":"/cache/flink-app.jar"}],"image":"curlimages/curl","name":"downloader"}],"jarFile":"/cache/flink-app.jar","parallelism":1,"savepointsDir":"/cache/savepoints","takeSavepointOnUpgrade":true,"volumeMounts":[{"mountPath":"/cache","name":"cache"}],"volumes":[{"emptyDir":{},"name":"cache"}]},"jobManager":{"ports":{"ui":8081},"resources":{"limits":{"cpu":"200m","memory":"1024Mi"}}},"taskManager":{"replicas":1,"resources":{"limits":{"cpu":"200m","memory":"1024Mi"}}}}}
  creationTimestamp: "2021-05-12T02:51:50Z"
  generation: 1
  name: wordcount
  namespace: flink-test
  resourceVersion: "63495959"
  uid: 256fc78a-47ce-4a1c-8b06-1a594b5c5e98
spec:
  flinkProperties:
    taskmanager.numberOfTaskSlots: "1"
  image:
    name: flink:1.9.2
    pullPolicy: Always
  job:
    allowNonRestoredState: false
    args:
    - --input
    - ./README.txt
    className: org.apache.flink.streaming.examples.wordcount.WordCount
    cleanupPolicy:
      afterJobCancelled: DeleteCluster
      afterJobFails: KeepCluster
      afterJobSucceeds: DeleteCluster
    initContainers:
    - command:
      - sh
      - -c
      - curl -o ${DEST_PATH} ${JAR_URL}
      env:
      - name: JAR_URL
        value: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.9.2/flink-examples-streaming_2.12-1.9.2-WordCount.jar
      - name: DEST_PATH
        value: /cache/flink-app.jar
      image: curlimages/curl
      name: downloader
      resources: {}
    jarFile: /cache/flink-app.jar
    noLoggingToStdout: false
    parallelism: 1
    resources: {}
    restartPolicy: Never
    savepointsDir: /cache/savepoints
    volumeMounts:
    - mountPath: /cache
      name: cache
    volumes:
    - emptyDir: {}
      name: cache
  jobManager:
    accessScope: Cluster
    memoryOffHeapMin: 600M
    memoryOffHeapRatio: 25
    ports:
      blob: 6124
      query: 6125
      rpc: 6123
      ui: 8081
    replicas: 1
    resources:
      limits:
        cpu: 200m
        memory: 1Gi
  recreateOnUpdate: true
  taskManager:
    memoryOffHeapMin: 600M
    memoryOffHeapRatio: 25
    ports:
      data: 6121
      query: 6125
      rpc: 6122
    replicas: 1
    resources:
      limits:
        cpu: 200m
        memory: 1Gi
status:
  components:
    configMap:
      name: wordcount-configmap
      state: Ready
    job:
      state: Pending
    jobManagerService:
      name: wordcount-jobmanager
      state: Ready
    jobManagerStatefulSet:
      name: wordcount-jobmanager
      state: NotReady
    taskManagerStatefulSet:
      name: wordcount-taskmanager
      state: NotReady
  currentRevision: wordcount-d9bb8f684-1
  lastUpdateTime: "2021-05-12T02:51:52Z"
  nextRevision: wordcount-d9bb8f684-1
  state: Creating

Thank you in advance.