Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Spark: Dropping partition column from old partition table corrupts entire table #10234

Open EXPEbdodla opened 4 months ago

EXPEbdodla commented 4 months ago

Apache Iceberg version

1.5.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

Problem: Dropping a column that belongs to an old partition spec corrupts the entire table. Metadata queries fail, SELECTs fail, and DataFrame appends fail. Verified this on Iceberg 1.2.1, 1.4.3 and 1.5.1.

What to expect:

How to reproduce: Try_Iceberg_partition_column_delete.md

Exported the notebook as a Markdown file and attached it.
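Since the attachment isn't rendered inline here, a minimal sketch of the steps (catalog, table and column names are placeholders chosen to match the error below, not the exact notebook contents):

// Sketch only; assumes an Iceberg catalog named demo with the Iceberg SQL extensions enabled
spark.sql("CREATE TABLE demo.db.events (id INT, event_date DATE) USING iceberg PARTITIONED BY (event_date)")
spark.sql("INSERT INTO demo.db.events VALUES (1, DATE '2024-01-01')")

// evolve the partition spec away from event_date, then drop the old source column
spark.sql("ALTER TABLE demo.db.events REPLACE PARTITION FIELD event_date WITH bucket(4, id)")
spark.sql("ALTER TABLE demo.db.events DROP COLUMN event_date")

// metadata queries, SELECTs and DataFrame appends now fail with the traces below
spark.sql("SELECT * FROM demo.db.events").show()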

Stack trace for SELECT:

Caused by: java.lang.NullPointerException: Type cannot be null
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:907)
    at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447)
    at org.apache.iceberg.types.Types$NestedField.optional(Types.java:416)
    at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132)
    at org.apache.iceberg.Partitioning.buildPartitionProjectionType(Partitioning.java:286)
    at org.apache.iceberg.Partitioning.partitionType(Partitioning.java:254)
    at org.apache.iceberg.spark.source.SparkTable.metadataColumns(SparkTable.java:251)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.metadataOutput$lzycompute(DataSourceV2Relation.scala:61)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.metadataOutput(DataSourceV2Relation.scala:51)
    at org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias.metadataOutput(basicLogicalOperators.scala:1339)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$3(Analyzer.scala:966)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$3$adapted(Analyzer.scala:966)
    at scala.collection.Iterator.exists(Iterator.scala:969)
    at scala.collection.Iterator.exists$(Iterator.scala:967)
    at scala.collection.AbstractIterator.exists(Iterator.scala:1431)
    at scala.collection.IterableLike.exists(IterableLike.scala:79)
    at scala.collection.IterableLike.exists$(IterableLike.scala:78)
    at scala.collection.AbstractIterable.exists(Iterable.scala:56)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$2(Analyzer.scala:966)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$2$adapted(Analyzer.scala:961)
    at org.apache.spark.sql.catalyst.trees.TreeNode.exists(TreeNode.scala:346)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$exists$1(TreeNode.scala:349)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$exists$1$adapted(TreeNode.scala:349)
    at scala.collection.Iterator.exists(Iterator.scala:969)
    at scala.collection.Iterator.exists$(Iterator.scala:967)
    at scala.collection.AbstractIterator.exists(Iterator.scala:1431)
    at scala.collection.IterableLike.exists(IterableLike.scala:79)
    at scala.collection.IterableLike.exists$(IterableLike.scala:78)
    at scala.collection.AbstractIterable.exists(Iterable.scala:56)
    at org.apache.spark.sql.catalyst.trees.TreeNode.exists(TreeNode.scala:349)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$exists$1(TreeNode.scala:349)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$exists$1$adapted(TreeNode.scala:349)
    at scala.collection.Iterator.exists(Iterator.scala:969)
    at scala.collection.Iterator.exists$(Iterator.scala:967)
    at scala.collection.AbstractIterator.exists(Iterator.scala:1431)
    at scala.collection.IterableLike.exists(IterableLike.scala:79)
    at scala.collection.IterableLike.exists$(IterableLike.scala:78)
    at scala.collection.AbstractIterable.exists(Iterable.scala:56)
    at org.apache.spark.sql.catalyst.trees.TreeNode.exists(TreeNode.scala:349)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$1(Analyzer.scala:961)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$1$adapted(Analyzer.scala:961)
    at scala.collection.LinearSeqOptimized.exists(LinearSeqOptimized.scala:95)
    at scala.collection.LinearSeqOptimized.exists$(LinearSeqOptimized.scala:92)
    at scala.collection.immutable.Stream.exists(Stream.scala:204)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.org$apache$spark$sql$catalyst$analysis$Analyzer$AddMetadataColumns$$hasMetadataCol(Analyzer.scala:961)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$$anonfun$apply$12.applyOrElse(Analyzer.scala:931)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$$anonfun$apply$12.applyOrElse(Analyzer.scala:928)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.apply(Analyzer.scala:928)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.apply(Analyzer.scala:924)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:231)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:227)
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:227)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:212)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:211)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    ... 25 more

Stack trace for Spark DataFrame append:

org.apache.iceberg.exceptions.ValidationException: Cannot find source column for partition field: 1000: event_date: identity(6)
    at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
    at org.apache.iceberg.PartitionSpec.checkCompatibility(PartitionSpec.java:587)
    at org.apache.iceberg.PartitionSpec$Builder.build(PartitionSpec.java:568)
    at org.apache.iceberg.UnboundPartitionSpec.bind(UnboundPartitionSpec.java:46)
    at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:71)
    at org.apache.iceberg.PartitionSpecParser.lambda$fromJson$1(PartitionSpecParser.java:88)
    at org.apache.iceberg.util.JsonUtil.parse(JsonUtil.java:95)
    at org.apache.iceberg.PartitionSpecParser.lambda$fromJson$2(PartitionSpecParser.java:88)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
    at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1908)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
    at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:86)
    at org.apache.iceberg.SerializableTable.lambda$specs$1(SerializableTable.java:192)
    at java.base/java.util.HashMap.forEach(HashMap.java:1337)
    at org.apache.iceberg.SerializableTable.specs(SerializableTable.java:190)
    at org.apache.iceberg.SerializableTable.spec(SerializableTable.java:180)
    at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:638)
    at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:632)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:430)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

manuzhang commented 4 months ago

Which Spark version are you using?

EXPEbdodla commented 4 months ago

Which Spark version are you using?

I was originally trying this with Spark 3.3.0 and Iceberg 1.2.1.

Later I tried the spark-iceberg Docker images tabulario/spark-iceberg:latest, tabulario/spark-iceberg:3.5.1_1.4.3 and tabulario/spark-iceberg:3.3.2_1.2.1.

manuzhang commented 4 months ago

I don't think dropping a column should be allowed while you still have data partitioned by that column.

EXPEbdodla commented 4 months ago

Once a partition column has been dropped or replaced in the partition spec, dropping it should behave like a regular DROP COLUMN. Is that a correct assumption?

Fokko commented 4 months ago

@EXPEbdodla To clarify, are you dropping a column that's part of the current partition spec?

I did some work to fix this on previous partition specs: https://github.com/apache/iceberg/issues/5399

Fokko commented 4 months ago

Thanks for sharing the steps to reproduce this. I think the problem here is that the spec is no longer the current one, but it is still in use. This does look like a bug and should be fixed.

manuzhang commented 3 months ago

The issue is that we don't persist each partition spec together with the schema it was created against in the metadata.json file, and we always use the current schema when rebuilding partition specs from the metadata file.
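To illustrate the rebinding problem with the core library directly (a self-contained sketch; the field names and ids are borrowed from the error in the issue description, not from the reporter's actual table):

import org.apache.iceberg.{PartitionSpec, PartitionSpecParser, Schema}
import org.apache.iceberg.types.Types

// The schema the spec was created against still contains event_date (field id 6)
val oldSchema = new Schema(
  Types.NestedField.required(1, "id", Types.IntegerType.get()),
  Types.NestedField.optional(6, "event_date", Types.DateType.get()))

// Specs are serialized with source field ids only, not with the schema they were bound to
val oldSpecJson = PartitionSpecParser.toJson(
  PartitionSpec.builderFor(oldSchema).identity("event_date").build())

// Current schema after event_date has been dropped
val currentSchema = new Schema(
  Types.NestedField.required(1, "id", Types.IntegerType.get()))

// Rebinding the old spec against the current schema fails with the same error as the
// DataFrame append trace above:
// ValidationException: Cannot find source column for partition field: 1000: event_date: identity(6)
PartitionSpecParser.fromJson(currentSchema, oldSpecJson)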

nastra commented 3 months ago

I was able to reproduce this with this short example and I agree that this is a bug that we need to fix:

create schema iceberg;
create table iceberg.tbl (id int, trip_id int, distance int) partitioned by (bucket(8, trip_id));
insert into iceberg.tbl values (1, 1, 10);
alter table iceberg.tbl replace partition field trip_id_bucket with bucket(4, distance);
alter table iceberg.tbl drop column trip_id;
select * from iceberg.tbl;
Caused by: org.apache.iceberg.exceptions.ValidationException: Cannot find source column for partition field: 1000: trip_id_bucket: bucket[8](2)
    at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
    at org.apache.iceberg.PartitionSpec.checkCompatibility(PartitionSpec.java:562)
    at org.apache.iceberg.PartitionSpec$Builder.build(PartitionSpec.java:543)
    at org.apache.iceberg.UnboundPartitionSpec.bind(UnboundPartitionSpec.java:46)
    at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:71)
    at org.apache.iceberg.PartitionSpecParser.lambda$fromJson$1(PartitionSpecParser.java:88)
    at org.apache.iceberg.util.JsonUtil.parse(JsonUtil.java:98)
    at org.apache.iceberg.PartitionSpecParser.lambda$fromJson$2(PartitionSpecParser.java:88)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
    at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1908)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
    at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:86)
    at org.apache.iceberg.BaseContentScanTask.spec(BaseContentScanTask.java:71)
    at org.apache.iceberg.BaseFileScanTask.spec(BaseFileScanTask.java:27)
    at org.apache.iceberg.BaseFileScanTask$SplitScanTask.spec(BaseFileScanTask.java:127)
    at org.apache.iceberg.util.PartitionUtil.constantsMap(PartitionUtil.java:49)
    at org.apache.iceberg.util.PartitionUtil.constantsMap(PartitionUtil.java:42)
    at org.apache.iceberg.spark.source.BaseReader.constantsMap(BaseReader.java:198)
    at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:91)
    at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:41)
    at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:143)

lurnagao-dahua commented 2 months ago

Hi, may I ask whether the fix is to traverse all schemas, or to persist each partition spec together with its type in the metadata?

walkkker commented 2 months ago

Hi, may I ask whether the fix is to traverse all schemas, or to persist each partition spec together with its type in the metadata?

According to #10352 , I assume it traverses all manifests to check whether any manifest file's partition spec is a historical partition spec whose referenced source column is about to be dropped. If there is a match, it throws an exception to prevent dropping a column that is still referenced by an active partition spec.

Though the PR hasn't been merged yet, I think it's a good approach to the fix.
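This is not the actual code from #10352 (I haven't read it in detail), just a rough sketch of the kind of check I understand it to add, written against public Iceberg APIs; the method name and error message below are mine:

import scala.collection.JavaConverters._ // scala.jdk.CollectionConverters._ on Scala 2.13
import org.apache.iceberg.Table
import org.apache.iceberg.exceptions.ValidationException

// Hypothetical pre-drop validation: refuse to drop a column that is still the source of a
// partition field in any spec used by manifests reachable from the current snapshot.
def checkColumnNotUsedByLivePartitionSpecs(table: Table, columnName: String): Unit = {
  val sourceId = table.schema().findField(columnName).fieldId()

  // partition spec ids of all manifests reachable from the current snapshot
  val specIdsInUse = table.currentSnapshot()
    .allManifests(table.io()).asScala
    .map(_.partitionSpecId())
    .toSet

  val stillReferenced = specIdsInUse.exists { specId =>
    table.specs().get(specId).fields().asScala.exists(_.sourceId() == sourceId)
  }

  ValidationException.check(!stillReferenced,
    "Cannot drop column %s: it is still referenced by a partition spec of live manifests", columnName)
}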

lurnagao-dahua commented 2 months ago

Hi, may I ask whether the fix is to traverse all schemas, or to persist each partition spec together with its type in the metadata?

According to #10352 , I assume it traverses all manifests to check whether any manifest file's partition spec is a historical partition spec whose referenced source column is about to be dropped. If there is a match, it throws an exception to prevent dropping a column that is still referenced by an active partition spec.

Though the PR hasn't been merged yet, I think it's a good approach to the fix.

Thank you very much!