databrickslabs / dlt-meta

Metadata-driven Databricks Delta Live Tables framework for bronze/silver pipelines
https://databrickslabs.github.io/dlt-meta/

Add support for dlt.apply_changes_from_snapshot #86

Open · ravi-databricks opened this issue 3 months ago

ravi-databricks commented 3 months ago

Provide support for dlt.apply_changes_from_snapshot, so the framework can derive SCD type 1/2 changes from periodic full snapshots of a source rather than from a change feed.

ravi-databricks commented 2 months ago

Implementation Details:

Onboarding:

  1. Introduce a snapshot format inside the onboarding file (a sketch of a full onboarding entry follows below).
  2. Introduce a bronze_apply_changes_from_snapshot config; keys and scd_type are mandatory fields:
    "bronze_apply_changes_from_snapshot": {
      "keys": ["id"],
      "scd_type": "1",
      "track_history_column_list": [],
      "track_history_except_column_list": []
    }
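
For illustration, a complete onboarding entry might then look like the sketch below. This is a minimal sketch, not the final layout: snapshot_format and path inside source_details mirror what the reader function later in this thread consumes, while the surrounding field names (data_flow_id, source_format, bronze_table) are assumptions based on the existing onboarding file structure.

    {
      "data_flow_id": "100",
      "source_format": "snapshot",
      "source_details": {
        "snapshot_format": "csv",
        "path": "s3://my-bucket/snapshots/"
      },
      "bronze_table": "customers",
      "bronze_apply_changes_from_snapshot": {
        "keys": ["id"],
        "scd_type": "1",
        "track_history_column_list": [],
        "track_history_except_column_list": []
      }
    }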

DataflowPipeline:

  1. Add an argument to DataflowPipeline to accept a snapshot_reader_func.
  2. snapshot_reader_func will be passed to dlt.apply_changes_from_snapshot during the bronze write (see the sketch after this list).
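
A minimal sketch of how the bronze write could wire this together. dlt.create_streaming_table and dlt.apply_changes_from_snapshot are the documented DLT APIs; the spec accessors (applyChangesFromSnapshot, targetDetails["table"]) are hypothetical stand-ins for however the framework ends up exposing the onboarding config:

    import dlt

    def bronze_write_from_snapshot(bronze_dataflow_spec, snapshot_reader_func):
        # Hypothetical accessors: how the spec exposes the onboarding
        # config and target table name is an assumption
        cfg = bronze_dataflow_spec.applyChangesFromSnapshot
        target = bronze_dataflow_spec.targetDetails["table"]
        dlt.create_streaming_table(name=target)
        dlt.apply_changes_from_snapshot(
            target=target,
            # DLT calls source with the latest processed snapshot version;
            # wrap the user function so it also receives the dataflow spec
            source=lambda latest_snapshot_version: snapshot_reader_func(
                latest_snapshot_version, bronze_dataflow_spec
            ),
            keys=cfg["keys"],
            stored_as_scd_type=cfg["scd_type"],
            track_history_column_list=cfg.get("track_history_column_list"),
            track_history_except_column_list=cfg.get("track_history_except_column_list"),
        )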

Usage:

  1. Provide a snapshot reader function in a notebook when invoking DataflowPipeline.
  2. Introduce a new next_snapshot_and_version method, as in the example below:
    pip install dlt-meta

    import dlt
    from src.dataflow_spec import BronzeDataflowSpec

    # spark and dbutils are provided by the Databricks notebook runtime
    def exist(path):
        """Return True if the given path exists."""
        try:
            return dbutils.fs.ls(path) is not None
        except Exception:
            return False

    def next_snapshot_and_version(latest_snapshot_version, dataflow_spec):
        latest_snapshot_version = latest_snapshot_version or 0
        next_version = latest_snapshot_version + 1
        bronze_dataflow_spec: BronzeDataflowSpec = dataflow_spec
        options = bronze_dataflow_spec.readerConfigOptions
        snapshot_format = bronze_dataflow_spec.sourceDetails["snapshot_format"]
        snapshot_root_path = bronze_dataflow_spec.sourceDetails["path"]
        snapshot_path = f"{snapshot_root_path}{next_version}.csv"
        if exist(snapshot_path):
            snapshot = spark.read.format(snapshot_format).options(**options).load(snapshot_path)
            return (snapshot, next_version)
        else:
            # No snapshot available
            return None

    layer = spark.conf.get("layer", None)
    from src.dataflow_pipeline import DataflowPipeline
    DataflowPipeline.invoke_dlt_pipeline(spark, layer, next_snapshot_and_version=next_snapshot_and_version)
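
The contract here matches what dlt.apply_changes_from_snapshot expects from a snapshot-reading function: return a (snapshot DataFrame, snapshot version) tuple while new snapshots exist, and None to signal that processing is complete. In this example the snapshots are assumed to be CSV files named 1.csv, 2.csv, and so on under the snapshot root path configured in the onboarding file.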