GoogleCloudPlatform / flink-on-k8s-operator

[DEPRECATED] Kubernetes operator for managing the lifecycle of Apache Flink and Beam applications.
Apache License 2.0

Flink operator shows job status as failed when it is still running #412

Closed: pgandhijr closed this issue 3 years ago

pgandhijr commented 3 years ago

We are experimenting with running Flink using the Kubernetes Flink operator and Flink 1.10.1. Everything appears to run fine except for a couple of issues:

1. When running the "kubectl describe flinkclusters" command, it reports that the job state is Failed even though the job is actually running without issues. Below is the status reported.

Status:
  Components:
    Config Map:
      Name:   c5m-configmap
      State:  Ready
    Job:
      Id:     6a7192e7b5536553dd1a560cb6e655a8
      Name:   c5m-job
      State:  Failed
    Job Manager Deployment:
      Name:   c5m-jobmanager
      State:  Ready
    Job Manager Ingress:
      Name:   c5m-jobmanager
      State:  NotReady
      Urls:
        https://flink-c5m.yyyy.xxxcloud.com
    Job Manager Service:
      Name:   c5m-jobmanager
      State:  Ready
    Task Manager Deployment:
      Name:   c5m-taskmanager
      State:  Ready
  Current Revision:  c5m-66886859d7-1
  Last Update Time:  2021-02-12T16:54:40Z
  Next Revision:     c5m-66886859d7-1
  State:             Running
Events:
  Type    Reason        Age                     From           Message
  Normal  StatusUpdate  6m23s                   FlinkOperator  Cluster status: Creating
  Normal  StatusUpdate  6m23s                   FlinkOperator  JobManager deployment status: NotReady
  Normal  StatusUpdate  6m23s                   FlinkOperator  ConfigMap status: Ready
  Normal  StatusUpdate  6m23s                   FlinkOperator  JobManager service status: Ready
  Normal  StatusUpdate  6m23s                   FlinkOperator  JobManager ingress status: NotReady
  Normal  StatusUpdate  6m23s                   FlinkOperator  TaskManager deployment status: NotReady
  Normal  StatusUpdate  6m17s                   FlinkOperator  JobManager deployment status changed: NotReady -> Ready
  Normal  StatusUpdate  6m11s                   FlinkOperator  TaskManager deployment status changed: NotReady -> Ready
  Normal  StatusUpdate  6m11s                   FlinkOperator  Cluster status changed: Creating -> Running
  Normal  StatusUpdate  3m50s (x2 over 6m11s)   FlinkOperator  (combined from similar events): Job status changed: Pending -> Failed
  Normal  StatusUpdate  3m50s                   FlinkOperator  Job status changed: Pending -> Failed

2. The second issue, which may be related to the above, is that we configured auto savepoints to write to S3 (via MinIO), but the auto savepoint never triggers. We are using checkpoints to recover the job if the TaskManager fails. We are hoping that by using auto savepoints and setting the restart policy to FromSavepointOnFailure, the job will automatically recover when the JobManager fails or when the Kubernetes environment is shut down and brought back up.

Find below the full configuration we are using.

Any help is much appreciated.

Name:         c5m
Namespace:    test-flink-k8
Labels:
Annotations:
API Version:  flinkoperator.k8s.io/v1beta1
Kind:         FlinkCluster
Metadata:
  Creation Timestamp:  2021-02-12T16:52:07Z
  Generation:          1
  Managed Fields:
    API Version:  flinkoperator.k8s.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        .:
        f:envVars:
        f:flinkProperties:
          .:
          f:akka.ask.timeout:
          f:akka.framesize:
          f:heartbeat.timeout:
          f:jobmanager.memory.flink.size:
          f:restart-strategy.failure-rate.delay:
          f:restart-strategy.failure-rate.failure-rate-interval:
          f:restart-strategy.failure-rate.max-failures-per-interval:
          f:s3.access-key:
          f:s3.endpoint:
          f:s3.path.style.access:
          f:s3.secret-key:
          f:s3.socket-timeout:
          f:state.backend.rocksdb.thread.num:
          f:state.checkpoints.dir:
          f:state.savepoint:
          f:taskmanager.memory.flink.size:
          f:taskmanager.memory.managed.size:
          f:taskmanager.numberOfTaskSlots:
        f:image:
          .:
          f:imagePullPolicy:
          f:name:
        f:job:
          .:
          f:autoSavepointSeconds:
          f:className:
          f:initContainers:
          f:jarFile:
          f:parallelism:
          f:restartPolicy:
          f:savepointsDir:
          f:volumeMounts:
          f:volumes:
        f:jobManager:
          .:
          f:accessScope:
          f:ingress:
            .:
            f:annotations:
              .:
              f:ingress.kubernetes.io/force-ssl-redirect:
              f:kubernetes.io/ingress.allow-http:
              f:kubernetes.io/ingress.class:
              f:projectcontour.io/tls-minimum-protocol-version:
            f:hostFormat:
            f:tlsSecretName:
            f:useTLS:
            f:useTls:
          f:ports:
            .:
            f:ui:
          f:resources:
            .:
            f:limits:
              .:
              f:memory:
        f:taskManager:
          .:
          f:replicas:
          f:resources:
            .:
            f:limits:
              .:
              f:memory:
    Manager:      hackney
    Operation:    Update
    Time:         2021-02-12T16:52:07Z
    API Version:  flinkoperator.k8s.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        .:
        f:components:
          .:
          f:configMap:
            .:
            f:name:
            f:state:
          f:job:
            .:
            f:id:
            f:name:
            f:state:
          f:jobManagerDeployment:
            .:
            f:name:
            f:state:
          f:jobManagerIngress:
            .:
            f:name:
            f:state:
            f:urls:
          f:jobManagerService:
            .:
            f:name:
            f:state:
          f:taskManagerDeployment:
            .:
            f:name:
            f:state:
        f:currentRevision:
        f:lastUpdateTime:
        f:nextRevision:
        f:state:
    Manager:      flink-operator
    Operation:    Update
    Time:         2021-02-12T16:52:19Z
  Resource Version:  1519039
  Self Link:         /apis/flinkoperator.k8s.io/v1beta1/namespaces/xxx-kots/flinkclusters/c5m
  UID:               d4770a96-1b43-44d6-98ac-96f32af8fa58
Spec:
  Env Vars:
    Name:   KAFKA_SCHEMA_REGISTRY
    Value:  http://schemaregistry:8081
    Name:   FLINK_CHECKPOINT_BUCKET
    Value:  s3://flink/checkpoints/c5m
    Name:   FLINK_CHECKPOINT_MODE
    Value:  EXACTLY_ONCE
    Name:   FLINK_CHECKPOINT_TIMEOUT
    Value:  600
    Name:   FLINK_CHECKPOINT_FREQUENCY
    Value:  60
    Name:   MINIO_ACCESS_KEY
    Value:  ABCDEF1234
    Name:   MINIO_ACCESS_SECRET
    Value:  EFGH1234
    Name:   MINIO_ENDPOINT_URL
    Value:  http://minio-hl:9000
  Flink Properties:
    akka.ask.timeout:                                         180s
    akka.framesize:                                           2147483647b
    heartbeat.timeout:                                        120000
    jobmanager.memory.flink.size:                             4G
    restart-strategy.failure-rate.delay:                      30 s
    restart-strategy.failure-rate.failure-rate-interval:      10 min
    restart-strategy.failure-rate.max-failures-per-interval:  30
    s3.access-key:                                            ABCDEF1234
    s3.endpoint:                                              http://minio-hl:9000
    s3.path.style.access:                                     true
    s3.secret-key:                                            EFGH1234
    s3.socket-timeout:                                        120s
    state.backend.rocksdb.thread.num:                         4
    state.checkpoints.dir:                                    s3://flink/checkpoints
    state.savepoint:                                          s3://flink/savepoints/c5m
    taskmanager.memory.flink.size:                            20G
    taskmanager.memory.managed.size:                          10g
    taskmanager.numberOfTaskSlots:                            1
  Image:
    Name:         478685488679.dkr.ecr.us-west-2.amazonaws.com/flink:1.10.1
    Pull Policy:  Always
  Job:
    Allow Non Restored State:  false
    Auto Savepoint Seconds:    300
    Class Name:                com.cogility.hcep.cli.Main
    Cleanup Policy:
      After Job Cancelled:  DeleteCluster
      After Job Fails:      KeepCluster
      After Job Succeeds:   DeleteCluster
    Init Containers:
      Args:
        -O /cache/job.jar
        http://authoring-otp.xxx-kots:4000/jars/briefly-1613/briefly-148718-579678-1.jar
      Command:
        wget
      Image:  docker.io/busybox:1.31.1
      Name:   jar-fetcher
      Resources:
    Jar File:              /cache/job.jar
    No Logging To Stdout:  false
    Parallelism:           2
    Resources:
    Restart Policy:        FromSavepointOnFailure
    Savepoints Dir:        s3://flink/savepoints/c5m
    Volume Mounts:
      Mount Path:  /cache
      Name:        cache-volume
    Volumes:
      Empty Dir:
      Name:  cache-volume
  Job Manager:
    Access Scope:  Cluster
    Ingress:
      Annotations:
        ingress.kubernetes.io/force-ssl-redirect:        true
        kubernetes.io/ingress.allow-http:                false
        kubernetes.io/ingress.class:                     contour
        projectcontour.io/tls-minimum-protocol-version:  1.2
      Host Format:      flink-c5m.yyyy.xxxcloud.com
      Tls Secret Name:  xxx-global-tls
      Use Tls:          true
    Memory Off Heap Min:    600M
    Memory Off Heap Ratio:  25
    Ports:
      Blob:   6124
      Query:  6125
      Rpc:    6123
      Ui:     8081
    Replicas:  1
    Resources:
      Limits:
        Memory:  4G
  Task Manager:
    Memory Off Heap Min:    600M
    Memory Off Heap Ratio:  25
    Ports:
      Data:   6121
      Query:  6125
      Rpc:    6122
    Replicas:  2
    Resources:
      Limits:
        Memory:  20G

elanv commented 3 years ago

Could you share the logs of the flink operator and the job submitter?

pgandhijr commented 3 years ago

Find attached the job submitter and flink operator logs. Note that these logs are from a different run, as I didn't keep the logs from the run where I reported the problem. However, the issue is the same.

The job submitter log shows an exception, but the job is still running. I don't know whether this is the cause of the issue.

Thanks for your help.

joblauncher.log flink-operator.log

pgandhijr commented 3 years ago

We updated the operator to the latest v1beta-9 tag but still encounter the same issue. The job submitter pod log shows an exception because the job was submitted in detached mode. Could this be the reason why the auto savepoints are not working and the job is showing a failed state?

pgandhijr commented 3 years ago

The issue was that our job starter code was not handling the --detach mode correctly, which caused an exception. Once this was fixed, the auto savepoints worked, so I am closing the issue.

colax94 commented 2 years ago

Hello @pgandhijr, I'm experiencing the same issue (auto savepoints not being triggered). Can you tell me what solved it for you, please?

chengjoey commented 2 years ago

> Hello @pgandhijr, I'm experiencing the same issue (auto savepoints not being triggered). Can you tell me what solved it for you, please?

Hi, I'm experiencing the same issue too. I solved the problem by reducing the length of the SQL.

This is because flink-operator unmarshals the pod message into the FlinkJobSubmitLog struct.

If the message is too long, the jobID gets cut off (see screenshot).


If the jobID is empty, flink-operator marks the job status as Failed (see screenshot).
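
For illustration, here is a minimal, self-contained Go sketch of that failure mode. Only the struct name FlinkJobSubmitLog comes from the operator; the field names, yaml tags, and the parsing shown are assumptions for the sketch, not the operator's actual code.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2" // assumed parser for this sketch
)

// FlinkJobSubmitLog stands in for the operator's struct of the same name,
// which the submitter pod's termination message is parsed into.
// Field names and yaml tags here are assumptions for illustration only.
type FlinkJobSubmitLog struct {
	JobID   string `yaml:"jobID"`
	Message string `yaml:"message"`
}

func main() {
	// A termination message whose jobID was lost because the submitter
	// output (e.g. a very long SQL statement) pushed it out of the message.
	truncated := "message: |\n  ...very long submitter output, jobID cut off...\n"

	var submitLog FlinkJobSubmitLog
	if err := yaml.Unmarshal([]byte(truncated), &submitLog); err != nil {
		fmt.Println("unmarshal error:", err)
		return
	}

	// With no jobID left in the message, JobID stays empty, and the operator
	// concludes the submission failed even though the job is running.
	if submitLog.JobID == "" {
		fmt.Println("empty jobID -> job status reported as Failed")
	}
}
```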

chengjoey commented 2 years ago

A k8s pod's termination message is limited to 2048 bytes or 80 lines.
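
As a rough sketch of what that limit does to a verbose submitter log (the helper name and the tail-keeping behaviour are illustrative assumptions based on the limits quoted above; this is not Kubernetes or operator code):

```go
package main

import (
	"fmt"
	"strings"
)

// Limits quoted above for a pod's termination message.
const (
	maxTerminationBytes = 2048
	maxTerminationLines = 80
)

// tailForTerminationMessage keeps only the last maxTerminationLines lines
// and maxTerminationBytes bytes of a container log, roughly mimicking how
// a log gets trimmed when it is used as a termination message.
func tailForTerminationMessage(log string) string {
	lines := strings.Split(log, "\n")
	if len(lines) > maxTerminationLines {
		lines = lines[len(lines)-maxTerminationLines:]
	}
	out := strings.Join(lines, "\n")
	if len(out) > maxTerminationBytes {
		out = out[len(out)-maxTerminationBytes:]
	}
	return out
}

func main() {
	// A submitter log bloated by a long SQL statement: most of it is dropped,
	// and anything outside the retained tail (such as the jobID) is lost.
	log := strings.Repeat("INFO  very long line of flink job submitter output\n", 200)
	kept := tailForTerminationMessage(log)
	fmt.Printf("kept %d of %d bytes\n", len(kept), len(log))
}
```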

chengjoey commented 2 years ago

@elanv, could you look into fixing this problem?