delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[BUG] Spark - State reconstruction fails in tests when validating INSERT #3730

Closed: chirag-s-db closed this issue 1 month ago

chirag-s-db commented 1 month ago

Bug

Which Delta project/connector is this regarding?

Spark

Describe the problem

Steps to reproduce

This is reproducible through the CheckpointV2ReadSuite. A minimal example is:

   sql(s"CREATE TABLE tbl (a INT, b STRING) USING delta CLUSTER BY (a) LOCATION 'path' ")
   sql(s"INSERT INTO tbl VALUES (1, 'a'), (2, 'b')")

Note that this failure is only reproducible in a test environment.

Observed results

An exception is thrown here: https://github.com/apache/spark/blob/v3.5.3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala#L253
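
For context, the assertion that fires is gated on Utils.isTesting, which is why the failure only reproduces in a test environment, as noted above. The following is a rough paraphrase of the Spark 3.5 check, not a verbatim copy; see the linked source for the exact code:

    // Paraphrased from Spark's AnalysisHelper (org.apache.spark.sql.catalyst.plans.logical).
    // transformAllExpressionsWithPruning calls this first; it only throws when running
    // tests (Utils.isTesting), while the analyzer is active, and outside of an
    // allowInvokingTransformsInAnalyzer / resolveOperators* block.
    protected def assertNotAnalysisRule(): Unit = {
      if (Utils.isTesting &&
          AnalysisHelper.inAnalyzer.get > 0 &&
          AnalysisHelper.resolveOperatorDepth.get == 0) {
        throw QueryExecutionErrors.methodCalledInAnalyzerNotAllowedError()
      }
    }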

[info] - optimize clustered table *** FAILED ***
[info]   org.apache.spark.SparkRuntimeException: This method should not be called in the analyzer.
[info]   at org.apache.spark.sql.errors.QueryExecutionErrors$.methodCalledInAnalyzerNotAllowedError(QueryExecutionErrors.scala:1549)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule(AnalysisHelper.scala:253)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule$(AnalysisHelper.scala:249)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.assertNotAnalysisRule(LogicalPlan.scala:32)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformAllExpressionsWithPruning(AnalysisHelper.scala:290)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformAllExpressionsWithPruning$(AnalysisHelper.scala:286)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformAllExpressionsWithPruning(LogicalPlan.scala:32)
[info]   at org.apache.spark.sql.catalyst.optimizer.ReassignLambdaVariableID$.apply(objects.scala:262)
[info]   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.optimizedSerializer$lzycompute(ExpressionEncoder.scala:373)
[info]   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.optimizedSerializer(ExpressionEncoder.scala:369)
[info]   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.createSerializer(ExpressionEncoder.scala:393)
[info]   at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$catalystConverter$1(ScalaUDF.scala:132)
[info]   at scala.Option.map(Option.scala:230)
[info]   at org.apache.spark.sql.catalyst.expressions.ScalaUDF.catalystConverter(ScalaUDF.scala:131)
[info]   at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:1191)
[info]   at org.apache.spark.sql.expressions.SparkUserDefinedFunction.createScalaUDF(UserDefinedFunction.scala:112)
[info]   at org.apache.spark.sql.expressions.SparkUserDefinedFunction.apply(UserDefinedFunction.scala:100)
[info]   at org.apache.spark.sql.delta.Snapshot.$anonfun$stateReconstruction$1(Snapshot.scala:433)
[info]   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:171)
[info]   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:169)
[info]   at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:80)
[info]   at org.apache.spark.sql.delta.Snapshot.stateReconstruction(Snapshot.scala:415)
[info]   at org.apache.spark.sql.delta.Snapshot.$anonfun$cachedState$1(Snapshot.scala:224)
[info]   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:171)
[info]   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:169)
[info]   at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:80)
[info]   at org.apache.spark.sql.delta.Snapshot.cachedState$lzycompute(Snapshot.scala:222)
[info]   at org.apache.spark.sql.delta.Snapshot.cachedState(Snapshot.scala:222)
[info]   at org.apache.spark.sql.delta.Snapshot.$anonfun$stateDF$1(Snapshot.scala:347)
[info]   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:171)
[info]   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:169)
[info]   at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:80)
[info]   at org.apache.spark.sql.delta.Snapshot.stateDF(Snapshot.scala:347)
[info]   at org.apache.spark.sql.delta.SnapshotStateManager.$anonfun$computedState$2(SnapshotState.scala:80)
[info]   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:171)
[info]   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:169)
[info]   at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:80)
[info]   at org.apache.spark.sql.delta.SnapshotStateManager.$anonfun$computedState$1(SnapshotState.scala:78)
[info]   at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:56)
[info]   at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:35)
[info]   at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:29)
[info]   at org.apache.spark.sql.delta.Snapshot.withStatusCode(Snapshot.scala:80)
[info]   at org.apache.spark.sql.delta.SnapshotStateManager.computedState(SnapshotState.scala:78)
[info]   at org.apache.spark.sql.delta.SnapshotStateManager.computedState$(SnapshotState.scala:76)
[info]   at org.apache.spark.sql.delta.Snapshot.computedState$lzycompute(Snapshot.scala:80)
[info]   at org.apache.spark.sql.delta.Snapshot.computedState(Snapshot.scala:80)
[info]   at org.apache.spark.sql.delta.SnapshotStateManager.numOfFiles(SnapshotState.scala:159)
[info]   at org.apache.spark.sql.delta.SnapshotStateManager.numOfFiles$(SnapshotState.scala:159)
[info]   at org.apache.spark.sql.delta.Snapshot.numOfFiles(Snapshot.scala:80)
[info]   at org.apache.spark.sql.delta.SnapshotStateManager.numOfFilesIfKnown(SnapshotState.scala:168)
[info]   at org.apache.spark.sql.delta.SnapshotStateManager.numOfFilesIfKnown$(SnapshotState.scala:168)
[info]   at org.apache.spark.sql.delta.Snapshot.numOfFilesIfKnown(Snapshot.scala:80)
[info]   at org.apache.spark.sql.delta.files.ShallowSnapshotDescriptor.<init>(TahoeFileIndex.scala:229)
[info]   at org.apache.spark.sql.delta.files.TahoeLogFileIndex.<init>(TahoeFileIndex.scala:260)
[info]   at org.apache.spark.sql.delta.files.TahoeLogFileIndex$.apply(TahoeFileIndex.scala:377)
[info]   at org.apache.spark.sql.delta.DeltaLog.createRelation(DeltaLog.scala:565)
[info]   at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$toBaseRelation$3(DeltaTableV2.scala:276)
[info]   at scala.Option.getOrElse(Option.scala:189)
[info]   at org.apache.spark.sql.delta.catalog.DeltaTableV2.toBaseRelation$lzycompute(DeltaTableV2.scala:275)
[info]   at org.apache.spark.sql.delta.catalog.DeltaTableV2.toBaseRelation(DeltaTableV2.scala:257)
[info]   at org.apache.spark.sql.delta.DeltaRelation$.$anonfun$fromV2Relation$1(DeltaAnalysis.scala:1211)
[info]   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:171)
[info]   at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:169)
[info]   at org.apache.spark.sql.delta.DeltaRelation$.recordFrameProfile(DeltaAnalysis.scala:1196)
[info]   at org.apache.spark.sql.delta.DeltaRelation$.fromV2Relation(DeltaAnalysis.scala:1210)
[info]   at org.apache.spark.sql.delta.DeltaRelation$.unapply(DeltaAnalysis.scala:1201)
[info]   at org.apache.spark.sql.delta.DeltaUnsupportedOperationsCheck.checkDeltaTableExists(DeltaUnsupportedOperationsCheck.scala:122)
[info]   at org.apache.spark.sql.delta.DeltaUnsupportedOperationsCheck.$anonfun$apply$1(DeltaUnsupportedOperationsCheck.scala:96)
[info]   at org.apache.spark.sql.delta.DeltaUnsupportedOperationsCheck.$anonfun$apply$1$adapted(DeltaUnsupportedOperationsCheck.scala:58)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:234)
[info]   at org.apache.spark.sql.delta.DeltaUnsupportedOperationsCheck.apply(DeltaUnsupportedOperationsCheck.scala:58)
[info]   at org.apache.spark.sql.delta.DeltaUnsupportedOperationsCheck.apply(DeltaUnsupportedOperationsCheck.scala:39)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$60(CheckAnalysis.scala:822)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$60$adapted(CheckAnalysis.scala:822)
[info]   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
[info]   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:822)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:197)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:202)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:193)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:171)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:202)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:225)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:222)
[info]   at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
[info]   at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
[info]   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
[info]   at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
[info]   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
[info]   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
[info]   at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
[info]   at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
[info]   at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
[info]   at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
[info]   at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
[info]   at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
[info]   at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
[info]   at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:645)
[info]   at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:579)
[info]   at org.apache.spark.sql.delta.clustering.ClusteredTableClusteringSuite.$anonfun$addFiles$2(ClusteredTableClusteringSuite.scala:43)
[info]   at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
[info]   at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
[info]   at org.apache.spark.sql.delta.clustering.ClusteredTableClusteringSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(ClusteredTableClusteringSuite.scala:31)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:247)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:245)
[info]   at org.apache.spark.sql.delta.clustering.ClusteredTableClusteringSuite.withSQLConf(ClusteredTableClusteringSuite.scala:31)
[info]   at org.apache.spark.sql.delta.clustering.ClusteredTableClusteringSuite.addFiles(ClusteredTableClusteringSuite.scala:43)
[info]   at org.apache.spark.sql.delta.clustering.ClusteredTableClusteringSuite.$anonfun$new$5(ClusteredTableClusteringSuite.scala:99)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
[info]   at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
[info]   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
[info]   at org.apache.spark.sql.delta.skipping.ClusteredTableTestUtilsBase.withClusteredTable(ClusteredTableTestUtils.scala:193)
[info]   at org.apache.spark.sql.delta.skipping.ClusteredTableTestUtilsBase.withClusteredTable$(ClusteredTableTestUtils.scala:185)
[info]   at org.apache.spark.sql.delta.clustering.ClusteredTableClusteringSuite.withClusteredTable(ClusteredTableClusteringSuite.scala:31)
[info]   at org.apache.spark.sql.delta.clustering.ClusteredTableClusteringSuite.$anonfun$new$4(ClusteredTableClusteringSuite.scala:98)
[info]   at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
[info]   at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
[info]   at org.apache.spark.sql.delta.clustering.ClusteredTableClusteringSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(ClusteredTableClusteringSuite.scala:31)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:247)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:245)
[info]   at org.apache.spark.sql.delta.clustering.ClusteredTableClusteringSuite.withSQLConf(ClusteredTableClusteringSuite.scala:31)
[info]   at org.apache.spark.sql.delta.clustering.ClusteredTableClusteringSuite.$anonfun$new$3(ClusteredTableClusteringSuite.scala:98)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info]   at java.base/java.lang.Thread.run(Thread.java:829)

The root cause is that we eagerly execute a query (state reconstruction) inside the Analyzer (during checkAnalysis). I think a relatively simple fix is to allow transformDown in the analyzer here: https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/DeltaUnsupportedOperationsCheck.scala#L120
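
As a rough sketch of what I mean, using Spark's escape hatch for running transforms while the analyzer is active (the method signature and body below are illustrative shorthand for the existing code at the link above, not the actual implementation):

    // Hypothetical sketch only: wrap the existing check so the optimizer rule run by
    // the ExpressionEncoder during state reconstruction is permitted while the
    // analyzer is active.
    import org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper

    private def checkDeltaTableExists(plan: LogicalPlan): Unit = {  // illustrative signature
      AnalysisHelper.allowInvokingTransformsInAnalyzer {
        // existing body: the DeltaRelation(...) pattern match etc., which can trigger
        // Snapshot.stateReconstruction and hit the assertion without this wrapper
      }
    }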

Expected results

The INSERT should succeed without throwing an error.

Further details

I've bisected the issue to this commit: https://github.com/delta-io/delta/commit/68713799ea7ceb769366c32f0f3df811674765ef cc: @harperjiang

Environment information

Latest master

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

chirag-s-db commented 1 month ago

Also cc: @vkorukanti who originally found the test failure