Azure / spark-cdm-connector

[Issue] Cannot read data when using Databricks Unity Catalog #149

Open JulesHuisman opened 1 year ago

JulesHuisman commented 1 year ago

Did you read the pinned issues and search the error message?

Yes, but I didn't find the answer.

Summary of issue

When we migrated to a Unity Catalog Databricks workspace, we noticed the package stopped working. It started throwing the error:

spark_catalog requires a single-part namespace, but got [xyz.dfs.core.windows.net, /xyz, model.json].

This is probably because data reading works differently with Unity. It still works if we use a cluster without Unity.

Have others run into the same issue? And can we help resolve it?
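
For reference, the read that triggers this is roughly the following (a minimal sketch; the storage account, container, entity name, and service-principal credentials are placeholders, not our real values):

df = (
    spark.read.format("com.microsoft.cdm")
    .option("storage", "<account>.dfs.core.windows.net")   # placeholder storage account
    .option("manifestpath", "<container>/model.json")       # placeholder manifest path
    .option("entity", "<EntityName>")                        # placeholder entity
    .option("tenantid", "<tenant-id>")                       # service principal placeholders
    .option("appid", "<client-id>")
    .option("appkey", "<client-secret>")
    .load()
)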

Error stack trace

Exception: Stream account failed with exception: Traceback (most recent call last):
  File "", line 7, in ingest_stream
    stream.ingest()
  File "", line 7, in wrapper
    df: DataFrame = func(*args, **kwargs)
  File "", line 286, in ingest
    df = self._load()
  File "", line 12, in _load
    return (spark
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/readwriter.py", line 309, in load
    return self._df(self._jreader.load())
  File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/databricks/spark/python/pyspark/errors/exceptions.py", line 234, in deco
    raise converted from None
pyspark.errors.exceptions.AnalysisException: spark_catalog requires a single-part namespace, but got [[REDACTED].dfs.core.windows.net, /[REDACTED], model.json].

Platform name

Databricks

Spark version

3.3.2

CDM jar version

spark3.1-1.19.3

What is the format of the data you are trying to read/write?

.csv

kecheung commented 1 year ago

Spark version 3.3.2 CDM jar version spark3.1-1.19.3

@JulesHuisman There is a mismatch between your platform's Spark version and the connector's. You should fix that first.
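
For anyone checking this on their own cluster, the runtime's Spark version can be confirmed from a notebook cell before picking a connector build (just a sanity check, nothing connector-specific):

# The connector build (spark3.1-x, spark3.2-x, spark3.3-x, ...) should match
# the Spark version of the Databricks runtime the cluster is running.
print(spark.version)  # e.g. "3.3.2" on this cluster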

JulesHuisman commented 1 year ago

Spark version 3.3.2 CDM jar version spark3.1-1.19.3

@JulesHuisman There is a mismatch between your platform's Spark version and the connector's. You should fix that first.

Thanks for the tip! I do get a different issue now, which might bring me closer.

It now throws:

java.lang.ClassCastException: com.databricks.sql.managedcatalog.UnityCatalogV2Proxy cannot be cast to com.microsoft.cdm.CDMCatalog

Any idea what this might indicate?

kecheung commented 1 year ago

This error is specific to Spark 3.3 and required some changes to the connector (which were made in the CDM Spark 3.3 connector) to make it work:

spark_catalog requires a single-part namespace, but got [xyz.dfs.core.windows.net, /xyz, model.json].

I missed a few lines in your post mentioning "It still works if we use a cluster without Unity". This was only developed for a cluster scenario. Without seeing the full stacktrace, I am guessing the error is due to the Databricks Unity Catalog platform you are using.

bradleyjamrozik commented 1 year ago

Are there any workarounds in the meantime? Specifically for this error:

java.lang.ClassCastException: com.databricks.sql.managedcatalog.UnityCatalogV2Proxy cannot be cast to com.microsoft.cdm.CDMCatalog

kecheung commented 1 year ago

@bradleyjamrozik Please give the full error stack trace for:

java.lang.ClassCastException: com.databricks.sql.managedcatalog.UnityCatalogV2Proxy cannot be cast to com.microsoft.cdm.CDMCatalog

We don't have any workarounds. As mentioned in #118, this connector was originally designed for Synapse Spark, which follows the open-source code. I suspect the above error originates from the platform you are using.

bradleyjamrozik commented 1 year ago

Py4JJavaError                             Traceback (most recent call last)
File <command-227313506375295>:8
      1 (spark.read.format("com.microsoft.cdm")
      2     .option("storage", "[REDACTED].dfs.core.windows.net")
      3     .option("manifestpath", "dynamics365-financeandoperations/[REDACTED].operations.dynamics.com/Tables/Common/Customer/Main/Main.manifest.cdm.json")
      4     .option("entity", "CustTable")
      5     .option("tenantid", tenantid)
      6     .option("appid", clientid)
      7     .option("appkey", clientsecret)
----> 8     .load()
      9 )

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/readwriter.py:314, in DataFrameReader.load(self, path, format, schema, **options)
    312     return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    313 else:
--> 314     return self._df(self._jreader.load())

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187     try:
--> 188         return f(*a, **kw)
    189     except Py4JJavaError as e:
    190         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o532.load.
: java.lang.ClassCastException: com.databricks.sql.managedcatalog.UnityCatalogV2Proxy cannot be cast to com.microsoft.cdm.CDMCatalog
    at com.microsoft.cdm.DefaultSource.extractIdentifier(DefaultSource.scala:41)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.loadV2Source(DataSourceV2Utils.scala:115)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:333)
    at scala.Option.flatMap(Option.scala:271)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:331)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:226)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
    at java.lang.Thread.run(Thread.java:750)

bradleyjamrozik commented 1 year ago

Makes sense, thank you

kecheung commented 1 year ago

@bradleyjamrozik In DefaultSource.scala, we explicitly define the "cdm" catalog and expect a CDMCatalog to be returned. Instead, UnityCatalogV2Proxy is returned.

It still works if we use a cluster without Unity.

This previous mention that it works leads me to believe the issue is due to how your platform is configured, which I do not have any insight into. As mentioned in #118, this connector was originally built with Azure Synapse in mind.
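
For anyone trying to confirm this on their own cluster, a rough way to see which catalog configuration the session is running with is to inspect the Spark conf from a notebook (a sketch only; these keys may be unset depending on the runtime, and the Unity proxy itself is Databricks-internal):

# Inspect catalog-related Spark conf; on a Unity Catalog cluster the session
# catalog is proxied by Databricks, which is consistent with
# UnityCatalogV2Proxy showing up in the ClassCastException above.
print(spark.conf.get("spark.sql.defaultCatalog", "<unset>"))
print(spark.conf.get("spark.sql.catalog.spark_catalog", "<unset>"))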

JulesHuisman commented 1 year ago

@bradleyjamrozik In DefaultSource.scala, we explicitly define the "cdm" catalog and expect a CDMCatalog to be returned. Instead, UnityCatalogV2Proxy is returned.

It still works if we use a cluster without Unity.

This previous mention that it works leads me to believe the issue is due to how your platform is configured, which I do not have any insight into. As mentioned in #118, this connector was originally built with Azure Synapse in mind.

Do you think this is a difficult fix, if we took the open source route?

BenSchaedle commented 10 months ago

Hi all,

any chance to get an update on this topic?

zpo commented 7 months ago

Same problem here. It would be awesome to make it run on Databricks.

JulesHuisman commented 7 months ago

We built this package to make it work better with Databricks; it is not finished yet, but it might help: https://github.com/quantile-development/pyspark-cdm

carlo-quinonez commented 4 months ago

There's a PR with documentation changes that make this issue, and the workaround, clear.