delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[BUG][SPARK] listTables() fails after createOrReplaceTempView('abc') called with PARSE_SYNTAX_ERROR #2610

Open richardcerny opened 7 months ago

richardcerny commented 7 months ago

Bug

Describe the problem

After upgrading Spark from 3.3.2 to 3.4.1, the catalog.listTables() command always fails once createOrReplaceTempView has been called. See the code snippet below.

Steps to reproduce

from pyspark.sql import Row, SparkSession

spark = (SparkSession
        .builder
        .appName("Python Spark SQL basic example")
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
        )

SILVER_DB = 'silver_test'
spark.sql(f'CREATE DATABASE {SILVER_DB}')
view_name_fact = 'abc'

print(f"DBs before: {spark.catalog.listDatabases()}") # ok
print(f"Tables_before: {spark.catalog.listTables()}") # OK
print(f"Catalogs before: {spark.catalog.listCatalogs()}")  # ok
print(f'Current catalog before: {spark.catalog.currentCatalog()}')  # ok
print(f"Tables after silver: {spark.catalog.listTables(SILVER_DB)}") # ok

df_fact_fixture1 = spark.createDataFrame([Row('1', 'A', 'A', 100.0)])  # OK
df_fact_fixture1.createOrReplaceTempView(view_name_fact) # OK  ##### once createOrReplaceTempView is called, every subsequent spark.catalog.listTables() call fails

spark.sql(f"select * from {view_name_fact}").show() # OK
df = spark.sql(f"select * from {view_name_fact}") # OK
assert 1 == df.count() # OK
print(f"DBs after: {spark.catalog.listDatabases()}")  # OK
print(f"Catalogs after: {spark.catalog.listCatalogs()}")   # OK
print(f'Current catalog after: {spark.catalog.currentCatalog()}')  # OK
print(f"Tables after: {spark.catalog.listTables()}") # ERROR
print(f"Tables after silver: {spark.catalog.listTables(SILVER_DB)}")  # ERROR

Observed results

   :: resolving dependencies :: org.apache.spark#spark-submit-parent-7b10b7e4-5468-4e77-acce-1eda288f12ba;1.0
        confs: [default]
        found io.delta#delta-core_2.12;2.4.0 in central
        found io.delta#delta-storage;2.4.0 in central
        found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 144ms :: artifacts dl 6ms
        :: modules in use:
        io.delta#delta-core_2.12;2.4.0 from central in [default]
        io.delta#delta-storage;2.4.0 from central in [default]
        org.antlr#antlr4-runtime;4.9.3 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-7b10b7e4-5468-4e77-acce-1eda288f12ba
        confs: [default]
        0 artifacts copied, 3 already retrieved (0kB/4ms)
24/02/06 14:17:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
DBs before: [Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/workspaces/repository-pipeline/spark-warehouse'), Database(name='silver_test', catalog='spark_catalog', description='', locationUri='file:/workspaces/repository-pipeline/spark-warehouse/silver_test.db')]
Tables_before: []
Catalogs before: [CatalogMetadata(name='spark_catalog', description=None)]
Current catalog before: spark_catalog
Tables after silver: []
+---+---+---+-----+                                                             
| _1| _2| _3|   _4|
+---+---+---+-----+
|  1|  A|  A|100.0|
+---+---+---+-----+

DBs after: [Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/workspaces/repository-pipeline/spark-warehouse'), Database(name='silver_test', catalog='spark_catalog', description='', locationUri='file:/workspaces/repository-pipeline/spark-warehouse/silver_test.db')]
Catalogs after: [CatalogMetadata(name='spark_catalog', description=None)]
Current catalog after: spark_catalog

FAILED
>   print(f"Tables after: {spark.catalog.listTables()}")

libs/lakehouse/tests/test_z_catalog_2.py:133: 

/usr/local/lib/python3.10/site-packages/pyspark/sql/catalog.py:309: in listTables
    iter = self._jcatalog.listTables(dbName).toLocalIterator()
/usr/local/lib/python3.10/site-packages/py4j/java_gateway.py:1322: in __call__
    return_value = get_return_value(
a = ('xro77', <py4j.clientserver.JavaClient object at 0x7f98286213f0>, 'o36', 'listTables'), kw = {}, converted = ParseException()

    def deco(*a: Any, **kw: Any) -> Any:
        try:
            return f(*a, **kw)
        except Py4JJavaError as e:
            converted = convert_exception(e.java_exception)
            if not isinstance(converted, UnknownException):
                # Hide where the exception came from that shows a non-Pythonic
                # JVM exception message.
               raise converted from None
E               pyspark.errors.exceptions.captured.ParseException: 
E               [PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)
E               
E               == SQL ==
E               
E               ^^^

/usr/local/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py:175: ParseException

Expected results

Shows list of tables.

Further details

When the following configuration is removed from the Spark session, the code works; however, the catalog extension is necessary for other features.

        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

mdrakiburrahman commented 4 months ago

We're hitting this as well. @richardcerny, were you able to get to a resolution?

mdrakiburrahman commented 4 months ago

Found a workaround:

Went from:

def listTables(databaseName: String): Array[String] = {
    if (databaseExists(databaseName)) {
      return spark.catalog.listTables(databaseName).collect().map(_.name)
    }
    Array.empty[String]
  }

To this:

def listTables(databaseName: String): Array[String] = {
    if (databaseExists(databaseName)) {

      // Delta 2.4.0 has a regression with Spark 3.4.1 that makes
      // spark.catalog.listTables calls fail
      //
      // >>> https://github.com/delta-io/delta/issues/2610
      //
      return spark
        .sql(s"SHOW TABLES IN $databaseName")
        .collect()
        .map(row => row.getAs[String]("tableName"))
    }
    Array.empty[String]
  }
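
For anyone hitting this from PySpark (as in the original report), a rough equivalent of the same workaround; this is a sketch only, and list_tables_workaround is just an illustrative name:

def list_tables_workaround(spark, database_name):
    # spark.catalog.listTables() fails with PARSE_SYNTAX_ERROR when DeltaCatalog
    # is configured, so fall back to SHOW TABLES and read the tableName column.
    if spark.catalog.databaseExists(database_name):
        rows = spark.sql(f"SHOW TABLES IN {database_name}").collect()
        return [row["tableName"] for row in rows]
    return []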
richardcerny commented 4 months ago

Thank you, @mdrakiburrahman. We have used the same workaround.

felipepessoto commented 2 days ago

It seems the problem is this line: val isTemp = row.getBoolean(2) (https://github.com/apache/spark/blob/1eb558c3a6fbdd59e5a305bc3ab12ce748f6511f/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala#L126)

It returns false when the catalog is set to DeltaCatalog, so the temp view falls into the non-temp branch, where its empty namespace string is passed to parseMultipartIdentifier and triggers the PARSE_SYNTAX_ERROR.

You can see it by starting a spark-shell with/without Delta and running:

spark.range(0,2).createOrReplaceTempView("abc")

val namespace = Seq("spark_catalog", "default")
val plan = org.apache.spark.sql.catalyst.plans.logical.ShowTables(org.apache.spark.sql.catalyst.analysis.UnresolvedNamespace(namespace), None)

val tables = spark.sessionState.executePlan(plan).toRdd.collect().map { row =>
  val tableName = row.getString(1)
  println(tableName)
  val namespaceName = row.getString(0)
  println(namespaceName)
  val isTemp = row.getBoolean(2)
  println(isTemp)
  if (isTemp) {
    // Temp views do not belong to any catalog. We shouldn't prepend the catalog name here.
    // val ns = if (namespaceName.isEmpty) Nil else Seq(namespaceName)
    // makeTable(ns :+ tableName)
  } else {
    // val ns = parseIdent(namespaceName)
    val ns = spark.sessionState.sqlParser.parseMultipartIdentifier(namespaceName)
    // makeTable(catalog.name() +: ns :+ tableName)
  }
}

@cloud-fan I have seen some contributions you made to Delta and Spark related to the catalog. Any insights?

cloud-fan commented 1 day ago

@felipepessoto thanks for providing the repro! What was the error you hit? And can you also post the result of spark.sessionState.executePlan(plan).analyzed.treeString?

felipepessoto commented 1 day ago

@cloud-fan it is the same error that @richardcerny reported. In spark-shell, using my repro code:

org.apache.spark.sql.catalyst.parser.ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)

== SQL ==

^^^

  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:144)
  at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:67)
  at $anonfun$tables$1(<console>:37)
  at $anonfun$tables$1$adapted(<console>:23)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  ... 64 elided

Calling spark.catalog.listTables().show():

org.apache.spark.sql.catalyst.parser.ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)

== SQL ==

^^^

  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:144)
  at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:67)
  at org.apache.spark.sql.internal.CatalogImpl.parseIdent(CatalogImpl.scala:49)
  at org.apache.spark.sql.internal.CatalogImpl.$anonfun$listTables$1(CatalogImpl.scala:132)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.internal.CatalogImpl.listTables(CatalogImpl.scala:123)
  at org.apache.spark.sql.internal.CatalogImpl.listTables(CatalogImpl.scala:98)
  ... 47 elided

treeString:

scala> println(spark.sessionState.executePlan(plan).analyzed.treeString)
ShowTables [namespace#2, tableName#3, isTemporary#4]
+- ResolvedNamespace org.apache.spark.sql.delta.catalog.DeltaCatalog@32855523, [default]
cloud-fan commented 18 hours ago

One workaround is to set spark.sql.legacy.useV1Command to true. Ideally, DeltaCatalog should not return views in listTables.
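
For reference, a minimal sketch of applying that workaround from PySpark (assuming the flag can be set at runtime like other SQL confs; it should also be settable in the session builder):

# Route Catalog.listTables() through the legacy V1 command path, as suggested above.
spark.conf.set("spark.sql.legacy.useV1Command", "true")
print(spark.catalog.listTables())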