ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org
Apache License 2.0

bug: unable to set database for table registered in databricks unity catalog using shared cluster #9604

Closed jstammers closed 1 month ago

jstammers commented 1 month ago

What happened?

I am unable to read from a table in a Databricks Unity Catalog when using a compute cluster with shared access mode. When I try to load the table using

spark = ...  # the SparkSession provided by the Databricks runtime

import ibis

con = ibis.pyspark.connect(spark)

table = con.table("my_table", database=("catalog", "schema"))

I receive the following error

py4j.security.Py4JSecurityException: Method public void org.apache.spark.sql.internal.CatalogImpl.setCurrentCatalog(java.lang.String) is not whitelisted on class class org.apache.spark.sql.internal.CatalogImpl

From a quick Google search, I think this is related to the fact that certain Spark functions are not enabled on clusters with shared access mode (e.g. here).

If I manually set the catalog and schema using SQL statements

USE CATALOG catalog;
USE SCHEMA schema;

I am then able to load the table using

table = con.table("my_table")
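For reference, a minimal sketch of that workaround run from Python, assuming the USE statements are issued through the same SparkSession that the ibis connection wraps (the catalog and schema names are placeholders):

# Set the session-level catalog and schema via Spark SQL instead of the
# pyspark Catalog API, which is blocked on shared access mode clusters
spark.sql("USE CATALOG catalog")
spark.sql("USE SCHEMA schema")

# The unqualified lookup now succeeds without ibis calling setCurrentCatalog
table = con.table("my_table")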

For further context, I have not encountered these issues when using a personal compute cluster that has single-user access mode.

What version of ibis are you using?

9.1.0

What backend(s) are you using, if any?

pyspark - 3.4.1

Relevant log output

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-6c76b61f-037f-47b2-b51d-bcb3b5d19e64/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:237, in Backend._active_catalog_database(self, catalog, db)
    236 if catalog is not None:
--> 237     self._session.catalog.setCurrentCatalog(catalog)
    238 self._session.catalog.setCurrentDatabase(db)

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )

File /databricks/spark/python/pyspark/sql/catalog.py:122, in Catalog.setCurrentCatalog(self, catalogName)
    109 """Sets the current default catalog in this session.
    110 
    111 .. versionadded:: 3.4.0
   (...)
    120 >>> spark.catalog.setCurrentCatalog("spark_catalog")
    121 """
--> 122 return self._jcatalog.setCurrentCatalog(catalogName)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
    187 try:
--> 188     return f(*a, **kw)
    189 except Py4JJavaError as e:

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:330, in get_return_value(answer, gateway_client, target_id, name)
    329     else:
--> 330         raise Py4JError(
    331             "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332             format(target_id, ".", name, value))
    333 else:


gforsyth commented 1 month ago

Thanks for reporting @jstammers!

The fact that you can set a catalog and database (schema) via the SQL interface but not using the pyspark API is... frustrating. I can poke around at making those catalog and database calls via SQL instead.

Are you available to test out those changes if I push up a PR? It's hard for me to test because I don't have access to a cluster running in shared access mode.

jstammers commented 1 month ago

Sure, I'd be happy to test any changes if that would be helpful.

gforsyth commented 1 month ago

Great, thanks!

Just for the paper trail: we currently have context managers that use the pyspark API to set the catalog and database before performing operations like creating or loading tables (where table location is obviously important).

This issue seems to indicate a mismatch in the permission structure on certain clusters between operations performed via the pyspark API and the same operations performed directly via Spark SQL.

My thinking here is to swap out the pyspark API calls and instead set the location hierarchy via Spark SQL, e.g. (a rough sketch follows the SQL below)

USE CATALOG catalog;
USE SCHEMA database;
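
A rough sketch of what that swap could look like, modeled on the _active_catalog_database context manager shown in the traceback above. The name and structure here are hypothetical, and it assumes that reading the current catalog and database through the pyspark API is still permitted on shared clusters (only the setters appear to be blocked):

import contextlib

@contextlib.contextmanager
def _use_catalog_database(session, catalog, db):
    # Remember the current location so it can be restored afterwards
    prev_catalog = session.catalog.currentCatalog()
    prev_db = session.catalog.currentDatabase()
    try:
        # Switch via Spark SQL rather than the non-whitelisted Catalog API
        if catalog is not None:
            session.sql(f"USE CATALOG {catalog}")
        session.sql(f"USE SCHEMA {db}")
        yield
    finally:
        # Always restore the previous catalog and schema
        session.sql(f"USE CATALOG {prev_catalog}")
        session.sql(f"USE SCHEMA {prev_db}")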

jstammers commented 1 month ago

That makes sense to me. If we can leverage the Spark SQL syntax, that could be a good workaround. We might need to consider the fact that, as far as I'm aware, the Spark SQL commands set the catalog and schema for the whole session, which could result in unintended errors, e.g.

table = con.table("my_table")  # resolves against the default catalog/schema
# assuming catalog.schema.my_table_2 exists
new_table = con.table("my_table_2", database=("catalog", "schema"))
# the session now points at catalog.schema, so this errors if
# catalog.schema.my_table doesn't exist, or returns a different table if it does
table = con.table("my_table")
gforsyth commented 1 month ago

Yep, definitely something to be wary of.

If we follow the current pattern, we would set the catalog and schema for the current query execution, then restore it to the previous value, so it should be relatively robust, but "should" is doing a lot of work there.
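
Under that set-and-restore pattern (assuming something like the sketch above), the earlier example would behave roughly like this:

table = con.table("my_table")  # defaults still in effect

# internally: USE CATALOG catalog; USE SCHEMA schema; look up the table; restore
new_table = con.table("my_table_2", database=("catalog", "schema"))

# the previous catalog and schema were restored after the lookup above, so
# this still resolves against the defaults rather than catalog.schema
table = con.table("my_table")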