
Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

SparkSessionCatalog with JDBC catalog: SHOW TABLES IN ... returns error but table exists in JDBC catalog #10003

Open · matepek opened 6 months ago

matepek commented 6 months ago

Apache Iceberg version

1.5.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

The issue

Issue with SparkSessionCatalog when using it with a non-Hive catalog

Repro

My expectations are based on the documentation:

> org.apache.iceberg.spark.SparkSessionCatalog adds support for Iceberg tables to Spark's built-in catalog, and delegates to the built-in catalog for non-Iceberg tables

Note that spark_catalog is a org.apache.iceberg.spark.SparkSessionCatalog

Test Set 1:

NOTE: spark.sql.defaultCatalog = spark_catalog

CREATE OR REPLACE TABLE test_schema.iceberg_table
USING iceberg
PARTITIONED BY (day(ts), truncate(2, id)) AS
SELECT
    1 AS id,
    TIMESTAMP('2000-01-01 01') AS ts,
    'a' AS col1

show tables in test_schema

Expected: the table
Actual: ❌ org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: [SCHEMA_NOT_FOUND] The schema 'test_schema' cannot be found. Verify the spelling and correctness of the schema and catalog.

show tables in spark_catalog.test_schema

Expected: the table
Actual: ❌ org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: [SCHEMA_NOT_FOUND] The schema 'test_schema' cannot be found. Verify the spelling and correctness of the schema and catalog.

show tables in iceberg_catalog.test_schema

Expected: the table
Actual: ❌ org.apache.iceberg.exceptions.NoSuchNamespaceException: Namespace does not exist: test_schema

select * from test_schema.iceberg_table 

Expected: the table content
Actual: ✅ the table content

select * from spark_catalog.test_schema.iceberg_table 

Expected: the table content
Actual: ✅ the table content

select * from iceberg_catalog.test_schema.iceberg_table 

Expected: the table content
Actual: ❌ org.apache.hive.service.cli.HiveSQLException: Error running query: [TABLE_OR_VIEW_NOT_FOUND]

Test Set 2:

I've tried setting the default catalog: spark.sql.defaultCatalog = iceberg_catalog

CREATE OR REPLACE TABLE test_schema.iceberg_table
USING iceberg
PARTITIONED BY (day(ts), truncate(2, id)) AS
SELECT
    1 AS id,
    TIMESTAMP('2000-01-01 01') AS ts,
    'a' AS col1

The result:

show tables in test_schema

Expected: the table
Actual: ✅ the table

show tables in spark_catalog.test_schema

Expected: the table
Actual: ❌ org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: [SCHEMA_NOT_FOUND] The schema 'test_schema' cannot be found. Verify the spelling and correctness of the schema and catalog.

show tables in iceberg_catalog.test_schema

Expected: the table
Actual: ✅ the table

select * from test_schema.iceberg_table 

Expected: the table content
Actual: ✅ the table content

select * from spark_catalog.test_schema.iceberg_table 

Expected: the table content
Actual: ❌ org.apache.spark.sql.AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view 'spark_catalog'.'test_schema'.'iceberg_table' cannot be found.

select * from iceberg_catalog.test_schema.iceberg_table 

Expected: the table content
Actual: ✅ the table content

The Setup:

iceberg: 1.5.0
Java version: 11.0.22 (Eclipse Adoptium)
Scala version: 2.12.18
(official image with little tweaks)

Config / Value
spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.gcp.gcs.GCSFileIO
spark.sql.catalog.iceberg_catalog.jdbc.password *****(redacted)
spark.sql.catalog.iceberg_catalog.jdbc.schema-version V1
spark.sql.catalog.iceberg_catalog.jdbc.useSSL false
spark.sql.catalog.iceberg_catalog.jdbc.user *****(redacted)
spark.sql.catalog.iceberg_catalog.jdbc.verifyServerCertificate false
spark.sql.catalog.iceberg_catalog.table-default.write.metadata.delete-after-commit.enabled true
spark.sql.catalog.iceberg_catalog.table-default.write.metadata.previous-versions-max 50
spark.sql.catalog.iceberg_catalog.type jdbc
spark.sql.catalog.iceberg_catalog.uri jdbc:postgresql://localhost:5432/iceberg-catalog
spark.sql.catalog.iceberg_catalog.warehouse gs://spark_warehouse_staging
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.io-impl org.apache.iceberg.gcp.gcs.GCSFileIO
spark.sql.catalog.spark_catalog.jdbc.password *****(redacted)
spark.sql.catalog.spark_catalog.jdbc.schema-version V1
spark.sql.catalog.spark_catalog.jdbc.useSSL false
spark.sql.catalog.spark_catalog.jdbc.user *****(redacted)
spark.sql.catalog.spark_catalog.jdbc.verifyServerCertificate false
spark.sql.catalog.spark_catalog.type jdbc
spark.sql.catalog.spark_catalog.uri jdbc:postgresql://localhost:5432/iceberg-catalog
spark.sql.catalog.spark_catalog.warehouse gs://spark_warehouse_staging
spark.sql.catalogImplementation hive
spark.sql.defaultCatalog spark_catalog
spark.sql.extensions org.apache.sedona.sql.SedonaSqlExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
nastra commented 6 months ago

> It's great that 1.5 supports views. We have been waiting for this for a long time. But it suffers from some issues.

There isn't anything view-related in your steps above. It seems to be just comparing the table behavior for SparkCatalog vs SparkSessionCatalog

nastra commented 6 months ago

I think the problem is actually the expectation of the SparkSessionCatalog behavior, and that the documentation doesn't do a good job of describing what its purpose is (I've stumbled over this as well).

Most of the time you'd want to use SparkCatalog. The SparkSessionCatalog currently doesn't work well if the underlying catalog is a non-Hive catalog (and you're using the JDBC catalog in your example). The namespace behavior in SparkSessionCatalog also has issues currently, especially if using a non-Hive catalog that requires namespaces to be created before creating a table in that namespace. If you look at the implementation of SparkSessionCatalog around how it deals with namespaces and tables, it actually delegates to Spark's session catalog, and that is most likely the underlying issue here.
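
To make the mismatch concrete, here is a minimal sketch (using the catalog names from the setup below; illustrative, not a verified fix): namespace and table listings issued against spark_catalog are delegated to Spark's built-in session catalog, so a namespace that only exists in the Iceberg JDBC catalog is invisible to it.

    -- each statement resolves against a different catalog
    SHOW TABLES IN spark_catalog.test_schema;   -- delegated to Spark's session catalog (Hive metastore)
    SHOW TABLES IN iceberg_catalog.test_schema; -- answered by the Iceberg JDBC catalog
    -- possible mitigation sketch: make sure the namespace exists on both sides
    CREATE NAMESPACE IF NOT EXISTS spark_catalog.test_schema;
    CREATE NAMESPACE IF NOT EXISTS iceberg_catalog.test_schema;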

That being said, this issue should be updated to reflect that this is an issue with SparkSessionCatalog when using it with a non-Hive catalog and we should fix the behavior.

@matepek would you be interested in contributing/working on this issue to fix the SparkSessionCatalog behavior with non-hive catalogs?

matepek commented 6 months ago

What do you mean by saying that I'm using the JDBC catalog? I thought spark.sql.catalogImplementation = hive sets it to a Hive catalog.

(I know I have a knowledge gap and I'm trying to catch up so I appreciate if you correct me and explain.)

My understanding of Spark catalogs is that there is always a spark_catalog, which is a Hive catalog because of spark.sql.catalogImplementation = hive.

Also, we created an iceberg_catalog which uses org.apache.iceberg.spark.SparkCatalog; that was good for managing Iceberg tables until v1.5, and now views too.

So before v1.5 we needed to store the views and "non-managed tables" in the Hive catalog and have them work together with Iceberg (managed) tables. For that we wrapped spark_catalog with org.apache.iceberg.spark.SparkSessionCatalog, which is meant to delegate functionality between the Hive and Iceberg catalogs. That worked okay; actually we needed some customisation, because SparkSessionCatalog was unable to properly list items from both catalogs, so whenever we needed that functionality we listed the items of the two catalogs and concatenated the results. So it was not really working properly even before, but it was a necessity for making views, tables, and "non-managed tables" work together.

Since v1.5 the listing seems even less reliable (see this issue). But as we talk more about it, I'm starting to think that I might not need SparkSessionCatalog anymore, since views are now managed entities in org.apache.iceberg.spark.SparkCatalog. I can just use iceberg_catalog by default, and whenever there is a rare need for a "non-managed" table I can specify the catalog explicitly, like spark_catalog.schema_for_non_managed_tables.table_name. And I'm good.
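
A sketch of that approach (assuming spark.sql.defaultCatalog = iceberg_catalog; the schema and table names are the hypothetical ones from the sentence above):

    -- managed Iceberg tables resolve against the default (Iceberg) catalog
    SELECT * FROM test_schema.iceberg_table;
    -- the rare non-managed table is referenced with an explicit catalog prefix
    SELECT * FROM spark_catalog.schema_for_non_managed_tables.table_name;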

So now I'm gonna try removing the definition for spark_catalog and I hope that it will make this work. BRB.

REMARKS:

by ""non-managed table" I mean something which his not managed by iceberg which is regular hive table. Ex.:

    CREATE TABLE schema_name.external_table (
        id LONG,
        dt DATE
    )
    PARTITIONED BY (dt)
    STORED AS PARQUET
    LOCATION 'gs://bucket/folder/'
matepek commented 6 months ago

I see what you meant now...

spark.sql.catalog.spark_catalog.type was configured to jdbc, which was actually a mistake of mine.

But not defining the spark_catalog is an interesting option now that we have views in Iceberg, so I'm experimenting.

Btw I saw you worked on project Nessie. It's quite a cool one. I played around with it a bit but couldn't make it work properly with dbt without a bunch of macro overrides, which I would rather not do.

nastra commented 6 months ago

> spark.sql.catalog.spark_catalog.type was configured to jdbc, which was actually a mistake of mine.

Yes, exactly, that's what I meant. That would use the JDBC catalog underneath, which is what causes the confusing behavior you're seeing.

Also, there's currently no Iceberg view support for SparkSessionCatalog available as that would be introduced by https://github.com/apache/iceberg/issues/9845.

If you want to use Iceberg views, then you should be fine using SparkCatalog and specifying the type to be any of jdbc / rest / nessie, as those are the only Iceberg catalogs right now that support Iceberg views.
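
For reference, a minimal sketch of such a configuration, in the same properties format as the setup above (the catalog name and URI are illustrative placeholders, not values from this thread):

    spark.sql.catalog.my_catalog       org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.my_catalog.type  rest
    spark.sql.catalog.my_catalog.uri   http://localhost:8181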

matepek commented 6 months ago

Okay, for dbt sadly I need the SparkSessionCatalog, as I suspected before. I tried almost everything; it's a pain otherwise. We had been using a REST catalog, so I'm surprised we didn't have many issues with 1.4.3.

I have been checking the code and I found these lines:

String provider = properties.get("provider");

Interestingly, describe table extended ... returns several lines and one of them is Provider = iceberg. Note the capital letter. I don't think this is an issue, just unexpected at first sight.
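
A hedged illustration of that case difference (table name reused from the repro above, output abbreviated): the capitalized Provider is only the display label in the DESCRIBE output, while the table property key that the quoted code reads is the lower-case provider.

    DESCRIBE TABLE EXTENDED test_schema.iceberg_table;
    -- ...
    -- Provider    iceberg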

matepek commented 6 months ago

I'm working on something like this.

It would fix this too, after more adjustments like listNamespaces.

WDYT?

matepek commented 6 months ago

Not strictly related, but I'm kinda stuck with this: using SparkSessionCatalog with NessieCatalog, I cannot create an Iceberg table:

CREATE OR REPLACE TABLE my_schema.table
USING iceberg AS
SELECT something

results in

  org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.iceberg.exceptions.NoSuchNamespaceException: Namespace does not exist: my_schema
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:46)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:262)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:166)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
    at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:41)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:166)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:161)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/javax.security.auth.Subject.doAs(Unknown Source)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:175)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
  Caused by: org.apache.iceberg.exceptions.NoSuchNamespaceException: Namespace does not exist: my_schema
    at org.apache.iceberg.nessie.NessieUtil.maybeUseSpecializedException(NessieUtil.java:302)
    at org.apache.iceberg.nessie.NessieUtil.handleExceptionsForCommits(NessieUtil.java:224)
    at org.apache.iceberg.nessie.NessieTableOperations.doCommit(NessieTableOperations.java:125)
    at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:126)
    at org.apache.iceberg.BaseTransaction.lambda$commitReplaceTransaction$1(BaseTransaction.java:381)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.BaseTransaction.commitReplaceTransaction(BaseTransaction.java:365)
    at org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:314)
    at org.apache.iceberg.CommitCallbackTransaction.commitTransaction(CommitCallbackTransaction.java:126)
    at org.apache.iceberg.spark.source.StagedSparkTable.commitStagedChanges(StagedSparkTable.java:34)
    at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:580)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
    at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable(WriteToDataSourceV2Exec.scala:573)
    at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable$(WriteToDataSourceV2Exec.scala:567)
    at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:183)
    at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:216)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:227)
    ... 16 more
  Caused by: org.projectnessie.error.NessieReferenceConflictException: Namespace 'my_schema' must exist.
    at org.projectnessie.error.ErrorCode.lambda$asException$1(ErrorCode.java:66)
    at java.base/java.util.Optional.map(Unknown Source)
    at org.projectnessie.error.ErrorCode.asException(ErrorCode.java:66)
    at org.projectnessie.client.rest.ResponseCheckFilter.checkResponse(ResponseCheckFilter.java:58)
    at org.projectnessie.client.rest.NessieHttpResponseFilter.filter(NessieHttpResponseFilter.java:29)
    at org.projectnessie.client.http.impl.jdk11.JavaRequest.lambda$executeRequest$1(JavaRequest.java:143)
    at java.base/java.util.ArrayList.forEach(Unknown Source)
    at java.base/java.util.Collections$UnmodifiableCollection.forEach(Unknown Source)
    at org.projectnessie.client.http.impl.jdk11.JavaRequest.executeRequest(JavaRequest.java:143)
    at org.projectnessie.client.http.HttpRequest.post(HttpRequest.java:116)
    at org.projectnessie.client.rest.v1.RestV1TreeClient.commitMultipleOperations(RestV1TreeClient.java:204)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.projectnessie.client.rest.v1.RestV1Client$ExceptionRewriter.invoke(RestV1Client.java:84)
    at com.sun.proxy.$Proxy58.commitMultipleOperations(Unknown Source)
    at org.projectnessie.client.rest.v1.HttpCommitMultipleOperations.commit(HttpCommitMultipleOperations.java:34)
    at org.apache.iceberg.nessie.NessieIcebergClient.commitContent(NessieIcebergClient.java:695)
    at org.apache.iceberg.nessie.NessieIcebergClient.commitTable(NessieIcebergClient.java:627)
    at org.apache.iceberg.nessie.NessieTableOperations.doCommit(NessieTableOperations.java:120)
    ... 66 more

I don't see anywhere the SparkSessionCatalog in the stack. Any hint would be welcomed 🙏

(Is the V2 source type not handled by SparkSessionCatalog? If I remember correctly, this functionality was working when RESTCatalog was used.)

nastra commented 6 months ago

@matepek fundamentally that issue is the same as I described in https://github.com/apache/iceberg/issues/10003#issuecomment-2007780751. SparkSessionCatalog doesn't create a namespace in the underlying catalog (NessieCatalog in your case), and so table creation fails with what you're seeing.

ajantha-bhat commented 6 months ago

Namespace has to be created explicitly in Nessie as described in https://projectnessie.org/blog/namespace-enforcement/
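
A minimal sketch of that explicit step, assuming the namespace name from the failing query above:

    -- create the namespace in the Nessie catalog before the CREATE OR REPLACE TABLE
    CREATE NAMESPACE IF NOT EXISTS my_schema;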

matepek commented 5 months ago

Thank you for the answers.

> fundamentally that issue is the same

Yes, I think I understand that. What I'm surprised by is why I don't see the SparkSessionCatalog anywhere in that table-creation call stack. I mean, how could the SSC handle this case when it does not have control over it? I'm missing an important piece of the puzzle here, I feel.

> Namespace has to be created explicitly in Nessie as described

Could the SparkSessionCatalog create it on demand? If not, then I don't see how it could be used properly with Nessie.

sivakanthavel-tigeranalytics commented 4 months ago

Hello @matepek @ajantha-bhat,

Need some suggestions!

I am using org.apache.iceberg.spark.SparkSessionCatalog instead of SparkCatalog. I am able to create both Iceberg and non-Iceberg tables on the Glue catalog. However, when I try to execute SHOW TABLE EXTENDED IN db_name LIKE 'table_name' on an Iceberg table, it throws this error: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table. StorageDescriptor#InputFormat cannot be null for table:

I am using dbt Spark to load Iceberg tables on the Glue catalog. I don't have control over the SQL command that dbt generates to check table existence. Because of the error I am getting, I am unable to proceed further.

Any help on this issue would be appreciated.

spark_default.conf settings:

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.spark_catalog               org.apache.iceberg.spark.SparkSessionCatalog
    spark.sql.catalog.spark_catalog.catalog-impl  org.apache.iceberg.aws.glue.GlueCatalog
    spark.sql.catalog.spark_catalog.warehouse     s3:bucket_name