adidas / lakehouse-engine

The Lakehouse Engine is a configuration-driven Spark framework, written in Python, serving as a scalable and distributed engine for several lakehouse algorithms, data flows and utilities for Data Products.
https://adidas.github.io/lakehouse-engine-docs/
Apache License 2.0

[FEATURE] I would like to have the capability to develop using Python classes instead of acons #14

Open · callmesora opened this issue 4 days ago

callmesora commented 4 days ago

Is your feature request related to a problem? Please describe.

Acons are harder to debug and to validate on the user end. A class-based solution would allow us to use validations, for example with pydantic, give a better flow as a developer, and allow the use of a debugger.

Describe the solution you'd like

This is a rough sketch (the classes I used here don't make much sense), just to get the idea across, but it would be something along these lines:

# NOTE: Options, InputSpec, Transformer, TransformSpec, DQFunction, DQSpec,
# MergeOptions, OutputSpec, TerminateSpec, ExecEnv and LakehouseEngine are
# illustrative names for the proposed class-based interface, not existing classes.
def main():
    options = Options(
        badRecordsPath="s3://my-data-product-bucket/badrecords/order_events_with_dq/",
        header=False,
        delimiter="\u005E",
        dateFormat="yyyyMMdd"
    )

    input_spec = InputSpec(
        spec_id="orders_bronze",
        read_type="streaming",
        data_format="csv",
        schema_path="s3://my-data-product-bucket/artefacts/metadata/bronze/schemas/orders.json",
        with_filepath=True,
        options=options,
        location="s3://my-data-product-bucket/bronze/orders/"
    )

    transformers = [
        Transformer(function="with_row_id"),
        Transformer(
            function="with_regex_value",
            args={
                "input_col": "lhe_extraction_filepath",
                "output_col": "extraction_date",
                "drop_input_col": True,
                "regex": ".*WE_SO_SCL_(\\d+).csv"
            }
        )
    ]

    transform_spec = TransformSpec(
        spec_id="orders_bronze_with_extraction_date",
        input_id="orders_bronze",
        transformers=transformers
    )

    dq_functions = [
        DQFunction(dq_function="expect_column_values_to_not_be_null", args={"column": "omnihub_locale_code"}),
        DQFunction(dq_function="expect_column_unique_value_count_to_be_between", args={"column": "product_division", "min_value": 10, "max_value": 100}),
        DQFunction(dq_function="expect_column_max_to_be_between", args={"column": "so_net_value", "min_value": 10, "max_value": 1000}),
        DQFunction(dq_function="expect_column_value_lengths_to_be_between", args={"column": "omnihub_locale_code", "min_value": 1, "max_value": 10}),
        DQFunction(dq_function="expect_column_mean_to_be_between", args={"column": "coupon_code", "min_value": 15, "max_value": 20})
    ]

    dq_spec = DQSpec(
        spec_id="check_orders_bronze_with_extraction_date",
        input_id="orders_bronze_with_extraction_date",
        dq_type="validator",
        result_sink_db_table="my_database.my_table_dq_checks",
        fail_on_error=False,
        dq_functions=dq_functions
    )

    merge_options = MergeOptions(
        merge_predicate="""
            new.sales_order_header = current.sales_order_header
            and new.sales_order_schedule = current.sales_order_schedule
            and new.sales_order_item=current.sales_order_item
            and new.epoch_status=current.epoch_status
            and new.changed_on=current.changed_on
            and new.extraction_date=current.extraction_date
            and new.lhe_batch_id=current.lhe_batch_id
            and new.lhe_row_id=current.lhe_row_id
        """,
        insert_only=True
    )

    output_spec = OutputSpec(
        spec_id="orders_silver",
        input_id="check_orders_bronze_with_extraction_date",
        data_format="delta",
        write_type="merge",
        partitions=["order_date_header"],
        merge_opts=merge_options,
        db_table="my_database.my_table_with_dq",
        location="s3://my-data-product-bucket/silver/order_events_with_dq/",
        with_batch_id=True,
        options={"checkpointLocation": "s3://my-data-product-bucket/checkpoints/order_events_with_dq/"}
    )

    terminate_spec = TerminateSpec(
        function="optimize_dataset",
        args={"db_table": "my_database.my_table_with_dq"}
    )

    exec_env = ExecEnv(spark_databricks_delta_schema_autoMerge_enabled=True)

    engine = LakehouseEngine(
        input_specs=[input_spec],
        transform_specs=[transform_spec],
        dq_specs=[dq_spec],
        output_specs=[output_spec],
        terminate_specs=[terminate_spec],
        exec_env=exec_env
    )

    engine.run()

if __name__ == "__main__":
    main()

As opposed to the fully acon-based version:

from lakehouse_engine.engine import load_data  # docs: https://adidas.github.io/lakehouse-engine-docs/lakehouse_engine/engine.html

acon = {
  "input_specs": [
    {
      "spec_id": "orders_bronze",
      "read_type": "streaming",
      "data_format": "csv",
      "schema_path": "s3://my-data-product-bucket/artefacts/metadata/bronze/schemas/orders.json",
      "with_filepath": True,
      "options": {
        "badRecordsPath": "s3://my-data-product-bucket/badrecords/order_events_with_dq/",
        "header": False,
        "delimiter": "\u005E",
        "dateFormat": "yyyyMMdd"
      },
      "location": "s3://my-data-product-bucket/bronze/orders/"
    }
  ],
  "transform_specs": [
    {
      "spec_id": "orders_bronze_with_extraction_date",
      "input_id": "orders_bronze",
      "transformers": [
        {
          "function": "with_row_id"
        },
        {
          "function": "with_regex_value",
          "args": {
            "input_col": "lhe_extraction_filepath",
            "output_col": "extraction_date",
            "drop_input_col": True,
            "regex": ".*WE_SO_SCL_(\\d+).csv"
          }
        }
      ]
    }
  ],
  "dq_specs": [
    {
      "spec_id": "check_orders_bronze_with_extraction_date",
      "input_id": "orders_bronze_with_extraction_date",
      "dq_type": "validator",
      "result_sink_db_table": "my_database.my_table_dq_checks",
      "fail_on_error": False,
      "dq_functions": [
        {
          "dq_function": "expect_column_values_to_not_be_null",
          "args": {
            "column": "omnihub_locale_code"
          }
        },
        {
          "dq_function": "expect_column_unique_value_count_to_be_between",
          "args": {
            "column": "product_division",
            "min_value": 10,
            "max_value": 100
          }
        },
        {
          "dq_function": "expect_column_max_to_be_between",
          "args": {
            "column": "so_net_value",
            "min_value": 10,
            "max_value": 1000
          }
        },
        {
          "dq_function": "expect_column_value_lengths_to_be_between",
          "args": {
            "column": "omnihub_locale_code",
            "min_value": 1,
            "max_value": 10
          }
        },
        {
          "dq_function": "expect_column_mean_to_be_between",
          "args": {
            "column": "coupon_code",
            "min_value": 15,
            "max_value": 20
          }
        }
      ]
    }
  ],
  "output_specs": [
    {
      "spec_id": "orders_silver",
      "input_id": "check_orders_bronze_with_extraction_date",
      "data_format": "delta",
      "write_type": "merge",
      "partitions": [
        "order_date_header"
      ],
      "merge_opts": {
        "merge_predicate": """
            new.sales_order_header = current.sales_order_header
            and new.sales_order_schedule = current.sales_order_schedule
            and new.sales_order_item=current.sales_order_item
            and new.epoch_status=current.epoch_status
            and new.changed_on=current.changed_on
            and new.extraction_date=current.extraction_date
            and new.lhe_batch_id=current.lhe_batch_id
            and new.lhe_row_id=current.lhe_row_id
        """,
        "insert_only": True
      },
      "db_table": "my_database.my_table_with_dq",
      "location": "s3://my-data-product-bucket/silver/order_events_with_dq/",
      "with_batch_id": True,
      "options": {
        "checkpointLocation": "s3://my-data-product-bucket/checkpoints/order_events_with_dq/"
      }
    }
  ],
  "terminate_specs": [
    {
      "function": "optimize_dataset",
      "args": {
        "db_table": "my_database.my_table_with_dq"
      }
    }
  ],
  "exec_env": {
    "spark.databricks.delta.schema.autoMerge.enabled": True
  }
}

load_data(acon=acon)
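
To make the validation argument above concrete: if the proposed classes were backed by pydantic models, typos and wrong types would fail at construction time instead of surfacing later as Spark errors inside the engine. This is only a minimal sketch, assuming pydantic is used, and the field names are purely illustrative, not the engine's actual API:

from typing import Any, Dict, Literal, Optional

from pydantic import BaseModel, ValidationError


# Illustrative model only: mirrors the InputSpec sketch above, not a real engine class.
class InputSpec(BaseModel):
    spec_id: str
    read_type: Literal["batch", "streaming"]  # typos such as "streming" are rejected
    data_format: str
    location: str
    schema_path: Optional[str] = None
    with_filepath: bool = False
    options: Dict[str, Any] = {}


try:
    InputSpec(
        spec_id="orders_bronze",
        read_type="streming",  # typo -> ValidationError raised here, easy to debug
        data_format="csv",
        location="s3://my-data-product-bucket/bronze/orders/",
    )
except ValidationError as err:
    print(err)

Because construction fails eagerly, a developer can also set a breakpoint right at the spec definition and inspect the objects in a debugger, which is the workflow this request is asking for.
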
jmcorreia commented 3 days ago

Hi @callmesora,

thanks for opening the idea for a new feature and for the sketch/suggestion 💯. This is definitely something that I feel would make a lot of sense, and it has been on my personal backlog for a while, but it did not collect enough interest or demand to pursue for now. The idea would always be to keep the current approach, but also to provide an additional interface along the suggested lines. We actually already have some of the ingredients there, so the work needed is really only on this "interface".

I will keep this open, so that we can collect more interest or more ideas until we have enough capacity to take it on. Otherwise, please also feel free to contribute, and I can help you out if you face any problems or want to discuss it further.
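
For illustration of how thin such an additional interface could be: the spec classes could simply serialize themselves back into the acon dictionary that load_data already accepts. A rough sketch, assuming plain dataclasses; only lakehouse_engine.engine.load_data is the real entry point here, the dataclasses and the to_acon/run helpers are hypothetical:

from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional

from lakehouse_engine.engine import load_data


@dataclass
class InputSpec:
    spec_id: str
    read_type: str
    data_format: str
    location: str
    schema_path: Optional[str] = None
    with_filepath: bool = False
    options: Dict[str, Any] = field(default_factory=dict)


@dataclass
class LakehouseEngine:
    input_specs: List[InputSpec] = field(default_factory=list)
    # transform_specs, dq_specs, output_specs, terminate_specs and exec_env
    # would follow the same pattern; omitted here for brevity.

    def to_acon(self) -> Dict[str, Any]:
        # asdict() turns the dataclasses into the plain dictionary structure the
        # engine already understands; None values are dropped so unset optional
        # fields do not end up in the acon.
        specs = [asdict(spec) for spec in self.input_specs]
        return {"input_specs": [{k: v for k, v in s.items() if v is not None} for s in specs]}

    def run(self) -> None:
        load_data(acon=self.to_acon())

With something like this, the class-based example and the acon-based example above stay equivalent, because the classes only build the acon and delegate to the existing engine underneath.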