
Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

How to reinitialize/refresh an Iceberg catalog object in the Spark catalog in an ongoing Spark session #10227

Open vasudhamsr opened 7 months ago

vasudhamsr commented 7 months ago

Query engine

Spark

Question

We have a scenario where the catalog object keeps changing, and the custom FileIO underneath it changes as well. But the loadTable call (org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:185)) picks up the older catalog impl class with the stale FileIO (which doesn't have access to the new tables) and throws. From the looks of it, the Spark catalog loads the catalog impl class once on initialization and does not pick up later changes. Is there a way to force a refresh of the Iceberg catalog object held by SparkCatalog? Thank you in advance!
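A related knob, not a full answer to re-reading the catalog configuration but relevant to stale metadata, is Iceberg's cache-enabled catalog property: when set to false, SparkCatalog is not wrapped in a CachingCatalog, so table metadata is re-read on each load. A minimal sketch, assuming a Hive-backed catalog; the catalog name myCatalog, the metastore URI, and the session variable are placeholders:

// Sketch: disable Iceberg's catalog-level table caching so each load re-reads
// table metadata instead of serving a cached Table instance.
// Note this does NOT make the catalog re-read its own configuration (e.g. a changed FileIO).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.myCatalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.myCatalog.type", "hive")
  .config("spark.sql.catalog.myCatalog.uri", "thrift://metastore-host:9083")
  .config("spark.sql.catalog.myCatalog.cache-enabled", "false")
  .getOrCreate()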

jkolash commented 6 months ago

In integration tests I have done

REFRESH TABLE catalog.db.table

to force an immediate refresh.
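For reference, a minimal way to issue that from an existing Spark session in Scala; the table identifier is a placeholder:

// Sketch: ask Spark to drop its cached metadata for one table.
// "myCatalog.db.table" is a placeholder identifier.
spark.sql("REFRESH TABLE myCatalog.db.table")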

github-actions[bot] commented 3 weeks ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

nerstak commented 2 weeks ago

Hello! In the following use case it does not seem to be feasible. Is there an alternative?

scala> import org.apache.spark.sql.SparkSession
val sc = SparkSession.builder()
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.myCatalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.myCatalog.warehouse", "s3a://path/to/warehouse")
  .config("spark.sql.defaultCatalog", "myCatalog")
  .config("spark.sql.catalog.myCatalog.type", "hive")
  .config("spark.sql.catalog.myCatalog.uri", "thrift://1.2.3.4:9083")
  .getOrCreate()

scala> sc.conf.isModifiable("spark.sql.catalog.myCatalog.uri")
res2: Boolean = false

scala> sc.sql("show schemas")
// Whatever results

scala> sc.conf.set(s"spark.sql.catalog.myCatalog.uri", "thrift://5.6.7.8:9083") 
// Changes from here are not applied to the already initialized catalog "myCatalog"; it will keep using the previous conf

The REFRESH TABLE command does not seem to work on my side either (Spark 3.5, Iceberg 1.5).

Regards.
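One possible workaround, not verified against this exact setup: Spark reads the spark.sql.catalog.<name>.* configuration lazily, the first time that catalog name is referenced, so instead of mutating the existing catalog one can register the new endpoint under a different catalog name and query through it. A sketch under that assumption; the name myCatalogNew and the URI/warehouse values are placeholders, and sc is the SparkSession from the snippet above:

// Sketch: register a second catalog pointing at the new metastore rather than
// changing myCatalog's conf (which was only read when myCatalog was first initialized).
// "myCatalogNew" and the endpoint values are placeholders.
sc.conf.set("spark.sql.catalog.myCatalogNew", "org.apache.iceberg.spark.SparkCatalog")
sc.conf.set("spark.sql.catalog.myCatalogNew.type", "hive")
sc.conf.set("spark.sql.catalog.myCatalogNew.uri", "thrift://5.6.7.8:9083")
sc.conf.set("spark.sql.catalog.myCatalogNew.warehouse", "s3a://path/to/warehouse")

// The new catalog is initialized on first use with the conf set above.
sc.sql("SHOW SCHEMAS IN myCatalogNew").show()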