databricks / mlops-stacks

This repo provides a customizable stack for starting new ML projects on Databricks that follow production best-practices out of the box.
https://docs.databricks.com/en/dev-tools/bundles/mlops-stacks.html
Apache License 2.0

ValueError: Catalog 'dev' does not exist in the metastore #173

Open bs-davidnagy opened 3 days ago

bs-davidnagy commented 3 days ago

Hey,

I am new to Databricks, and I am trying to test the mlops-stacks bundle.

The bundle contains a feature-engineering workflow, and I have a problem getting it to run. The main problem is the following: the bundle specifies the target as ${bundle.target}, which in my case resolves to dev. I have created the dev catalog and, within it, the project schema according to the template.

The issue is that when I run the workflow, the notebook fails at:

fe.create_table(
    name=output_table_name,
    primary_keys=[x.strip() for x in pk_columns.split(",")] + [ts_column],  # Include timeseries column in primary_keys
    timestamp_keys=[ts_column],
    df=features_df,
)

I am getting: ValueError: Catalog 'dev' does not exist in the metastore, and I don't understand why. If I run the notebook manually on an all-purpose cluster, it works.

I printed the available catalogs when running through the bundle workflow, and I only got spark_catalog.
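
A minimal sketch of that kind of check, using standard Spark SQL (not the exact cell from the template notebook):

# Show which catalog the Spark session is bound to and which catalogs it can see.
# On a cluster without Unity Catalog access this typically prints only spark_catalog.
print(spark.sql("SELECT current_catalog()").collect())
spark.sql("SHOW CATALOGS").show(truncate=False)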

arpitjasa-db commented 2 days ago

@bs-davidnagy can you confirm that you're using a UC-enabled cluster? And whoever you're running the workflow as (SP or yourself) has the right permissions on the catalog?
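
For reference, granting the run-as principal access with Unity Catalog SQL would look roughly like this; the group `account users` and the schema name are placeholders to replace with whichever principal actually runs the job and your project's schema:

# Run from a UC-enabled context (e.g. an all-purpose cluster or SQL warehouse).
spark.sql("GRANT USE CATALOG ON CATALOG dev TO `account users`")
spark.sql("GRANT USE SCHEMA, CREATE TABLE ON SCHEMA dev.aifp_sample_project TO `account users`")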

bs-davidnagy commented 2 days ago

@arpitjasa-db Thanks for the reply.

Regarding the metastore: the workspace is attached to a Unity Catalog metastore (screenshot attached).

The catalog was created through the UI, and for now all users have ALL_PRIVILEGES (screenshot attached).

Here are the catalog and the underlying schema details (screenshot attached).
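
As a cross-check on the UI, the effective grants can also be listed from a notebook on a cluster that does see the catalog (e.g. the working all-purpose cluster); a minimal sketch, assuming the project schema is dev.aifp_sample_project as in the job config below:

# List grants on the catalog and the project schema to confirm ALL_PRIVILEGES is in place.
spark.sql("SHOW GRANTS ON CATALOG dev").show(truncate=False)
spark.sql("SHOW GRANTS ON SCHEMA dev.aifp_sample_project").show(truncate=False)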

Currently I am using the job cluster specified in the resources, so effectively I have not changed anything in the workflow:

new_cluster: &new_cluster
  new_cluster:
    num_workers: 3
    spark_version: 15.3.x-cpu-ml-scala2.12
    node_type_id: Standard_D3_v2
    custom_tags:
      clusterSource: mlops-stacks_0.4

common_permissions: &permissions
  permissions:
    - level: CAN_VIEW
      group_name: users

resources:
  jobs:
    write_feature_table_job:
      name: ${bundle.target}-aifp_sample_project-write-feature-table-job
      job_clusters:
        - job_cluster_key: write_feature_table_job_cluster
          <<: *new_cluster
      tasks:
        - task_key: PickupFeatures
          job_cluster_key: write_feature_table_job_cluster
          notebook_task:
            notebook_path: ../feature_engineering/notebooks/GenerateAndWriteFeatures.py
            base_parameters:
              # TODO modify these arguments to reflect your setup.
              input_table_path: /databricks-datasets/nyctaxi-with-zipcodes/subsampled
              # TODO: Empty start/end dates will process the whole range. Update this as needed to process recent data.
              input_start_date: ""
              input_end_date: ""
              timestamp_column: tpep_pickup_datetime
              output_table_name: ${bundle.target}.aifp_sample_project.trip_pickup_features
              features_transform_module: pickup_features

Here ${bundle.target} resolves to dev, so output_table_name becomes dev.aifp_sample_project.trip_pickup_features.

The job cluster created by the bundle workflow is:

{
    "cluster_id": "[REDACTED]",
    "creator_user_name": "[REDACTED]",
    "spark_context_id": "[REDACTED]",
    "driver_healthy": true,
    "cluster_name": "[REDACTED]",
    "spark_version": "15.3.x-cpu-ml-scala2.12",
    "azure_attributes": {
        "first_on_demand": 1,
        "availability": "ON_DEMAND_AZURE",
        "spot_bid_max_price": -1
    },
    "node_type_id": "Standard_D3_v2",
    "driver_node_type_id": "Standard_D3_v2",
    "custom_tags": {
        "clusterSource": "mlops-stacks_0.4"
    },
    "autotermination_minutes": 0,
    "enable_elastic_disk": true,
    "disk_spec": {},
    "cluster_source": "JOB",
    "enable_local_disk_encryption": false,
    "instance_source": {
        "node_type_id": "Standard_D3_v2"
    },
    "driver_instance_source": {
        "node_type_id": "Standard_D3_v2"
    },
    "effective_spark_version": "15.3.x-cpu-ml-scala2.12",
    "state": "TERMINATED",
    "state_message": "",
    "start_time": "[REDACTED]",
    "terminated_time": "[REDACTED]",
    "last_state_loss_time": 0,
    "last_activity_time": "[REDACTED]",
    "last_restarted_time": "[REDACTED]",
    "num_workers": 3,
    "default_tags": {
        "Vendor": "Databricks",
        "Creator": "[REDACTED]",
        "ClusterName": "[REDACTED]",
        "ClusterId": "[REDACTED]",
        "JobId": "[REDACTED]",
        "RunName": "[REDACTED]",
        "Team": "[REDACTED]",
        "CostCenter": "[REDACTED]",
        "Product": "[REDACTED]",
        "Owner": "[REDACTED]",
        "env": "Development"
    },
    "termination_reason": {
        "code": "JOB_FINISHED",
        "type": "SUCCESS"
    },
    "init_scripts_safe_mode": false,
    "spec": {
        "cluster_name": "[REDACTED]",
        "spark_version": "15.3.x-cpu-ml-scala2.12",
        "azure_attributes": {
            "availability": "ON_DEMAND_AZURE"
        },
        "node_type_id": "Standard_D3_v2",
        "custom_tags": {
            "JobId": "[REDACTED]",
            "RunName": "[REDACTED]",
            "clusterSource": "mlops-stacks_0.4"
        },
        "autotermination_minutes": 0,
        "enable_elastic_disk": true,
        "num_workers": 3
    }
}

I don't understand why a job cluster created through a bundle resource would have different access to the catalog when the bundle is executed by me (so I am the user).

bs-davidnagy commented 2 days ago

So I ran the notebook again, but also added the following command:

print(spark.sql("SELECT CURRENT_METASTORE()"))

and I get the following error:

[[OPERATION_REQUIRES_UNITY_CATALOG](https://docs.microsoft.com/azure/databricks/error-messages/error-classes#operation_requires_unity_catalog)] Operation CURRENT_METASTORE requires Unity Catalog enabled. SQLSTATE: 0AKUD
File <command-2336575342377498>, line 3
      1 spark.sql("CREATE DATABASE IF NOT EXISTS " + output_database)
----> 3 print(spark.sql("SELECT CURRENT_METASTORE()"))
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:254, in capture_sql_exception.<locals>.deco(*a, **kw)
    250 converted = convert_exception(e.java_exception)
    251 if not isinstance(converted, UnknownException):
    252     # Hide where the exception came from that shows a non-Pythonic
    253     # JVM exception message.
--> 254     raise converted from None
    255 else:
    256     raise

So I guess the cluster is not UC enabled. But I don't really understand how I can end up creating a non-UC-enabled cluster from a UC-enabled workspace.
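
Whether a cluster can access Unity Catalog is governed by its access mode, i.e. the data_security_mode field of the cluster spec, which the JSON above does not set. One way to inspect it for a given cluster, sketched with the Databricks Python SDK (the cluster ID is a placeholder):

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()  # picks up credentials from the environment or CLI profile
cluster = w.clusters.get(cluster_id="<job-cluster-id>")  # placeholder: the job cluster's ID
# NONE / legacy "no isolation" modes cannot access Unity Catalog;
# SINGLE_USER and USER_ISOLATION (shared) can.
print(cluster.data_security_mode)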

bs-davidnagy commented 2 days ago

So I managed to get it running; I had to change the following in the cluster definition:

    spark_version: 15.4.x-cpu-ml-scala2.12
    data_security_mode: "SINGLE_USER"

I don't really understand why the default cluster access mode is custom/shared if it is unable to access the UC catalog when the DBR is an ML runtime.
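
For reference, the cluster anchor from the resource file with both changes applied would look roughly like this; everything except spark_version and data_security_mode is unchanged from the config quoted earlier:

new_cluster: &new_cluster
  new_cluster:
    num_workers: 3
    spark_version: 15.4.x-cpu-ml-scala2.12
    node_type_id: Standard_D3_v2
    # Single-user access mode so the job cluster can access Unity Catalog.
    data_security_mode: "SINGLE_USER"
    custom_tags:
      clusterSource: mlops-stacks_0.4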