apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] ALTER TABLE DROP PARTITION DDL may cause data inconsistencies when table service actions are performed #7663

Closed voonhous closed 1 year ago

voonhous commented 1 year ago


Describe the problem you faced


TLDR

Dropping a partition while a table service action (e.g. clustering) is pending on it may cause downstream write errors and data inconsistencies.

Detailed explanation

  1. An append-only job with online clustering enabled is running.
  2. The user executes an ALTER TABLE DROP PARTITION DDL to drop a partition.
  3. The user is still able to query the dropped partition's data.
  4. Operations on the table may fail when AbstractTableFileSystemView#resetFileGroupsReplaced is invoked.

Root cause

  1. A clustering plan is created to cluster partition=a/filegroup_1 (assume partition=a only contains filegroup_1).
  2. The user executes an ALTER TABLE DROP PARTITION DDL to drop partition=a.
  3. A replacecommit is created for the ALTER TABLE DROP PARTITION DDL, marking filegroup_1 as replaced.
  4. The clustering plan is then executed; it clusters partition=a/filegroup_1 and produces partition=a/filegroup_2.
  5. When the partition is read, partition=a/filegroup_2 is returned (if the read somehow bypasses AbstractTableFileSystemView#resetFileGroupsReplaced).
  6. A duplicate key error is thrown if AbstractTableFileSystemView#resetFileGroupsReplaced is invoked (see the sketch after this list).
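
To make the duplicate key failure in step 6 concrete, here is a minimal standalone Java sketch (not Hudi code; the class and labels are made up for illustration) of what effectively happens when the replaced file groups are collected into a map, per the stacktrace below: both the drop-partition replacecommit and the clustering replacecommit list filegroup_1 as replaced, and Collectors.toMap uses a throwing merger by default.

import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DuplicateReplaceCommitSketch {
  public static void main(String[] args) {
    // fileGroupId -> label of the replacecommit that marked it as replaced.
    // Both replacecommits list filegroup_1 because the clustering plan was
    // scheduled before the partition was dropped.
    List<Map.Entry<String, String>> replacedFileGroups = Arrays.asList(
        new SimpleEntry<>("filegroup_1", "dropPartitionReplaceCommit"),
        new SimpleEntry<>("filegroup_1", "clusteringReplaceCommit"));

    // Collectors.toMap without a merge function uses a throwing merger, so the
    // second entry for filegroup_1 raises
    // java.lang.IllegalStateException: Duplicate key ...
    // (the exact message text varies by JDK version).
    Map<String, String> fileGroupToReplaceCommit = replacedFileGroups.stream()
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    System.out.println(fileGroupToReplaceCommit); // never reached
  }
}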

Errors

  1. Spark-SQL will still show data in the dropped partition (see the sketch after this list).
  2. When the FileSystemView is rebuilt and AbstractTableFileSystemView#resetFileGroupsReplaced is invoked, an error like the one in the Stacktrace section below is thrown.
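
A minimal sketch (again plain Java, not Hudi code; names are made up) of why the dropped partition still shows data: the drop-partition replacecommit only records filegroup_1 as replaced, so once clustering writes filegroup_2 into the same partition, a reader that merely filters out the recorded replaced file groups still sees filegroup_2.

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class DroppedPartitionStillVisibleSketch {
  public static void main(String[] args) {
    // File groups physically present under partition=a after clustering ran.
    Set<String> fileGroupsOnStorage =
        new LinkedHashSet<>(Arrays.asList("filegroup_1", "filegroup_2"));

    // File groups the drop-partition replacecommit recorded as replaced.
    Set<String> replacedByDropPartition = new LinkedHashSet<>(Arrays.asList("filegroup_1"));

    // Filtering out only the recorded replaced file groups leaves filegroup_2
    // visible, so queries against the "dropped" partition still return data.
    Set<String> visibleFileGroups = new LinkedHashSet<>(fileGroupsOnStorage);
    visibleFileGroups.removeAll(replacedByDropPartition);
    System.out.println(visibleFileGroups); // prints [filegroup_2]
  }
}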

To Reproduce

Steps to reproduce the behavior:

  test("ALTER TABLE DROP PARTITION + CLUSTER (Unable to drop partition issue)") {
    withTempDir { tmp =>
      Seq("cow").foreach { tableType =>
        val tableName = generateTableName
        val basePath = s"${tmp.getCanonicalPath}/$tableName"
        spark.sql(
          s"""
             |create table $tableName (
             |  id int,
             |  name string,
             |  price double,
             |  ts long
             |) using hudi
             | options (
             |  primaryKey ='id',
             |  type = '$tableType',
             |  preCombineField = 'ts'
             | )
             | partitioned by(ts)
             | location '$basePath'
       """.stripMargin)
        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
        val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
        // Generate the first clustering plan
        val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
        client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())

        checkAnswer(s"call show_clustering('$tableName')")(
          Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*")
        )

        spark.sql(s"ALTER TABLE $tableName DROP PARTITION(ts=1002)")

        // Run clustering for the plan scheduled above; no new clustering instant
        // should be generated because there are no new commits after the plan
        // was scheduled
        checkAnswer(s"call run_clustering(table => '$tableName', order => 'ts', show_involved_partition => true)")(
          Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001,ts=1002")
        )

        // Check the number of finished clustering instants
        val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
        val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
          .getInstants
          .iterator().asScala
          .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
          .toSeq
        // 1 drop partition + 1 clustering
        assertResult(2)(finishedClustering.size)

        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
          Seq(1, "a1", 10.0, 1000),
          Seq(2, "a2", 10.0, 1001)
        )
      }
    }
  }

Expected behavior

Either the ALTER TABLE DROP PARTITION DDL is rejected while a table service action is pending on the partition, or the drop completes without leaving the table inconsistent (no stale data returned for the dropped partition and no duplicate key errors when the FileSystemView is built).



Stacktrace


Caused by: java.lang.IllegalStateException: Duplicate key [20230113023147076__replacecommit__COMPLETED]
    at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
    at java.util.HashMap.merge(HashMap.java:1255)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.resetFileGroupsReplaced(AbstractTableFileSystemView.java:247)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:110)
    at org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:113)
    at org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:107)
    at org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:98)
    at org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:176)
    at org.apache.hudi.BaseHoodieTableFileIndex.loadFileSlicesForPartitions(BaseHoodieTableFileIndex.java:252)
    at org.apache.hudi.BaseHoodieTableFileIndex.ensurePreloadedPartitions(BaseHoodieTableFileIndex.java:240)
    at org.apache.hudi.BaseHoodieTableFileIndex.getInputFileSlices(BaseHoodieTableFileIndex.java:226)
    at org.apache.hudi.HoodieFileIndex.listFiles(HoodieFileIndex.scala:138)
voonhous commented 1 year ago

Not sure if this is the correct approach, but should we prevent users from dropping a partition if there is a pending table service action on the partition path?

SparkDeletePartitionCommitActionExecutor#execute

public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
    // ensure that there are no pending inflight clustering/compaction operations involving this partition
    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
    List<String> partitionPathsWithPendingInflightTableServiceActions = Stream
        .concat(fileSystemView.getPendingCompactionOperations(), fileSystemView.getPendingLogCompactionOperations())
        .map(op -> op.getRight().getPartitionPath())
        .distinct()
        .collect(Collectors.toList());

    partitionPathsWithPendingInflightTableServiceActions.addAll(
        fileSystemView.getFileGroupsInPendingClustering()
            .map(x -> x.getKey().getPartitionPath())
            .collect(Collectors.toList()));

    if (partitions.stream().anyMatch(partitionPathsWithPendingInflightTableServiceActions::contains)) {
      throw new HoodieDeletePartitionException("Failed to drop partitions. "
          + "Please ensure that there are no pending table service actions (clustering/compaction) for the "
          + "partitions to be deleted: " + partitions);
    }

    try {
...
voonhous commented 1 year ago

Not a complete fix, but #7669 adds a guard that prevents such actions from happening, with an informative error message on how to remedy the situation.

ad1happy2go commented 1 year ago

@voonhous Confirmed with the master code that the fix is working fine. The drop partition operation is now blocked whenever a compaction/clustering on the partition is still pending.

org.apache.hudi.exception.HoodieDeletePartitionException: Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [ts=1002]. Instant(s) of offending pending table service action: [20230512161603721]

Closing this bug as this seems to be a reasonable fix for this problem.