databricks / dbt-databricks

A dbt adapter for Databricks.
https://databricks.com
Apache License 2.0

Draft: #756 - implement python workflow submissions #762

Closed · kdazzle closed this 1 month ago

kdazzle commented 3 months ago

WIP - Stubs out implementation for #756

This pretty much implements what a workflow job submission type would look like, though I'm sure I'm missing something. Tests haven't been added yet.

Sample

Outside of the new submission type, models are the same. Here is what one could look like:

# my_model.py
import pyspark.sql.types as T
import pyspark.sql.functions as F

def model(dbt, session):
    dbt.config(
        materialized='incremental',
        submission_method='workflow_job'  # new submission type added by this PR
    )

    output_schema = T.StructType([
        T.StructField("id", T.StringType(), True),
        T.StructField("odometer_meters", T.DoubleType(), True),
        T.StructField("timestamp", T.TimestampType(), True),
    ])
    # `session` is the SparkSession dbt passes into the model; return an
    # empty DataFrame with the declared schema as a stub
    return session.createDataFrame(data=session.sparkContext.emptyRDD(), schema=output_schema)
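
Running it shouldn't change either: the intent is that `dbt run --select my_model` (or a plain `dbt build`) works exactly as it does today, and only how the adapter submits the Python code to Databricks differs.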

The config for a model could look like (forgive my jsonification...yaml data structures still freak me out):

models:
  - name: my_model
    workflow_job_config:
      email_notifications: {
        on_failure: ["reynoldxin@databricks.com"]
      }
      max_retries: 2
      timeout_seconds: 18000
      existing_job_id: 12341234  # not part of Databricks API (+ optional)
      additional_task_settings: {  # not part of Databricks API (+ optional)
        "task_key": "my_dbt_task"
      }
      post_hook_tasks: [{  # not part of Databricks API (+ optional)
        "depends_on": [{"task_key": "my_dbt_task"}],
        "task_key": "OPTIMIZE_AND_VACUUM",
        "notebook_task": {
          "notebook_path": "/my_notebook_path",
          "source": "WORKSPACE"
        }
      }]
      grants:  # not part of Databricks API (+ optional)
        view: [
          {"group_name": "marketing-team"}
        ]
        run: [
          {"user_name": "alighodsi@databricks.com"}
        ]
        manage: []
    job_cluster_config:
      spark_version: "15.3.x-scala2.12"
      node_type_id: "rd-fleet.2xlarge"
      runtime_engine: "STANDARD"
      data_security_mode: "SINGLE_USER"
      autoscale: {
        "min_workers": 1,
        "max_workers": 4
      }

Explanation

For the dbt-specific configs I added (on top of the Databricks API attributes), I tried to strike a balance between the dbt convention of requiring minimal configuration and exposing the full flexibility of the Databricks API. Attribute names split the difference between Databricks API naming and dbt naming conventions. Happy to change the approach on any of this.
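
To illustrate the minimal end of that spectrum: the intent is that a model can opt in with nothing beyond the submission method and override only what it needs, with everything else falling back to defaults. A rough sketch (not verified against the current implementation, and the defaults themselves are still open):

models:
  - name: my_model
    workflow_job_config:
      # every key here is optional; omit the whole block to take adapter defaults
      max_retries: 2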


benc-db commented 1 month ago

@kdazzle can you rebase/target your PR against 1.9.latest? I have a couple of things that I need to wrap up, but I'm planning to take some version of this into the 1.9 release.
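
(For reference, that's typically `git fetch upstream` followed by `git rebase upstream/1.9.latest` on the PR branch and a force-push, assuming `upstream` points at databricks/dbt-databricks; the PR's base branch can then be switched to 1.9.latest from the GitHub UI.)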

benc-db commented 1 month ago

Based on the unit test failures, it looks like some of the syntax you're using doesn't work with Python 3.8.
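
The usual culprits are newer typing syntax. A hypothetical example of the kind of change that keeps 3.8 happy (not necessarily the actual offending lines in this PR):

from typing import Optional

# PEP 604 unions like `dict | None` in annotations raise a TypeError at import
# time on Python <= 3.9 (unless `from __future__ import annotations` is used),
# so the 3.8-safe spelling goes through typing.Optional instead.
def get_existing_job_id(config: Optional[dict]) -> Optional[int]:
    return (config or {}).get("existing_job_id")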

benc-db commented 1 month ago

Going to pull/push to origin to run the existing functional tests. We should add one for this new code. Let me know if you need help with that.
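
As a starting point, a functional test could look roughly like the sketch below. It follows the usual dbt pytest pattern (a class-scoped `models` fixture plus the `project` fixture from dbt's test framework); the model body, file name, and class name are placeholders:

import pytest
from dbt.tests.util import run_dbt

# Hypothetical sketch of a functional test for the workflow_job submission
# method; assumes the standard dbt pytest fixtures used by this repo's tests.
workflow_model_py = """
def model(dbt, session):
    dbt.config(materialized="table", submission_method="workflow_job")
    return session.createDataFrame([(1,)], "id int")
"""

class TestPythonWorkflowSubmission:
    @pytest.fixture(scope="class")
    def models(self):
        return {"workflow_model.py": workflow_model_py}

    def test_workflow_job_run(self, project):
        results = run_dbt(["run"])
        assert len(results) == 1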

benc-db commented 1 month ago

Going to merge in 1.9.latest changes (which is basically only 1.8 changes), ensure tests still pass, then merge.