databrickslabs / dlt-meta

This is a metadata-driven, DLT-based framework for bronze/silver pipelines

Issues/49 #56

Closed: ravi-databricks closed this 1 day ago

ravi-databricks commented 1 week ago

Need to provide the ability to add file metadata to the dataframe, e.g.:

```python
import dlt

@dlt.table
def bronze():
  return (spark.readStream.format("cloudFiles")
    # Define the schema for the ~6 common columns across files. All other input
    # fields will be "rescued" into a JSON string column that can be queried via
    # dot notation (see the query sketch after this block).
    .schema("Common1 string, Common2 string, _file_path string")  # _file_path is a hidden auto-field but shows up in the rescued-data column JSON with this method. Spoofing the same column in my input file so I can drop this spoofed column later.
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .option("cloudFiles.rescuedDataColumn", "extraFields")  # override the default _rescued_data column name with whatever you want to call this column
    .option("header", "true")
    .load("/Volumes/vol/data/*.txt")
    .select("*", "_metadata")  # add file metadata information to the output
    .drop("_file_path")  # discard the dummy input column to keep _file_path out of the extraFields rescued-data column
  )
```
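
For reference, a minimal sketch of querying the rescued JSON string afterwards. This assumes Databricks' `:` JSON path syntax for string columns; `ExtraCol1` is a hypothetical field that landed in `extraFields`, not a column from the original example:

```python
# Read the bronze table back and pull a field out of the rescued
# JSON string column using Databricks JSON path syntax.
spark.read.table("bronze").selectExpr(
    "Common1",
    "extraFields:ExtraCol1 AS extra_col1",  # hypothetical rescued field
)
```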

Introduced `select_metadata_cols` inside `source_details` in the onboarding file:

      "source_metadata": {
            "include_autoloader_metadata_column": "True",
            "autoloader_metadata_col_name": "source_metadata",
            "select_metadata_cols": {
               "input_file_name": "_metadata.file_name",
               "input_file_path": "_metadata.file_path"
            }

This will be used to add metadata columns to the target tables.
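
For illustration, a hedged sketch of how such a mapping could be applied to the Auto Loader DataFrame (this is an interpretation of the config, not necessarily the PR's exact implementation; `df` is assumed to be the stream from the bronze example above):

```python
from pyspark.sql import functions as F

select_metadata_cols = {
    "input_file_name": "_metadata.file_name",
    "input_file_path": "_metadata.file_path",
}

# Each key becomes a target column name; each value is an expression
# pulled from Auto Loader's hidden _metadata struct.
df_with_meta = df.withColumns(
    {alias: F.col(expr) for alias, expr in select_metadata_cols.items()}
)
```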

Introducing `custom_transform_func` so that customers can bring their own transformations. E.g., you have a custom transformation defined as Python code, as shown below, which takes a DataFrame as input and returns a transformed DataFrame:

```python
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

def custom_transform_func_test(input_df: DataFrame) -> DataFrame:
    return input_df.withColumn('custom_col', lit('test'))
```

You should be able to pass this function to the dlt-meta invoke function as shown below:

```python
from src.dataflow_pipeline import DataflowPipeline

layer = spark.conf.get("layer", None)
DataflowPipeline.invoke_dlt_pipeline(spark, layer, custom_transform_func=custom_transform_func_test)
```
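
A minimal sketch of how such a hook is typically wired in, for readers unfamiliar with the pattern (illustrative only; `apply_custom_transform` is a hypothetical helper, not dlt-meta's actual internals):

```python
from typing import Callable, Optional
from pyspark.sql import DataFrame

def apply_custom_transform(
    input_df: DataFrame,
    custom_transform_func: Optional[Callable[[DataFrame], DataFrame]] = None,
) -> DataFrame:
    # Pass the DataFrame through unchanged when no customer function is
    # supplied; otherwise the customer's function must return a DataFrame.
    return custom_transform_func(input_df) if custom_transform_func else input_df
```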
codecov[bot] commented 1 week ago

Codecov Report

Attention: Patch coverage is 56.25000% with 14 lines in your changes missing coverage. Please review.

Please upload report for BASE (feature/v0.0.8@3555aaa). Learn more about missing BASE report.

| Files | Patch % | Lines |
|---|---|---|
| src/pipeline_readers.py | 33.33% | 13 Missing and 1 partial :warning: |
Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           feature/v0.0.8     #56   +/-   ##
=================================================
  Coverage                ?   88.57%
=================================================
  Files                   ?        8
  Lines                   ?      858
  Branches                ?      168
=================================================
  Hits                    ?      760
  Misses                  ?       46
  Partials                ?       52
```

| [Flag](https://app.codecov.io/gh/databrickslabs/dlt-meta/pull/56/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=databrickslabs) | Coverage Δ | |
|---|---|---|
| [unittests](https://app.codecov.io/gh/databrickslabs/dlt-meta/pull/56/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=databrickslabs) | `88.57% <56.25%> (?)` | |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=databrickslabs#carryforward-flags-in-the-pull-request-comment) to find out more.

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.