
Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter #6630

Closed. cgpoh closed this issue 1 month ago.

cgpoh commented 1 year ago

Query engine

Flink

Question

I have a Flink job that uses a side output to write to an Iceberg table when there are errors in the main processing function. If there are no errors, no data files are added to be committed. I noticed that the Flink job keeps restarting and throwing the following exception:

IcebergFilesCommitter -> Sink: iceberg-error-sink-FPL (1/1)#152 (2b5a4587aa3a50c531671a32a2f1538c) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Cannot determine partition spec: no data files have been added
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:502)
    at org.apache.iceberg.MergingSnapshotProducer.dataSpec(MergingSnapshotProducer.java:150)
    at org.apache.iceberg.BaseReplacePartitions.apply(BaseReplacePartitions.java:133)
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:223)
    at org.apache.iceberg.BaseReplacePartitions.apply(BaseReplacePartitions.java:26)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:369)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:402)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:367)
    at org.apache.iceberg.BaseReplacePartitions.commit(BaseReplacePartitions.java:26)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:372)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.replacePartitions(IcebergFilesCommitter.java:314)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitPendingResult(IcebergFilesCommitter.java:270)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:255)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.notifyCheckpointComplete(IcebergFilesCommitter.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Unknown Source)
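
For context, the sink is wired up roughly like this (a simplified sketch of my setup; the tag name, process function, and table loader are placeholders):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.util.OutputTag;
    import org.apache.iceberg.flink.sink.FlinkSink;

    // Tag for records that fail in the main processing function.
    final OutputTag<RowData> errorTag = new OutputTag<RowData>("processing-errors") {};

    SingleOutputStreamOperator<RowData> main =
        input.process(new MyProcessFunction(errorTag)); // emits failures to the side output

    // On most checkpoints this stream is empty, so the committer sees zero data files.
    DataStream<RowData> errors = main.getSideOutput(errorTag);

    FlinkSink.forRowData(errors)
        .tableLoader(errorTableLoader) // placeholder TableLoader for the error table
        .overwrite(true)
        .append();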

I saw that in the commitPendingResult function of IcebergFilesCommitter.java there is a condition that decides whether to skip an empty commit, but once MAX_CONTINUOUS_EMPTY_COMMITS is reached it proceeds to commit even though there are no data files, which throws the above exception:

long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
  if (replacePartitions) {
    replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
  } else {
    commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
  }
  continuousEmptyCheckpoints = 0;
} else {
  LOG.info("Skipping committing empty checkpoint {}", checkpointId);
}

May I know what's the purpose of this empty commit?

dou-dou commented 1 year ago

I also encountered this exception. Did you find the cause? I would also like to understand the purpose of MAX_CONTINUOUS_EMPTY_COMMITS.

pvary commented 1 year ago

The Flink snapshot/checkpoint state is kept in 3 places:

- the Iceberg table itself (the committed snapshots),
- the Flink internal state (data written since the last commit),
- temporary files on the file system.

These 3 need to be in sync, and we need to keep all the changes since the last sync. So if we do not commit to Iceberg, the Flink internal state and the temporary files on the file system keep growing. To avoid this, we commit from time to time (and write the Flink metadata to the Iceberg table in the process); after such a commit we are able to remove the old temp files and clean some data from the Flink state.

So writing an empty commit is intentional/needed, but the failure seems like a bug.
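
If the forced empty commits are too frequent or too rare for a given job, the interval can be tuned per table. A sketch, assuming the property key read by IcebergFilesCommitter is flink.max-continuous-empty-commits (default 10, as far as I remember; please verify against your Iceberg version):

    import org.apache.iceberg.Table;

    // Force an empty commit only after 50 consecutive empty checkpoints
    // instead of the default. `table` is a placeholder for an already-loaded Table.
    table.updateProperties()
        .set("flink.max-continuous-empty-commits", "50")
        .commit();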

dou-dou commented 1 year ago

@pvary Thanks for your reply.

pvary commented 1 year ago

@dou-dou: Could you please share the Flink+Iceberg version you are using?

dou-dou commented 1 year ago

@pvary

Version: Flink 1.12.7, Iceberg 0.13.1

Question: I am using chunjun to incrementally sync data from MySQL to Iceberg, and the Flink job runs in overwrite mode. When there is no incremental data (totalFiles == 0) and the number of checkpoints is a multiple of MAX_CONTINUOUS_EMPTY_COMMITS (continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0), it throws the exception above. [screenshot of the exception omitted] The Flink job then restarts and keeps running, and the exception is thrown again at checkpointId 21 (the previous one was thrown at checkpointId 12). I would like operation.commit() to not raise an exception when totalFiles == 0, or to find another solution.

pvary commented 1 year ago

Seems like a bug to me. When we are writing data to the IcebergSink and no data comes in for the configured maxContinuousEmptyCommits number of commits, we try to commit an empty changeset with the following code in IcebergFilesCommitter:

    if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
      if (replacePartitions) {
        replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
      } else {
        commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
      }
      continuousEmptyCheckpoints = 0;
    } else {
      LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
    }

I think we might want to handle zero-file commits on a different code path. Maybe we just switch this case to be handled by commitDeltaTxn regardless of the replacePartitions value.
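
Roughly something like this (an untested sketch, keeping the surrounding fields from the snippet above):

    long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
    continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
    if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
      if (replacePartitions && totalFiles != 0) {
        // Only replace partitions when there is data to determine the partition spec from.
        replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
      } else {
        // Empty changesets (and normal deltas) go through commitDeltaTxn, which
        // does not need to resolve a partition spec from the data files.
        commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
      }
      continuousEmptyCheckpoints = 0;
    } else {
      LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
    }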

I do not have enough time nowadays to write and test the patch, but if you have time to write it, I will try to find time to review it.

CC: @stevenzwu, @hililiwei

dou-dou commented 1 year ago

@pvary Thanks for your advice.

hililiwei commented 1 year ago

@pvary Yes, it seems like a bug. But @cgpoh, are you trying to overwrite the data of a partition every time? Is it reasonable to overwrite partitions or the whole table on every commit in a streaming task? It seems like we only do this in batch tasks.

If an empty snapshot is committed in overwrite mode, this error occurs. I raised #7983, which follows @pvary's suggestion: commit through commitDeltaTxn regardless of the replacePartitions value.

pvary commented 1 year ago

@dou-dou: What is the Iceberg Sink configuration you are using?

dou-dou commented 1 year ago

@pvary I have found that the exception was caused by my unreasonable configuration. Thank you very much for your help.

pvary commented 1 year ago

Still, as @hililiwei mentioned in her other comment, it might be good to prevent these "unreasonable" configurations, if they are indeed unreasonable 😄

hililiwei commented 1 year ago

Yes, if we are not supposed to use overwrite mode in streaming tasks, should we add a check to avoid this misuse? @pvary
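
Something like this in the sink builder could fail fast (purely illustrative; `overwrite` and `isBounded` are hypothetical stand-ins for whatever the builder actually knows about its configuration and input):

    // Illustrative sketch of such a guard, not the actual Iceberg code.
    // Preconditions is org.apache.iceberg.relocated.com.google.common.base.Preconditions.
    Preconditions.checkArgument(
        !overwrite || isBounded,
        "OVERWRITE mode (replacePartitions) should only be used with bounded/batch inputs");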

cgpoh commented 1 year ago

> @pvary I have found that the exception was caused by my unreasonable configuration. Thank you very much for your help.

Hi @dou-dou, what's your unreasonable configuration? I'd like to check whether my configuration is unreasonable too.

dou-dou commented 1 year ago

@cgpoh As @hililiwei mentioned in the other comment, I was probably using overwrite mode in a streaming task when using chunjun to incrementally sync data from MySQL to Iceberg.

cgpoh commented 1 year ago

Thanks @dou-dou for your reply! In my case, the replacePartitions flag is set to true in my streaming task, hence the exceptions on empty commits.
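
For anyone hitting the same issue: the fix for me was simply to stop enabling overwrite on the streaming error sink (sketch; `errors` and `errorTableLoader` as in my earlier snippet):

    // Leaving overwrite at its default (false) makes the committer append via
    // commitDeltaTxn instead of calling replacePartitions on every checkpoint.
    FlinkSink.forRowData(errors)
        .tableLoader(errorTableLoader)
        .overwrite(false) // was true, which triggers BaseReplacePartitions on empty commits
        .append();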

github-actions[bot] commented 2 months ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented 1 month ago

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.