holdenk / spark-testing-base

Base classes to use when writing tests with Spark
Apache License 2.0

SparkSession with DeltaLake related settings #324

Open ashikaumanga opened 4 years ago

ashikaumanga commented 4 years ago

I am trying to unit-test DeltaLake operations. For this I need to initialize the SparkSession with the following options:

import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach, FunSuite, GivenWhenThen}

abstract class BaseSpec
  extends FunSuite
    with BeforeAndAfter
    with BeforeAndAfterEach
    with GivenWhenThen
    with SharedSparkContext
    with DataFrameSuiteBase {

  override lazy val spark: SparkSession =
    SparkSession.builder().master("local")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

  def printTestStartMessage(message: String): Unit = {
    println(s"${"*" * 12} ${this.getClass.getSimpleName} - ${message} ${"*" * 12}")
  }
}

However, when I run the unit tests it still throws the following error, presumably because those options were never applied to the SparkSession:

20/06/23 22:06:43 WARN SparkSession$Builder: Using an existing SparkSession; the static sql configurations will not take effect.
20/06/23 22:06:43 WARN SparkSession$Builder: Using an existing SparkSession; some spark core configurations may not take effect.

org.apache.spark.sql.AnalysisException: This Delta operation requires the SparkSession to be configured with the DeltaSparkSessionExtension and the DeltaCatalog. Please set the necessary configurations when creating the SparkSession as shown below.

  SparkSession.builder()
    .option("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .option("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    ...
    .build()

  at org.apache.spark.sql.delta.DeltaErrors$.configureSparkSessionWithExtensionAndCatalog(DeltaErrors.scala:949)
  at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:62)
  at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:48)
  at io.delta.tables.DeltaTable.improveUnsupportedOpError(DeltaTable.scala:42)
  at io.delta.tables.execution.DeltaTableOperations.executeDelete(DeltaTableOperations.scala:37)
  at io.delta.tables.execution.DeltaTableOperations.executeDelete$(DeltaTableOperations.scala:37)
  at io.delta.tables.DeltaTable.executeDelete(DeltaTable.scala:42)
  at io.delta.tables.DeltaTable.delete(DeltaTable.scala:181)
  at com.r

Any way to fix this?
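The two builder warnings above point at the cause: by the time the overridden spark val runs, the suite has already started a SparkSession, and getOrCreate() returns that existing session, dropping static SQL configurations such as spark.sql.extensions. A minimal standalone sketch of that behavior (object name, app name, and master setting are illustrative):

import org.apache.spark.sql.SparkSession

object GetOrCreateDemo extends App {
  // A session already exists, as it would mid-test-suite.
  val existing = SparkSession.builder()
    .master("local[2]").appName("demo").getOrCreate()

  // A second builder sets Delta's static SQL config; getOrCreate() just
  // returns the running session and logs the "static sql configurations
  // will not take effect" warning, so the extension is never installed.
  val second = SparkSession.builder()
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .getOrCreate()

  assert(existing eq second) // same instance; the Delta setting was dropped
  existing.stop()
}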

chanon-onman commented 3 years ago

@ashikaumanga I hit the same issue with Delta; I solved it by overriding the conf method.

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.SparkConf
import org.scalatest.FunSuite

class MyDeltaTest extends FunSuite with DataFrameSuiteBase {
...
  override def conf: SparkConf = super.conf
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
...
}
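For completeness, a self-contained version of this fix sketched as a round-trip test (the class name, temp path, and a delta-core test dependency matching your Spark version are assumptions, not part of the thread):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.SparkConf
import org.scalatest.FunSuite

class DeltaRoundTripTest extends FunSuite with DataFrameSuiteBase {

  // conf is read before the suite builds its SparkSession, so both
  // Delta settings are in place when the first query runs.
  override def conf: SparkConf = super.conf
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

  test("write and read back a Delta table") {
    import spark.implicits._
    val path = java.nio.file.Files.createTempDirectory("delta-test").toString

    Seq((1, "a"), (2, "b")).toDF("id", "value")
      .write.format("delta").mode("overwrite").save(path)

    val readBack = spark.read.format("delta").load(path)
    assert(readBack.count() == 2)
  }
}

The key point is that the settings go through conf rather than a second builder, so they exist before any SparkSession is created.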

These links show why overriding conf works: SparkContextProvider.conf is the hook the suite exposes for extra settings, and DataFrameSuiteBase feeds that conf into the SparkSession it builds:

https://github.com/holdenk/spark-testing-base/blob/master/core/src/main/2.0/scala/com/holdenkarau/spark/testing/SparkContextProvider.scala#L28-L35

https://github.com/holdenk/spark-testing-base/blob/master/core/src/main/2.0/scala/com/holdenkarau/spark/testing/DataFrameSuiteBase.scala#L116