apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Flink Exceeded checkpoint tolerable failure threshold. #8276

Open jiangxinqi1995 opened 1 year ago

jiangxinqi1995 commented 1 year ago

Describe the problem you faced

I use Flink CDC to read MySQL data and then write it to S3 through Hudi. I often hit the checkpoint error org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. Typically a checkpoint failure occurs every 20 minutes. I have no problems running on a local machine, but the problem occurs when I run on an EKS cluster.


2023-03-23 10:23:07,101 INFO  org.apache.hudi.sink.StreamWriteOperatorCoordinator          [] - Executor executes action [handle write metadata event for instant 20230323101927464] success!
2023-03-23 10:23:07,817 INFO  org.apache.flink.fs.s3.common.writer.S3Committer             [] - Committing reject/savepoint-fbea13-af1aa20f0400/_metadata with MPU ID rqLctlP9RnUBjWedNLI1bXhNB32evfVDwi7T1nNz8Gd9gDzFtYDRm615A5MCQivzMav.9yAJeD_Tp36Yp52oHitnmMZ6BqYmJV9G.JcDXEco.czZfXHRzGisnIuVzO._qSlcCQaQpSW.Qx.X0ex3LQ--
2023-03-23 10:23:08,065 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 11 for job fbea139434199b0095f544fe5c15d25f (747958 bytes, checkpointDuration=6668 ms, finalizationTime=0 ms).
2023-03-23 10:24:06,082 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 12 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1679567046082 for job fbea139434199b0095f544fe5c15d25f.
2023-03-23 10:24:06,765 INFO  org.apache.hudi.sink.StreamWriteOperatorCoordinator          [] - Executor executes action [taking checkpoint 12] success!
2023-03-23 10:34:06,083 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 12 of job fbea139434199b0095f544fe5c15d25f expired before completing.
2023-03-23 10:34:06,083 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 12 for job fbea139434199b0095f544fe5c15d25f. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2143) [flink-dist-1.15.3.jar:1.15.3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
2023-03-23 10:34:06,084 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206) ~[flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:169) ~[flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122) ~[flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2082) ~[flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2061) ~[flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:98) ~[flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2143) ~[flink-dist-1.15.3.jar:1.15.3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
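
A minimal sketch for reading the two CheckpointCoordinator lines above: it measures how long checkpoint 12 was pending before it expired. The regular expressions and the function name `expiry_durations` are illustrative assumptions written against the log format shown here; they are not part of Flink or Hudi.

```python
import re
from datetime import datetime

# Two JobManager lines copied from the log above (checkpoint 12).
LOG = """\
2023-03-23 10:24:06,082 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 12 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1679567046082 for job fbea139434199b0095f544fe5c15d25f.
2023-03-23 10:34:06,083 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 12 of job fbea139434199b0095f544fe5c15d25f expired before completing.
"""

# Assumed patterns, derived only from the log layout shown in this issue.
TS = r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})"
TRIGGER = re.compile(TS + r" .*Triggering checkpoint (\d+) ")
EXPIRED = re.compile(TS + r" .*Checkpoint (\d+) of job \w+ expired before completing")


def parse_ts(text):
    """Parse a log timestamp such as '2023-03-23 10:24:06,082'."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S,%f")


def expiry_durations(log_text):
    """Return {checkpoint_id: seconds between trigger and expiry}."""
    triggered, durations = {}, {}
    for line in log_text.splitlines():
        m = TRIGGER.match(line)
        if m:
            triggered[int(m.group(2))] = parse_ts(m.group(1))
            continue
        m = EXPIRED.match(line)
        if m and int(m.group(2)) in triggered:
            cp = int(m.group(2))
            durations[cp] = (parse_ts(m.group(1)) - triggered[cp]).total_seconds()
    return durations


if __name__ == "__main__":
    for cp, seconds in expiry_durations(LOG).items():
        print(f"checkpoint {cp} expired after {seconds:.0f} s")
```

For the lines above this reports roughly 600 seconds for checkpoint 12, so the checkpoint ran into a 10-minute timeout rather than failing with an error of its own. Ten minutes is also Flink's default checkpoint timeout, which is consistent with the timeout not having been raised, although the configuration itself is not visible in the log.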


Environment Description

Additional context

connector = hudi,
table.type = MERGE_ON_READ,
hoodie.clean.async = true,
hoodie.compact.inline = true,
hoodie.compact.inline.max.delta.commits = 2,
hoodie.clean.max.commits = 2,
hoodie.cleaner.commits.retained = 3,
hoodie.cleaner.policy = KEEP_LATEST_COMMITS,
hoodie.parquet.small.file.limit = 104857600,
clustering.schedule.enabled = true,
clustering.async.enabled = true,
hoodie.clustering.inline = true,
hoodie.clustering.inline.max.commits = 2,
hoodie.clustering.plan.strategy.max.bytes.per.group = 107374182400,
hoodie.clustering.plan.strategy.max.num.groups = 1,
hoodie.datasource.write.recordkey.field = id,installs,rejects,
path = s3a://xxxxxxxx/xxxxxxx


danny0405 commented 1 year ago

Did you see any error stack trace in the logs? And at what interval are the checkpoints (ckp) triggered?

jiangxinqi1995 commented 1 year ago

This is my Flink checkpoint configuration (screenshot). I don't see any other useful log information.

danny0405 commented 1 year ago

Your ckp options seem good. What is your partition path field? How many partitions do you estimate are touched by one ckp write operation?

jiangxinqi1995 commented 1 year ago

The table is partitioned by day, and each write only touches the current day's partition.

jiangxinqi1995 commented 1 year ago

I don't know why. At first, Parquet files are produced normally, but after two Parquet files have been produced, subsequent files are no longer merged, and errors appear such as org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing, or Exceeded checkpoint tolerable failure threshold.

jiangxinqi1995 commented 1 year ago

Every time the Flink job is restarted, two checkpoints complete and then the next checkpoint fails. This behavior is very strange.

danny0405 commented 1 year ago

Sounds like an env problem or a record payload issue.

jiangxinqi1995 commented 1 year ago

It may be an environmental issue, because it always performs one compaction and then never performs a compaction again.

danny0405 commented 1 year ago

It may be an environmental issue, because it always performs one compaction and then never performs a compaction again.

Did the compaction succeed or fail?

jiangxinqi1995 commented 1 year ago

It succeeded once: a Parquet file was generated, and after a period of time a checkpoint exception occurred. After that, no Parquet file was generated; only log files were written, accompanied by checkpoint failures.

danny0405 commented 1 year ago

What kind of env did you run the Flink job on?

jiangxinqi1995 commented 1 year ago

The Flink job is deployed on AWS EKS, with one JobManager and one TaskManager.

jiangxinqi1995 commented 1 year ago

The Flink image is self-built; this is my Dockerfile:

FROM flink:1.15.3-scala_2.12-java11
WORKDIR /opt/flink
RUN wget -q -O /opt/flink/lib/hudi-flink1.15-bundle-0.13.0.jar https://repo.maven.apache.org/maven2/org/apache/hudi/hudi-flink1.15-bundle/0.13.0/hudi-flink1.15-bundle-0.13.0.jar && \
    wget -q -O /opt/flink/lib/hadoop-aws-3.2.1.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.1/hadoop-aws-3.2.1.jar && \
    wget -q -O /opt/flink/lib/aws-java-sdk-bundle-1.11.874.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.874/aws-java-sdk-bundle-1.11.874.jar && \
    wget -q -O /opt/flink/lib/mysql-connector-java-8.0.27.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar && \
    wget -q -O /opt/flink/lib/flink-connector-jdbc-1.15.3.jar https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.3/flink-connector-jdbc-1.15.3.jar && \
    mkdir /opt/flink/plugins/s3-fs-hadoop && \
    cp /opt/flink/opt/flink-s3-fs-hadoop-1.15.3.jar /opt/flink/plugins/s3-fs-hadoop/ && \
    wget -q -O /opt/hadoop-3.2.1.tar.gz  https://archive.apache.org/dist/hadoop/common/hadoop-3.2.1/hadoop-3.2.1.tar.gz && \
    tar -zxvf /opt/hadoop-3.2.1.tar.gz -C /opt/ && \
    rm /opt/hadoop-3.2.1.tar.gz

ENV HADOOP_HOME=/opt/hadoop-3.2.1
ENV HADOOP_COMMON_HOME=$HADOOP_HOME
ENV PATH=$PATH:$HADOOP_HOME/bin
ENV HADOOP_CLASSPATH=${HADOOP_HOME}/etc/hadoop:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/hdfs:${HADOOP_HOME}/share/hadoop/hdfs/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/mapreduce/lib/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/yarn:${HADOOP_HOME}/share/hadoop/yarn/lib/*:${HADOOP_HOME}/share/hadoop/yarn/*
ENV CLASSPATH=$CLASSPATH:$HADOOP_CLASSPATH

danny0405 commented 1 year ago

I see. I guess it is a resource problem. Did you try allocating more resources to the job (slots, memory, parallelism)?

c-f-cooper commented 1 year ago

It sounds like the ckp interval. Try decreasing the ckp interval!

jiangxinqi1995 commented 1 year ago

I think the cause is that I had set an automatic trigger for Flink savepoints, which leads to checkpoint failures after each savepoint. I removed the automatic savepoint trigger, and now everything is normal. I don't understand why this happens, though. Has it happened before? Thank you. @danny0405 @c-f-cooper
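
One way to check this correlation against a JobManager log is sketched below. It relies on a heuristic that is an assumption, not a Flink API: an `S3Committer` line committing a `savepoint-.../_metadata` object directly before a `Completed checkpoint N` line is taken to mean that checkpoint N was the savepoint. The sample lines are abbreviated from the log earlier in this issue, and the function name `failures_after_savepoint` is hypothetical.

```python
import re

# Abbreviated from the log earlier in this issue (MPU ID and some fields elided).
LOG_LINES = [
    "2023-03-23 10:23:07,817 INFO  org.apache.flink.fs.s3.common.writer.S3Committer [] - Committing reject/savepoint-fbea13-af1aa20f0400/_metadata with MPU ID <elided>",
    "2023-03-23 10:23:08,065 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 11 for job fbea139434199b0095f544fe5c15d25f (747958 bytes, checkpointDuration=6668 ms, finalizationTime=0 ms).",
    "2023-03-23 10:24:06,082 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 12 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1679567046082 for job fbea139434199b0095f544fe5c15d25f.",
    "2023-03-23 10:34:06,083 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 12 of job fbea139434199b0095f544fe5c15d25f expired before completing.",
]

# Assumed patterns, derived only from the log layout shown in this issue.
SAVEPOINT_COMMIT = re.compile(r"S3Committer .*Committing \S*savepoint-\S*/_metadata")
COMPLETED = re.compile(r"Completed checkpoint (\d+) for job")
EXPIRED = re.compile(r"Checkpoint (\d+) of job \w+ expired before completing")


def failures_after_savepoint(lines):
    """Return IDs of expired checkpoints whose predecessor looked like a savepoint."""
    savepoint_ids = set()
    pending_savepoint = False  # True between a savepoint commit and its completion
    flagged = []
    for line in lines:
        if SAVEPOINT_COMMIT.search(line):
            pending_savepoint = True
            continue
        m = COMPLETED.search(line)
        if m:
            if pending_savepoint:
                savepoint_ids.add(int(m.group(1)))
                pending_savepoint = False
            continue
        m = EXPIRED.search(line)
        if m and int(m.group(1)) - 1 in savepoint_ids:
            flagged.append(int(m.group(1)))
    return flagged


if __name__ == "__main__":
    print(failures_after_savepoint(LOG_LINES))
```

On the sample lines this flags checkpoint 12, the checkpoint that followed the savepoint completed as checkpoint 11. Running the same scan over a longer log would show whether every expired checkpoint follows a savepoint or only some of them.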

danny0405 commented 1 year ago

I haven't looked into the details of auto savepoint. My guess is that it's caused by resource preemption: both compaction and savepoint are resource-intensive.

jiangxinqi1995 commented 1 year ago

I did a test with a 10-minute interval between the savepoint and the checkpoint. After triggering the savepoint, the checkpoint still failed, even though resources were sufficient at that time.

gfunc commented 1 year ago

I am facing the same problem. Tuning up resources could resolve the issue, but I am still curious about the reason behind it.

I have a pipeline that has been running for days, and a savepoint is triggered by the k8s operator every 6 hours. It uses 2 task managers (4 cores, 8 GB each) with a total of 8 slots. Every checkpoint takes around 10 min, processing 100k records and affecting 20+ partitions. Every checkpoint after a savepoint fails with: Checkpoint expired before completing (60 min).

Could someone elaborate on what exactly is different about the process after Savepoint?

danny0405 commented 1 year ago

Nice finding. Can you help dig into the root cause, @gfunc?

jarrodcodes commented 1 year ago

We are experiencing this issue as well. We're also on Flink 1.15. Just curious if you've tried a newer version of Flink?

gfunc commented 1 year ago

I am on Flink 1.16 and Hudi 0.13. I tried looking into the code but was not able to identify the exact task that was responsible (my example job was always stuck at the sink part).

Since we were using S3 for checkpoint backend, we turned off auto savepoint and moved on to test other features, hoping that maybe async Hudi job execution (cleaning, etc.) would help in pinpointing the issue.

danny0405 commented 1 year ago

my example job was always stuck at the sink part

@gfunc Is it stuck during regular writing or when the auto savepoint kicks in? One thing to note is that data flushing is kind of a blocking operation.

gfunc commented 1 year ago

@danny0405 Sorry for any confusion, I meant the above-mentioned specific scenario: the first checkpoint after savepoint. Normal checkpoint is relatively slow but ok since we had a bad partition key making the number of partitions a bit too many for this small setup.

danny0405 commented 1 year ago

the first checkpoint after savepoint

Got it, I need to figure out what effect the savepoint has on the ensuing checkpoint.