delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

NullPointerException was thrown during the write after calling sparkSession.stop() and then recreating a new SparkSession #629

Closed qxzzxq closed 2 years ago

qxzzxq commented 3 years ago

Description

Our unit tests passed with Delta 0.7.0. After upgrading Delta from 0.7.0 to 0.8.0, we noticed that they failed because a java.lang.NullPointerException was thrown while writing a Dataset (https://github.com/SETL-Framework/setl/pull/183).

The test framework we used is ScalaTest 3.2.1. In our tests, we stop the active SparkSession at the beginning of each test and re-create a new one with a different SparkConf.

We found that if we shut down the active SparkSession and create a new one, then Datasets/DataFrames could no longer be written. However, if we use the existing one, no NullPointerException will be thrown in this situation. Unfortunately, this is not a suitable solution as we need to modify the configuration for each test.

For other formats, such as csv, we don't get any error. I don't know whether this is intended behavior in Delta or a bug.

Error Reproduction

Run the following piece of code to reproduce the error.

import org.apache.spark.sql.{SaveMode, SparkSession}

object Test {

  val path: String = "src/test/resources/test-delta2"
  val saveMode: SaveMode = SaveMode.Append
  val format = "delta"

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local").getOrCreate()
    spark.range(10)
      .write
      .format(format)
      .mode(saveMode)
      .save(path)
    val loaded = spark.read.format(format).load(path)

    loaded.show(false)

    SparkSession.getActiveSession match {
      case Some(ss) =>
        ss.stop()  // if we comment this line, then the program will finish without NullPointerException
      case _ =>
    }
    val spark2 = SparkSession.builder().master("local").getOrCreate()
    spark2.range(12)
      .write
      .format(format)
      .mode(saveMode)
      .save(path)
    val loaded2 = spark2.read.format(format).load(path)

    loaded2.show(false)
  }
}

A NullPointerException is thrown with both Delta 0.7.0 and 0.8.0. (The code does throw a NullPointerException with Delta 0.7.0 too, so I don't understand why our unit tests could still pass on that version :p )

Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:130)
    at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1814)
    at org.apache.spark.rdd.RDD.unpersist(RDD.scala:220)
    at org.apache.spark.sql.delta.util.StateCache.$anonfun$uncache$1(StateCache.scala:104)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.sql.delta.util.StateCache.uncache(StateCache.scala:104)
    at org.apache.spark.sql.delta.util.StateCache.uncache$(StateCache.scala:101)
    at org.apache.spark.sql.delta.Snapshot.uncache(Snapshot.scala:52)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$updateInternal$2(SnapshotManagement.scala:304)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:30)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:25)
    at org.apache.spark.sql.delta.DeltaLog.withStatusCode(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$updateInternal$1(SnapshotManagement.scala:275)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.SnapshotManagement.updateInternal(SnapshotManagement.scala:275)
    at org.apache.spark.sql.delta.SnapshotManagement.updateInternal$(SnapshotManagement.scala:273)
    at org.apache.spark.sql.delta.DeltaLog.updateInternal(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$update$1(SnapshotManagement.scala:235)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:152)
    at org.apache.spark.sql.delta.SnapshotManagement.update(SnapshotManagement.scala:235)
    at org.apache.spark.sql.delta.SnapshotManagement.update$(SnapshotManagement.scala:231)
    at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommit$1(OptimisticTransaction.scala:487)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:152)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit(OptimisticTransaction.scala:464)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit$(OptimisticTransaction.scala:459)
    at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:80)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commit$1(OptimisticTransaction.scala:340)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:80)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:80)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:295)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:293)
    at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:80)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:68)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:152)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:346)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
    at com.jcdecaux.setl.Test$.main(Test.scala:33)
    at com.jcdecaux.setl.Test.main(Test.scala)

Environment

Java 1.8, Scala 2.12.10, Spark 3.0.1, Delta 0.8.0, ScalaTest 3.2.1

jaceklaskowski commented 3 years ago

It's Spark 3.0.1, isn't it? Could this be somehow related to https://github.com/apache/spark/commit/9236fc407fe11db13063129d6a2494e573891766? Looks suspiciously similar. I'd check out 3.0.2 to see if things got any better.

qxzzxq commented 3 years ago

Hi @jaceklaskowski thank you for your reply. The version of Spark we used is 3.0.1.

I tried running with Spark 3.0.2 and 3.0.0, and I still got the same problem.

walmaaoui commented 3 years ago

Seems like the problem still exists in Delta 1.0.0, with a different message:

  java.lang.IllegalStateException: SparkContext has been shutdown

jaceklaskowski commented 3 years ago

Can you post the app (to reproduce the issue) to the Delta forum?

walmaaoui commented 3 years ago

The snippet in @qxzzxq's initial message reproduces it. I will post it in the forum.

walmaaoui commented 3 years ago

The forum link: https://groups.google.com/g/delta-users/c/K3_7kmKbmI4

zsxwing commented 3 years ago

@qxzzxq Thanks for reporting this. Which version doesn't have this issue? I tried 0.6.0 and 0.7.0 and saw the NullPointerException as well. One simple fix would be to ignore exceptions at this line, since we don't care about the uncache failure: https://github.com/delta-io/delta/blob/v1.0.0/core/src/main/scala/org/apache/spark/sql/delta/util/StateCache.scala#L107

Feel free to open a PR to fix it if anyone has free time :)
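
For illustration only, the idea of tolerating failures during best-effort cleanup could look roughly like the sketch below. It is not Delta's StateCache code: CachedThing, TolerantUncache and releaseAll are hypothetical names, and the NullPointerException merely stands in for what an unpersist call against a stopped SparkContext throws in the stack trace above.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for a cached resource; this is NOT a Delta class.
final class CachedThing(val name: String, failOnRelease: Boolean) {
  var released = false

  def release(): Unit = {
    // Simulates a cleanup call whose backend has already been shut down.
    if (failOnRelease) throw new NullPointerException(s"backend gone for $name")
    released = true
  }
}

object TolerantUncache {
  // Tries to release every entry, swallows non-fatal failures,
  // clears the buffer, and returns the number of failed releases.
  def releaseAll(cached: ArrayBuffer[CachedThing]): Int = {
    var failures = 0
    cached.foreach { c =>
      try c.release()
      catch { case NonFatal(_) => failures += 1 }
    }
    cached.clear()
    failures
  }
}
```

Catching only NonFatal keeps errors such as OutOfMemoryError or InterruptedException propagating, while a failed cleanup of an already-dead resource no longer aborts the caller.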

dan-coates commented 3 years ago

I'm also seeing this bug with Delta 1.0.0 and Spark 3.1.2. It seems like Delta is storing some state about a cached snapshot that persists even when the underlying SparkSession has been closed. It probably needs to either detect when the SparkSession has changed or been deleted and invalidate the cache, or tolerate exceptions in the uncaching as @zsxwing suggested.

This is happening in my unit tests as with @qxzzxq. Are there any suggested workarounds? I'm not sure that persisting a single SparkSession across tests is a good idea, but I'm not sure if there are any other options. There would have to be some way to access the snapshot in question and unpersist it prior to shutting down the first SparkSession. I'm not entirely sure what objects Delta is creating that are spanning SparkSessions and tests.

scottsand-db commented 3 years ago

We are currently reviewing this issue and will follow up shortly.

arunbenoyv commented 2 years ago

Hi, when can we expect a fix for this issue? And in the meantime, is there a temporary workaround?

zsxwing commented 2 years ago

I submitted #881 to fix this. You can call org.apache.spark.sql.delta.DeltaLog.clearCache() after stopping Spark as a workaround.
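
Applied to the reproduction snippet from the original report, the workaround might look like the following sketch. It assumes the same path and local master as that snippet, and that delta-core and Spark are on the classpath; the object name ClearCacheWorkaround is made up for this example.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.delta.DeltaLog

// Hypothetical object name; path and master mirror the original reproduction.
object ClearCacheWorkaround {

  val path: String = "src/test/resources/test-delta2"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    spark.range(10).write.format("delta").mode(SaveMode.Append).save(path)

    // Stop the first session, then drop Delta's cached DeltaLog instances,
    // which would otherwise still refer to the stopped SparkContext.
    spark.stop()
    DeltaLog.clearCache()

    // A fresh session can now append to the same table.
    val spark2 = SparkSession.builder().master("local").getOrCreate()
    spark2.range(12).write.format("delta").mode(SaveMode.Append).save(path)
    spark2.read.format("delta").load(path).show(false)
    spark2.stop()
  }
}
```

In a test suite that recreates the SparkSession for every test, the same two calls (stop, then DeltaLog.clearCache()) could go in the per-test teardown.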