delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0
7.58k stars 1.7k forks source link

[BUG][Spark] Deletion Vectors can cause `AMBIGUOUS_REFERENCE` errors on MERGE #2943

Open whitleykeith opened 6 months ago

whitleykeith commented 6 months ago

Bug

Which Delta project/connector is this regarding?

Describe the problem

TL;DR: Certain MERGE operations with deletion vectors enabled can consistently fail, though more investigation is needed on why these specific MERGEs fail

For context, we have a system to incremental take snapshots from upstream JDBC sources and write them into Delta. This system ultimately creates a DF that looks something like this like this:

+----+------+------+------------+
| id | col1 | col2 | _is_delete |
+----+------+------+------------+
|  1 | val1 | val2 | False      |
|  2 | val3 | val4 | False      |
|  3 | NULL | NULL | True       |
+----+------+------+------------+

The _is_delete column is a temporary column in this DF to determine if a row is being deleted or not in the Delta table. This DF is then MERGED into our existing snapshot table (we would have taken a normal snapshot if the table didn't exist yet), updating/deleting necessary rows. We do this in one MERGE so we can have single transaction for a given snapshot, and this works pretty well for our tables across the board.

We recently enabled Deletion Vectors for performance benefits, etc. and have noticed a sparse-yet-unavoidable ERRORs since enabling it. The core error is Reference `filePath` is ambiguous, could be: [`filePath`, `filePath`, `filePath`], and the stacktrace (pasted below) indicates this is happening when building the DV.

We've noticed the following:

Steps to reproduce

This is the following merge command we use:

      deltaTable
        .as("current")
        .merge(
          rows.as("rows"),
          s"current.${idCol} = rows.${idCol} and current.${idCol} >= ${minId} and current.${idCol} <= ${maxId}"
        )
        .whenMatched("rows._is_delete = true")
        .delete()
        .whenMatched()
        .updateAll()
        .whenNotMatched("rows._is_delete is null")
        .insertAll()
        .execute()

Observed results

Full Failure of MERGE operation

Expected results

Successful MERGE operation

Further details

Stacktrace:

org.apache.spark.sql.AnalysisException: [AMBIGUOUS_REFERENCE] Reference `filePath` is ambiguous, could be: [`filePath`, `filePath`, `filePath`].
    at org.apache.spark.sql.errors.QueryCompilationErrors$.ambiguousReferenceError(QueryCompilationErrors.scala:1938)
    at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:377)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:144)
    at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpressionByPlanChildren$1(ColumnResolutionHelper.scala:364)
    at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$3(ColumnResolutionHelper.scala:157)
    at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:100)
    at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$1(ColumnResolutionHelper.scala:164)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.innerResolve$1(ColumnResolutionHelper.scala:135)
    at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpression(ColumnResolutionHelper.scala:194)
    at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren(ColumnResolutionHelper.scala:371)
    at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren$(ColumnResolutionHelper.scala:357)
    at org.apache.spark.sql.catalyst.analysis.ResolveReferencesInAggregate$.resolveExpressionByPlanChildren(ResolveReferencesInAggregate.scala:49)
    at org.apache.spark.sql.catalyst.analysis.ResolveReferencesInAggregate$.$anonfun$apply$1(ResolveReferencesInAggregate.scala:61)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.catalyst.analysis.ResolveReferencesInAggregate$.apply(ResolveReferencesInAggregate.scala:61)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1602)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1494)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1494)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1469)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:226)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:222)
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:222)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
    at org.apache.spark.sql.RelationalGroupedDataset.toDF(RelationalGroupedDataset.scala:75)
    at org.apache.spark.sql.RelationalGroupedDataset.agg(RelationalGroupedDataset.scala:244)
    at org.apache.spark.sql.delta.commands.DeletionVectorBitmapGenerator$DeletionVectorSet.computeResult(DMLWithDeletionVectorsHelper.scala:307)
    at org.apache.spark.sql.delta.commands.DeletionVectorBitmapGenerator$.buildDeletionVectors(DMLWithDeletionVectorsHelper.scala:359)
    at org.apache.spark.sql.delta.commands.DeletionVectorBitmapGenerator$.buildRowIndexSetsForFilesMatchingCondition(DMLWithDeletionVectorsHelper.scala:406)
    at org.apache.spark.sql.delta.commands.merge.ClassicMergeExecutor.$anonfun$writeDVs$1(ClassicMergeExecutor.scala:503)
    at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.executeThunk$1(MergeIntoCommandBase.scala:423)
    at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.$anonfun$recordMergeOperation$7(MergeIntoCommandBase.scala:440)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.withStatusCode(MergeIntoCommand.scala:60)
    at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.$anonfun$recordMergeOperation$6(MergeIntoCommandBase.scala:440)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordFrameProfile(MergeIntoCommand.scala:60)
    at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:60)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:60)
    at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.recordMergeOperation(MergeIntoCommandBase.scala:437)
    at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.recordMergeOperation$(MergeIntoCommandBase.scala:401)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordMergeOperation(MergeIntoCommand.scala:60)
    at org.apache.spark.sql.delta.commands.merge.ClassicMergeExecutor.writeDVs(ClassicMergeExecutor.scala:473)
    at org.apache.spark.sql.delta.commands.merge.ClassicMergeExecutor.writeDVs$(ClassicMergeExecutor.scala:467)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.writeDVs(MergeIntoCommand.scala:60)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$5(MergeIntoCommand.scala:132)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.withStatusCode(MergeIntoCommand.scala:60)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2(MergeIntoCommand.scala:132)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2$adapted(MergeIntoCommand.scala:83)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:223)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$1(MergeIntoCommand.scala:83)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordFrameProfile(MergeIntoCommand.scala:60)
    at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:60)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:60)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.runMerge(MergeIntoCommand.scala:81)
    at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.$anonfun$run$1(MergeIntoCommandBase.scala:138)
    at org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.runWithMaterializedSourceLostRetries(MergeIntoMaterializeSource.scala:106)
    at org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.runWithMaterializedSourceLostRetries$(MergeIntoMaterializeSource.scala:94)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.runWithMaterializedSourceLostRetries(MergeIntoCommand.scala:60)
    at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.run(MergeIntoCommandBase.scala:138)
    at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.run$(MergeIntoCommandBase.scala:113)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:60)

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

whitleykeith commented 6 months ago

I believe this is an issue with tables that have columns named path as it seems this error consistently happens on them and I can see DV look to have path columns defined: https://github.com/delta-io/delta/blob/7f199febb84d2c62218fdffbc3a7fe1e48086638/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala#L422-L424