delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and with APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

[BUG][Spark] Exception while using 3-layer-namespace on Spark #2434

Open rosspalmer opened 11 months ago

rosspalmer commented 11 months ago

Bug

Which Delta project/connector is this regarding?

Spark

Describe the problem

As of Spark 3.4.0, native support for 3-layer namespaces was added to the SQL API, allowing multiple catalogs to be accessed through fully qualified table names of the form <catalog>.<schema>.<table>. Additional catalogs can be registered using the spark.sql.catalog.<catalog_name>=... Spark config.
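
For context, a minimal sketch of the convention (my_catalog, my_schema, and my_table are placeholder names, not part of this repro):

spark.sql("CREATE SCHEMA my_catalog.my_schema")
spark.sql("SELECT * FROM my_catalog.my_schema.my_table").show()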

This works when the same setup is used with Apache Iceberg catalogs, but does not work with multiple Delta catalogs. The SparkSession is initialized with the catalogs present in the session, but when a second, non-spark_catalog catalog is referenced, the following exception is thrown.

[INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.

Here is a recent StackOverflow post describing the same issue with PySpark: https://stackoverflow.com/questions/77751057/multiple-catalogs-in-spark

Steps to reproduce

I am running this on my local machine, in client mode, using my local filesystem to host data.

Here is my SparkSession generator:

override lazy val spark = SparkSession.builder()
      .master("local")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
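      // catalog_b is a second, independent catalog also backed by DeltaCatalog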
      .config("spark.sql.catalog.catalog_b", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .enableHiveSupport()
      .getOrCreate()

Here is a batch of testing code:

import spark.implicits._

val df = Seq(
  ("A", 1, 2.3),
  ("B", 4, 5.6)
).toDF("X", "Y", "Z")

spark.sql("create schema here")

df.write.mode(SaveMode.Overwrite).saveAsTable("here.now")

val df2 = df.union(Seq(("C", 7, 8.9)).toDF("X", "Y", "Z"))
df2.write.mode(SaveMode.Overwrite).saveAsTable("spark_catalog.here.now2")

spark.catalog.setCurrentCatalog("catalog_b")
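// The CREATE SCHEMA below, now targeting catalog_b, throws the internal error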
spark.sql("create schema here")

val df3 = df2.union(Seq(("D", 10, 11.12)).toDF("X", "Y", "Z"))
df3.write.mode(SaveMode.Overwrite).saveAsTable("catalog_b.here.now")

Observed results

When running the example above, the spark.catalog.setCurrentCatalog("catalog_b") command succeeds, but the following spark.sql("create schema here") command throws the exception below:

[INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
    at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:536)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:548)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
    at com.hungryroot.ThreeLayerDeltaTest.$anonfun$new$2(ThreeLayerNamespaceTest.scala:94)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.flatspec.AnyFlatSpecLike$$anon$5.apply(AnyFlatSpecLike.scala:1832)
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
    at org.scalatest.flatspec.AnyFlatSpec.withFixture(AnyFlatSpec.scala:1686)
    at org.scalatest.flatspec.AnyFlatSpecLike.invokeWithFixture$1(AnyFlatSpecLike.scala:1830)
    at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTest$1(AnyFlatSpecLike.scala:1842)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.flatspec.AnyFlatSpecLike.runTest(AnyFlatSpecLike.scala:1842)
    at org.scalatest.flatspec.AnyFlatSpecLike.runTest$(AnyFlatSpecLike.scala:1824)
    at org.scalatest.flatspec.AnyFlatSpec.runTest(AnyFlatSpec.scala:1686)
    at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTests$1(AnyFlatSpecLike.scala:1900)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
    at org.scalatest.flatspec.AnyFlatSpecLike.runTests(AnyFlatSpecLike.scala:1900)
    at org.scalatest.flatspec.AnyFlatSpecLike.runTests$(AnyFlatSpecLike.scala:1899)
    at org.scalatest.flatspec.AnyFlatSpec.runTests(AnyFlatSpec.scala:1686)
    at org.scalatest.Suite.run(Suite.scala:1114)
    at org.scalatest.Suite.run$(Suite.scala:1096)
    at org.scalatest.flatspec.AnyFlatSpec.org$scalatest$flatspec$AnyFlatSpecLike$$super$run(AnyFlatSpec.scala:1686)
    at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$run$1(AnyFlatSpecLike.scala:1945)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
    at org.scalatest.flatspec.AnyFlatSpecLike.run(AnyFlatSpecLike.scala:1945)
    at org.scalatest.flatspec.AnyFlatSpecLike.run$(AnyFlatSpecLike.scala:1943)
    at org.scalatest.flatspec.AnyFlatSpec.run(AnyFlatSpec.scala:1686)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
    at org.scalatest.tools.Runner$.run(Runner.scala:798)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension.name(DelegatingCatalogExtension.java:50)
    at org.apache.spark.sql.connector.catalog.CatalogV2Util$.isSessionCatalog(CatalogV2Util.scala:374)
    at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$DatabaseNameInSessionCatalog$.unapply(ResolveSessionCatalog.scala:628)
    at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:231)
    at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:52)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.apply(ResolveSessionCatalog.scala:52)
    at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.apply(ResolveSessionCatalog.scala:46)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:226)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:222)
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:222)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    ... 65 more
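
Judging from the Caused by frames, the NPE appears to originate in DelegatingCatalogExtension.name(), which forwards to a wrapped delegate catalog. DeltaCatalog extends Spark's DelegatingCatalogExtension, and Spark only injects that delegate for the session catalog (spark_catalog), so a second DeltaCatalog instance seems to be left with a null delegate and fails as soon as name() is called on it.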

Expected results

I would expect this to create a schema named here in the catalog_b catalog and allow me to save data to it.

Further details

This is an effort to create a local "delta lake" for testing that is compatible with the three-layer namespace used by Databricks' Unity Catalog.

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

jelmerk commented 8 months ago

+1. This makes it hard to write tests for code that uses multiple catalogs in production.

ezu-mutt commented 8 months ago

+1

boittega commented 6 months ago

+1. Is there any workaround that would make it possible to test the 3-layer namespace?

chriotte commented 6 months ago

Hi! We're encountering the same problem. Have you made any progress or found a workaround to resolve this issue?

jackedmundson commented 6 months ago

Any update, or does anyone have a workaround for this?

meharanjan318 commented 5 months ago

+1

rosspalmer commented 5 months ago

Just confirming I am still blocked by this. We have a "workaround" where we squish the catalog and database names together when running locally, but it's not pretty...
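
For anyone curious, a minimal sketch of that kind of name-squishing shim, assuming everything is written into spark_catalog locally (the helper name and the "__" separator are made up for illustration, not what we actually ship):

// Collapse <catalog>.<schema>.<table> into <catalog>__<schema>.<table> so that
// only the session catalog is ever touched; the squished schema must exist first.
def squishTableName(fullName: String): String = fullName.split('.') match {
  case Array(catalog, schema, table) => s"${catalog}__$schema.$table"
  case _ => fullName // 1- or 2-part names pass through unchanged
}

// e.g. df.write.mode(SaveMode.Overwrite).saveAsTable(squishTableName("catalog_b.here.now"))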

flaviokr commented 5 months ago

I am also facing this issue

faltesekATG commented 5 months ago

I'd also like to see this addressed for unit tests.

scottsand-db commented 4 months ago

I've confirmed that:
(1) using a Delta catalog and an Iceberg catalog works ✅
(2) using two Iceberg catalogs works ✅
(3) using two Delta catalogs fails ❌

WIP investigating why
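
For reference, the working mixed setup in (1) looks roughly like the sketch below, assuming Iceberg's SparkCatalog with a local Hadoop warehouse (the catalog name and warehouse path are placeholders):

val spark = SparkSession.builder()
  .master("local")
  .config("spark.sql.extensions",
    "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  // Delta backs the session catalog
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  // Iceberg backs a second, independent catalog
  .config("spark.sql.catalog.iceberg_cat", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.iceberg_cat.type", "hadoop")
  .config("spark.sql.catalog.iceberg_cat.warehouse", "/tmp/iceberg-warehouse")
  .getOrCreate()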

carloshhelt commented 3 months ago

+1, I'm facing the same problem while using 2 Delta catalogs.

keanuxr commented 3 months ago

+1

daniel-vizcaino commented 3 months ago

+1

denisoliveirac commented 3 months ago

+1

abutala commented 3 months ago

+1

wysisoft commented 3 months ago

Is there any feedback on this? It seems like Iceberg is the only way to do this?

johnh1989 commented 2 months ago

+1

nemenko commented 1 month ago

+1, in PySpark I have the same issue in the test below; at the end it throws a parsing exception like:

[PARSE_SYNTAX_ERROR] Syntax error at or near '.'.(line 1, pos 20)

== SQL ==
spark_catalog.source.source_table_join
--------------------^^^

The same does not work for DeltaTable.createOrReplace if I use the fully qualified name catalog.schema.table.

@pytest.fixture(scope="session")
def spark_session():
    shutil.rmtree("spark-warehouse", ignore_errors=True)

    os.environ['PYSPARK_PYTHON'] = sys.executable
    os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
    builder = (SparkSession.builder
               .master("local[*]")
               .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
               .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
               .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
               .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
               .appName("test"))
    yield configure_spark_with_delta_pip(builder).getOrCreate()

    shutil.rmtree("spark-warehouse", ignore_errors=True)

@pytest.mark.usefixtures("spark_session")
def test_join_operation_with_catalog(self, spark_session: SparkSession):
    source_schema = StructType([
        StructField("id", StringType(), True),
        StructField("derived_column", StringType(), True),
        StructField("filter_column", StringType(), True)
    ])

    spark_session.sql("CREATE SCHEMA source")
    spark_session.sql("DROP TABLE IF EXISTS spark_catalog.source.source_table_join")
    spark_session.catalog.setCurrentCatalog("spark_catalog")
    spark_session.catalog.setCurrentDatabase("source")

    DeltaTable.createOrReplace(spark_session).tableName("source_table_join").addColumns(
        source_schema).execute()

    try:
        print(DeltaTable.forName(spark_session, "source.source_table_join").toDF().collect())
        print('SUCCESS')
    except Exception as err:
        print("FAILURE")
        print(err)

mahic commented 1 month ago

@scottsand-db I've tested this with Spark 4.0-preview2 and the Delta Lake 4.0 preview; the same issue occurs. This should be fixed before shipping Delta 4.0, at least.