Qbeast-io / qbeast-spark

Qbeast-spark: DataSource enabling multi-dimensional indexing and efficient data sampling. Big Data, free from the unnecessary!
https://qbeast.io/qbeast-our-tech/
Apache License 2.0

Support SQL syntax delta.`path` and qbeast.`path` when using the QbeastCatalog #412

Open cugni opened 1 month ago

cugni commented 1 month ago

What went wrong?

When the QbeastCatalog is configured as spark_catalog, SQL path-based queries such as SELECT * FROM delta.`/path` or SELECT * FROM qbeast.`/path` fail with SCHEMA_NOT_FOUND. When the DeltaCatalog is used instead, the delta.`/path` syntax works, but qbeast.`/path` fails with UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY. The expected behavior is that both syntaxes resolve the table directly from the given path with either catalog.

How to reproduce?

Steps to reproduce the problem with both catalog configurations:

1. Code that triggered the bug, or steps to reproduce:

$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.7.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog

spark.range(10).write.format("delta").save("/tmp/test1")
spark.sql("SELECT * FROM delta.`/tmp/test1`")
// org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: [SCHEMA_NOT_FOUND] The schema `delta` cannot be found. Verify the spelling and correctness of the schema and catalog.

// The same error appears if the table is written in the Qbeast format
spark.range(10).write.format("qbeast").option("columnsToIndex","id").save("/tmp/test2")
spark.sql("SELECT * FROM qbeast.`/tmp/test2`")
// org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: [SCHEMA_NOT_FOUND] The schema `delta` cannot be found. Verify the spelling and correctness of the schema and catalog.

On the other hand, if we use the DeltaCatalog:

$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.7.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

spark.range(10).write.format("delta").save("/tmp/test3")
spark.sql("SELECT * FROM delta.`/tmp/test3`")
// This works

// But writing the table in the Qbeast format and querying with qbeast.`path` fails
spark.range(10).write.format("qbeast").option("columnsToIndex","id").save("/tmp/test4")
spark.sql("SELECT * FROM qbeast.`/tmp/test4`")
// 24/09/13 11:41:59 WARN ObjectStore: Failed to get database qbeast, returning NoSuchObjectException
// org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY] Unsupported data source type for direct query on files: qbeast; line 1 pos 14

// However, the same table can be read as Delta
spark.sql("SELECT * FROM delta.`/tmp/test4`")
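For reference, a minimal sketch (not part of the original report) showing that both tables can already be read by path through the DataFrame API; the SQL path syntax requested in this issue would mirror this behavior:

// Sketch, not from the original report: path-based reads through the DataFrame API
spark.read.format("delta").load("/tmp/test3").show()
spark.read.format("qbeast").load("/tmp/test4").show()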

2. Branch and commit id:

0.7.0

3. Spark version:

3.5.0

4. Hadoop version:

3.3.4

5. How are you running Spark?

local computer

6. Stack trace:

Trace of the log/error messages when using QbeastCatalog

24/09/13 11:34:12 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/09/13 11:34:12 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/09/13 11:34:13 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
24/09/13 11:34:13 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore cesare@127.0.0.1
24/09/13 11:34:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
24/09/13 11:34:14 WARN ObjectStore: Failed to get database delta, returning NoSuchObjectException
24/09/13 11:34:14 WARN ObjectStore: Failed to get database delta, returning NoSuchObjectException
org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: [SCHEMA_NOT_FOUND] The schema `delta` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a catalog, verify the current_schema() output, or qualify the name with the correct catalog.
To tolerate the error on drop use DROP SCHEMA IF EXISTS.
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireDbExists(SessionCatalog.scala:250)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableRawMetadata(SessionCatalog.scala:540)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:526)
  at io.qbeast.spark.internal.sources.v2.QbeastTableImpl.table$lzycompute(QbeastTableImpl.scala:82)
  at io.qbeast.spark.internal.sources.v2.QbeastTableImpl.table(QbeastTableImpl.scala:77)
  at io.qbeast.spark.internal.sources.v2.QbeastTableImpl.schema(QbeastTableImpl.scala:88)
  at org.apache.spark.sql.connector.catalog.Table.columns(Table.java:65)
  at io.qbeast.spark.internal.sources.v2.QbeastTableImpl.columns(QbeastTableImpl.scala:56)

  Trace of the log/error messages when using DeltaCatalog

  24/09/13 11:41:59 WARN ObjectStore: Failed to get database qbeast, returning NoSuchObjectException
org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY] Unsupported data source type for direct query on files: qbeast; line 1 pos 14
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:52)
  at org.apache.spark.sql.execution.datasources.ResolveSQLOnFile.org$apache$spark$sql$execution$datasources$ResolveSQLOnFile$$resolveDataSource(rules.scala:58)
  at org.apache.spark.sql.execution.datasources.ResolveSQLOnFile$$anonfun$apply$1.applyOrElse(rules.scala:78)
  at org.apache.spark.sql.execution.datasources.ResolveSQLOnFile$$anonfun$apply$1.applyOrElse(rules.scala:63)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$4(AnalysisHelper.scala:175)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1215)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1214)
  at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:175)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:32)
  at org.apache.spark.sql.execution.datasources.ResolveSQLOnFile.apply(rules.scala:63)
  at org.apache.spark.sql.execution.datasources.ResolveSQLOnFile.apply(rules.scala:43)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:226)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:222)
  at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:222)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
  ... 47 elided
osopardo1 commented 1 month ago

In the second example, the QbeastCatalog is missing as a secondary catalog, and I think that's why the qbeast format is not recognized.
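A minimal sketch of that configuration, registering the QbeastCatalog as a secondary catalog next to the DeltaCatalog (the catalog name qbeast_catalog is an arbitrary choice, not mandated by the project):

# Sketch: QbeastCatalog registered under the arbitrary name qbeast_catalog
$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.7.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
--conf spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog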

For the first one, I will look into the QbeastCatalog.loadTable method. It seems that it is not parsing the TableIdentifier properly when a path is specified.

  override def loadTable(ident: Identifier): Table = {
    try {
      // Delegate to the session catalog and, if the provider is qbeast,
      // wrap the result in a QbeastTableImpl; otherwise return it unchanged.
      getSessionCatalog().loadTable(ident) match {
        case table
            if QbeastCatalogUtils.isQbeastProvider(table.properties().asScala.get("provider")) =>
          QbeastCatalogUtils.loadQbeastTable(table, tableFactory)
        case o => o
      }
    } catch {
      // Path-based identifiers (e.g. qbeast.`/tmp/test2`) are not in the metastore,
      // so the lookup above throws and we fall back to a path table. Note that
      // ident.namespace().headOption is the format name ("delta" or "qbeast"),
      // which becomes the database of the TableIdentifier built here.
      case _: NoSuchDatabaseException | _: NoSuchNamespaceException | _: NoSuchTableException
          if QbeastCatalogUtils.isPathTable(ident) =>
        QbeastTableImpl(
          TableIdentifier(ident.name(), ident.namespace().headOption),
          new Path(ident.name()),
          Map.empty,
          tableFactory = tableFactory)
    }
  }
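
A small illustrative sketch (not repository code) of the Identifier that the analyzer hands to loadTable for a path query like delta.`/tmp/test1`; it shows why the fallback TableIdentifier ends up pointing at a non-existent `delta` database, which is exactly where the SCHEMA_NOT_FOUND in the first trace is raised (QbeastTableImpl.table -> getTableMetadata -> requireDbExists):

import org.apache.spark.sql.connector.catalog.Identifier

// Sketch, not repository code: the format name arrives as the namespace
// and the raw path as the table name.
val ident = Identifier.of(Array("delta"), "/tmp/test1")
ident.namespace().headOption // Some("delta") -> used as the database in the catch branch above
ident.name()                 // "/tmp/test1"  -> the path, used as the table name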