It kind of works when modifying the sqlmesh code. I've communicated the findings to the team; let's see if they can add support for it.
# Imports the snippet needs (SQLMeshError lives in sqlmesh.utils.errors in the version
# I'm on; SparkSession is the plain pyspark entry point used as the fallback below).
from pyspark.sql import SparkSession
from sqlmesh.utils.errors import SQLMeshError

@property
def patched_spark(self):
    if not self._use_spark_session:
        raise SQLMeshError(
            "SparkSession is not available. "
            "Either run from a Databricks Notebook or "
            "install `databricks-connect` and configure it to connect to your Databricks cluster."
        )
    if self.is_spark_session_cursor:
        return self._connection_pool.get().spark

    from databricks.connect import DatabricksSession

    if self._spark is None:
        if "databricks_connect_server_hostname" in self._extra_config:
            # Explicit databricks-connect configuration: build a remote session.
            self._spark = (
                DatabricksSession.builder.remote(
                    host=self._extra_config["databricks_connect_server_hostname"],
                    token=self._extra_config["databricks_connect_access_token"],
                    cluster_id=self._extra_config["databricks_connect_cluster_id"],
                )
                .userAgent("sqlmesh")
                .getOrCreate()
            )
        else:
            # No connect config provided: fall back to whatever session the environment
            # already supplies (e.g. serverless compute) instead of raising.
            self._spark = (
                SparkSession.builder
                .appName("My Spark Application")  # name of your application
                .getOrCreate()
            )
        print("hey here")  # leftover debug output
        print(type(self._spark))  # confirms which session class we actually got
        catalog = self._extra_config.get("catalog")
        if catalog:
            self.set_current_catalog(catalog)
    return self._spark
from threading import get_ident

from pyspark.errors.exceptions.base import PySparkAttributeError
# SparkSessionCursor comes from the same module as the connection being patched;
# adjust the import if it lives elsewhere in your sqlmesh version.
from sqlmesh.engines.spark.db_api.spark_session import SparkSessionCursor

def patched_cursor(self) -> SparkSessionCursor:
    try:
        self.spark.sparkContext.setLocalProperty("spark.scheduler.pool", f"pool_{get_ident()}")
        self.spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
        self.spark.conf.set("hive.exec.dynamic.partition", "true")
        self.spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    except NotImplementedError:
        # Databricks Connect does not support accessing the SparkContext nor does it support
        # setting dynamic partition overwrite since it uses replace where
        pass
    except PySparkAttributeError:
        # Databricks serverless compute ...
        pass
    if self.catalog:
        from py4j.protocol import Py4JError

        try:
            self.set_current_catalog(self.catalog)
        except Py4JError:
            # Databricks does not support `setCurrentCatalog` with Unity catalog
            # and shared clusters so we use the Databricks Unity only SQL command instead
            self.spark.sql(f"USE CATALOG {self.catalog}")
    return SparkSessionCursor(self.spark)
from unittest.mock import patch

# with patch('sqlmesh.engines.spark.db_api.spark_session.SparkSessionConnection.cursor', new=patched_cursor):
with patch('sqlmesh.core.engine_adapter.databricks.DatabricksEngineAdapter.spark', patched_spark):
    ...  # run the sqlmesh commands here while the patched property is in place; see the sketch below
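For completeness, here's roughly how I'm driving a run with the patch active. This is a minimal sketch rather than the final wiring: it assumes the standard `sqlmesh.Context` API and a hypothetical project directory `./my_project`, so adjust both to your setup.

from unittest.mock import patch

from sqlmesh import Context

with patch(
    "sqlmesh.core.engine_adapter.databricks.DatabricksEngineAdapter.spark",
    patched_spark,
):
    context = Context(paths="./my_project")  # hypothetical project path
    context.plan()  # planning/applying now goes through the patched SparkSession property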