apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

NullPointerException after deleting old partition column #10626

Open mgmarino opened 1 week ago

mgmarino commented 1 week ago

Apache Iceberg version

1.5.2 (latest release)

Query engine

Spark

Please describe the bug 🐞

We have an Iceberg table whose partitioning we changed from an identity partition to hidden partitioning.

The partition specs are defined in the metadata json file:

  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "day",
      "transform" : "identity",
      "source-id" : 6,
      "field-id" : 1000
    } ]
  }, {
    "spec-id" : 1,
    "fields" : [ {
      "name" : "arrival_ts_day",
      "transform" : "day",
      "source-id" : 5,
      "field-id" : 1001
    } ]
  } ],
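
To see which schema columns these specs still depend on, one can collect the `source-id` of every field across all specs, historical ones included. The following is a minimal Python sketch, assuming the metadata file has been loaded as a dict. The helper name `specs_referencing` is hypothetical, and the embedded JSON only reproduces the `partition-specs` fragment shown above.

```python
import json
from typing import List

# Fragment copied from the metadata file shown above.
METADATA = json.loads("""
{
  "partition-specs": [
    {"spec-id": 0, "fields": [
      {"name": "day", "transform": "identity", "source-id": 6, "field-id": 1000}]},
    {"spec-id": 1, "fields": [
      {"name": "arrival_ts_day", "transform": "day", "source-id": 5, "field-id": 1001}]}
  ]
}
""")


def specs_referencing(metadata: dict, source_id: int) -> List[int]:
    """Return the spec-ids of all partition specs that use the given source column id."""
    return [
        spec["spec-id"]
        for spec in metadata.get("partition-specs", [])
        if any(field["source-id"] == source_id for field in spec["fields"])
    ]


if __name__ == "__main__":
    # Source id 6 (the identity-partitioned column) is still used by historical spec 0.
    print(specs_referencing(METADATA, 6))  # [0]
    print(specs_referencing(METADATA, 5))  # [1]
```

A non-empty result for a column id means at least one spec, possibly an old one, still refers to that column.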

We made this change quite some time ago (unfortunately, I can't remember which version of Iceberg we were using at the time), and are now trying to clean up the table by removing the old day column. Running a DROP COLUMN in Spark (3.5.1, using Iceberg 1.5.2) succeeds, but a subsequent read on the table or on, e.g., the partitions metadata table then fails with:

Caused by: java.lang.NullPointerException: Type cannot be null
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:921)
    at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447)
    at org.apache.iceberg.types.Types$NestedField.optional(Types.java:416)
    at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132)
    at org.apache.iceberg.Partitioning.buildPartitionProjectionType(Partitioning.java:274)
    at org.apache.iceberg.Partitioning.partitionType(Partitioning.java:242)
    at org.apache.iceberg.PartitionsTable.partitions(PartitionsTable.java:167)
    at org.apache.iceberg.PartitionsTable.task(PartitionsTable.java:122)
    at org.apache.iceberg.PartitionsTable.access$1100(PartitionsTable.java:35)
    at org.apache.iceberg.PartitionsTable$PartitionsScan.lambda$new$0(PartitionsTable.java:248)
    at org.apache.iceberg.StaticTableScan.doPlanFiles(StaticTableScan.java:53)
    at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139)
    at org.apache.iceberg.BatchScanAdapter.planFiles(BatchScanAdapter.java:119)
    at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.tasks(SparkPartitioningAwareScan.java:174)
    at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.taskGroups(SparkPartitioningAwareScan.java:202)
    at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.outputPartitioning(SparkPartitioningAwareScan.java:104)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$$anonfun$partitioning$1.applyOrElse(V2ScanPartitioningAndOrdering.scala:44)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$$anonfun$partitioning$1.applyOrElse(V2ScanPartitioningAndOrdering.scala:42)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
    at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
    at org.apache.spark.sql.catalyst.plans.logical.LocalLimit.mapChildren(basicLogicalOperators.scala:1563)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
    at org.apache.spark.sql.catalyst.plans.logical.GlobalLimit.mapChildren(basicLogicalOperators.scala:1542)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.partitioning(V2ScanPartitioningAndOrdering.scala:42)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.$anonfun$apply$1(V2ScanPartitioningAndOrdering.scala:35)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.$anonfun$apply$3(V2ScanPartitioningAndOrdering.scala:38)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.apply(V2ScanPartitioningAndOrdering.scala:37)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.apply(V2ScanPartitioningAndOrdering.scala:33)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:143)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
    ... 32 more

This fails in Spark, but writes/commits from Flink (1.18.1, also using Iceberg 1.5.2) also fail following this change. There the stack trace looks like:

java.lang.NullPointerException: Type cannot be null
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:921)
    at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:448)
    at org.apache.iceberg.types.Types$NestedField.optional(Types.java:417)
    at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132)
    at org.apache.iceberg.util.PartitionSet.lambda$new$0(PartitionSet.java:46)
    at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.forEach(RegularImmutableMap.java:297)
    at org.apache.iceberg.util.PartitionSet.<init>(PartitionSet.java:46)
    at org.apache.iceberg.util.PartitionSet.create(PartitionSet.java:38)
    at org.apache.iceberg.ManifestFilterManager.<init>(ManifestFilterManager.java:94)
    at org.apache.iceberg.MergingSnapshotProducer$DataFileFilterManager.<init>(MergingSnapshotProducer.java:1028)
    at org.apache.iceberg.MergingSnapshotProducer$DataFileFilterManager.<init>(MergingSnapshotProducer.java:1026)
    at org.apache.iceberg.MergingSnapshotProducer.<init>(MergingSnapshotProducer.java:118)
    at org.apache.iceberg.MergeAppend.<init>(MergeAppend.java:32)
    at org.apache.iceberg.BaseTable.newAppend(BaseTable.java:180)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:360)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitPendingResult(IcebergFilesCommitter.java:298)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:280)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:198)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:924)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
    at java.base/java.lang.Thread.run(Thread.java:829)

We are using the AWS Glue Catalog to store information about the table. Here are the table properties currently set:

+------------------------------------------+-------------------+
|key                                       |value              |
+------------------------------------------+-------------------+
|connector                                 |none               |
|current-snapshot-id                       |2617120118159963811|
|format                                    |iceberg/parquet    |
|format-version                            |2                  |
|history.expire.max-snapshot-age-ms        |6000000            |
|write.metadata.delete-after-commit.enabled|true               |
|write.metadata.previous-versions-max      |2880               |
+------------------------------------------+-------------------+

The only way for us to recover was to force the table to point to the metadata file right before the change.
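
For anyone attempting the same recovery, one way to locate a candidate file is the `metadata-log` list that Iceberg table metadata keeps of earlier metadata files. The sketch below is a rough illustration only: the helper name `previous_metadata_file` and the example paths and timestamps are hypothetical, and it assumes the column drop was the most recent commit, so the newest log entry is the metadata file from right before the change.

```python
from typing import Optional


def previous_metadata_file(metadata: dict) -> Optional[str]:
    """Return the most recent entry of the metadata-log, or None if the log is empty."""
    log = metadata.get("metadata-log", [])
    if not log:
        return None
    # Each entry records when that metadata file was current and where it is stored.
    newest = max(log, key=lambda entry: entry["timestamp-ms"])
    return newest["metadata-file"]


# Hypothetical example data; real paths and timestamps will differ.
EXAMPLE_METADATA = {
    "metadata-log": [
        {"timestamp-ms": 1719000000000,
         "metadata-file": "s3://example-bucket/tbl/metadata/00041-aaaa.metadata.json"},
        {"timestamp-ms": 1719000600000,
         "metadata-file": "s3://example-bucket/tbl/metadata/00042-bbbb.metadata.json"},
    ]
}

if __name__ == "__main__":
    print(previous_metadata_file(EXAMPLE_METADATA))
```

How the table is then pointed at that file depends on the catalog and is not covered by this sketch.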

I can provide the two metadata files if that's helpful, but I would rather do that privately if possible.

This seems quite similar to #7386. The table was initially written using Iceberg 1.2.1.

Please let me know if I can provide any other information!

dramaticlly commented 6 days ago

Based on the stack trace, it looks like the partitions table needs to evaluate all historical partition specs to build the partition value (per #7551), but the column has already been dropped from the current schema. I am not sure whether the old partition spec refers to the old schema or to the current table schema when looking up the source field id. If it is the current schema, I think that would be the cause of the NPE. FYI @szehon-ho for better insight.
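
This hypothesis can be illustrated with a simplified Python model. It is not Iceberg's actual code: `partition_type`, `result_type`, and the schema dicts are hypothetical stand-ins, the column names and types are made up for illustration (only the ids 5 and 6 come from the report), and the sketch assumes the source type is looked up by `source-id` in whichever schema the spec is bound to.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical schemas as {field-id: (name, type)}; names and types are assumptions.
SCHEMA_BEFORE_DROP: Dict[int, Tuple[str, str]] = {5: ("arrival_ts", "timestamptz"), 6: ("day", "string")}
SCHEMA_AFTER_DROP: Dict[int, Tuple[str, str]] = {5: ("arrival_ts", "timestamptz")}

# Partition specs as listed in the metadata file from the report.
SPECS = {
    0: [{"name": "day", "transform": "identity", "source-id": 6, "field-id": 1000}],
    1: [{"name": "arrival_ts_day", "transform": "day", "source-id": 5, "field-id": 1001}],
}


def result_type(transform: str, source_type: Optional[str]) -> Optional[str]:
    """Model of a transform's result type: identity passes the source type through."""
    if transform == "identity":
        return source_type
    if transform == "day":
        return "date"  # assumed result type in this model
    raise ValueError(f"unsupported transform in this sketch: {transform}")


def partition_type(spec_fields: List[dict], schema: Dict[int, Tuple[str, str]]) -> List[tuple]:
    """Build (field-id, name, type) tuples, failing when a type cannot be resolved."""
    fields = []
    for field in spec_fields:
        source = schema.get(field["source-id"])  # None if the column was dropped
        source_type = source[1] if source else None
        rtype = result_type(field["transform"], source_type)
        if rtype is None:
            raise ValueError("Type cannot be null")  # mirrors the message in the trace
        fields.append((field["field-id"], field["name"], rtype))
    return fields


if __name__ == "__main__":
    for spec_id, spec_fields in SPECS.items():
        try:
            print(spec_id, partition_type(spec_fields, SCHEMA_AFTER_DROP))
        except ValueError as err:
            print(spec_id, "failed:", err)
```

In this model, resolving spec 0 against the schema after the drop fails, while spec 1 still resolves, which matches the symptom that any code path touching all historical specs breaks.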

lurnagao-dahua commented 6 days ago

Hi, this is the same issue as #10234.

mgmarino commented 6 days ago

@lurnagao-dahua thanks for linking that, I did not find it when searching. I see a few things possibly different here (though of course the underlying cause could be the same):