Azure / azureml-examples

Official community-driven Azure Machine Learning examples, tested with GitHub Actions.
https://docs.microsoft.com/azure/machine-learning
MIT License

AzureML Custom Preprocessing Component as SDK variant #3132

Closed. hmrc87 closed this issue 5 months ago.

hmrc87 commented 6 months ago

Describe your suggestion

Hi @ahughes-msft ,

In cli/monitoring/components/custom_preprocessing/spec.yaml you use a YAML-based CLI Spark component specification for the custom preprocessing component.

Is this also possible with the SDK? I am using the command_component decorator, but I don't know how to specify the Spark type.

Background: I would really like to avoid YAML-based descriptions everywhere; they provide no type safety and are detached from the source code.

Additional details

No response

ahughes-msft commented 6 months ago

Hi @hmrc87 ,

Here is some information on how to do it in the SDK. We are working on adding this to our documentation:

The reference data you defined is a uri_folder, but you did not provide a data window, so we consider it fixed data. We only apply the preprocessor to a dynamically changing uri_folder, i.e. one whose contents grow as time goes by, and you need to define the data window so that each monitor run picks up the right data. If you do not specify a data_window, we treat the input as fixed data and consider it a fixed mltable. Please refer to the following example for the reference data:

reference_data_training = ReferenceData(
    input_data=Input(
        type="uri_folder",
        path="azureml:uri_folder_log:1",
    ),
    data_context=MonitorDatasetContext.MODEL_OUTPUTS,
    pre_processing_component="azureml:custom_preprocessor:1.0.0",
    data_window=BaselineDataRange(
        lookback_window_offset="P0D",
        lookback_window_size="P10D",
    ),
)

As for production data, since we consider it the output of the model, we automatically add a default data_window of the last one day, so you do not need to add a data window there. Of course, you can specify the production data window explicitly as well.
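For illustration, a rough sketch of overriding that default, assuming the same classes and placeholder asset/component references as the reference-data example above:

production_data = ProductionData(
    input_data=Input(
        type="uri_folder",
        path="azureml:uri_folder_log:1",
    ),
    data_context=MonitorDatasetContext.MODEL_INPUTS,
    pre_processing_component="azureml:custom_preprocessor:1.0.0",
    data_window=BaselineDataRange(
        lookback_window_offset="P0D",  # no offset from the scheduled run time
        lookback_window_size="P1D",    # explicitly the default: the last one day
    ),
)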

hmrc87 commented 6 months ago

Hi @ahughes-msft, I think you mixed up the questions and answered a different one.

ahughes-msft commented 5 months ago

Hi @hmrc87,

You can use this link to build your Spark component with the SDK.

Then you can use this link to see how to register the component in your workspace. After that you can reference it from your monitoring pipeline job.
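For concreteness, a rough sketch of what a Spark preprocessing component might look like in the v2 SDK, assuming SparkComponent from azure.ai.ml.entities; the component name, entry script, paths, and resource sizes below are hypothetical, and ml_client is an authenticated MLClient:

from azure.ai.ml import Input, Output
from azure.ai.ml.entities import SparkComponent

spark_preprocessor = SparkComponent(
    name="custom_preprocessor",         # hypothetical name
    display_name="Custom preprocessing (Spark)",
    version="1.0.0",
    code="./custom_preprocessing",      # folder containing the entry script
    entry={"file": "preprocess.py"},    # hypothetical entry script
    driver_cores=1,
    driver_memory="2g",
    executor_cores=2,
    executor_memory="2g",
    executor_instances=2,
    inputs={
        "data_window_start": Input(type="string"),
        "data_window_end": Input(type="string"),
        "input_data": Input(type="uri_folder", mode="direct"),
    },
    outputs={
        "preprocessed_input_data": Output(type="mltable", mode="direct"),
    },
    args=(
        "--data_window_start ${{inputs.data_window_start}}"
        " --data_window_end ${{inputs.data_window_end}}"
        " --input_data ${{inputs.input_data}}"
        " --preprocessed_input_data ${{outputs.preprocessed_input_data}}"
    ),
)

# register the component in the workspace
ml_client.components.create_or_update(spark_preprocessor)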

hmrc87 commented 5 months ago

Thanks!!!

emrynHofmannElephant commented 3 months ago

Hi, sorry to add to an older issue. I haven't used a Spark pre-processing component, but I've made a command component that takes my input data (a single Parquet file in a directory), filters it using the time window, and then outputs an MLTable.

However, when I try to use this for monitoring, it fails, seemingly because you can't currently supply an Output object as a parameter? Any ideas?

Getting:

usage: create_mltable_from_prep_data_component.py [-h]
                                                  [--data_window_start DATA_WINDOW_START]
                                                  [--data_window_end DATA_WINDOW_END]
                                                  [--input_data INPUT_DATA]
                                                  [--preprocessed_input_data PREPROCESSED_INPUT_DATA]
create_mltable_from_prep_data_component.py: error: argument --preprocessed_input_data: expected one argument

within my monitoring pipeline.

Code used to create the component:

# Assumed imports for this snippet
from azure.ai.ml import Input, Output
from azure.ai.ml.entities import CommandComponent

command_string = (
    "python create_mltable_from_prep_data_component.py"
    " --data_window_start ${{inputs.data_window_start}}"
    " --data_window_end ${{inputs.data_window_end}}"
    " --input_data ${{inputs.input_data}}"
    " --preprocessed_input_data ${{outputs.preprocessed_input_data}}"
)
# Next define your inputs & outputs
command_input = {
    "data_window_start": Input(type="string"),
    "data_window_end": Input(type="string"),
    "input_data": Input(type="uri_folder")
}
command_output = {
    "preprocessed_input_data": Output(type="mltable", mode="direct")
}

mltable_component = CommandComponent(
    name="project_monitoring_mltable",
    display_name="MLTable creation for Azure Monitoring",
    description="",
    tags={"tag1":"val1"},
    auto_increment_version=True,
    inputs=command_input,
    outputs=command_output,
    command=command_string,
    code="./",
    environment="env@latest",
)

# ml_client is an authenticated MLClient for the target workspace
ml_client.components.create_or_update(mltable_component)

Python source file (create_mltable_from_prep_data_component.py, adapted from the example repo):

import argparse
from datetime import datetime

import mltable
from dateutil import parser as date_parser

def preprocess(
    data_window_start,
    data_window_end,
    input_data,
    preprocessed_input_data,
):
    format_data = "%Y-%m-%d %H:%M:%S"
    # Parse the window boundaries and normalize them to second precision
    # (drops any microseconds or timezone info)
    start_datetime = date_parser.parse(data_window_start)
    start_datetime = datetime.strptime(
        start_datetime.strftime(format_data), format_data
    )

    end_datetime = date_parser.parse(data_window_end)
    end_datetime = datetime.strptime(
        end_datetime.strftime(format_data), format_data
    )

    tbl = mltable.from_parquet_files([{"file": input_data}])

    tbl.traits.timestamp_column = "Date"
    tbl.traits.index_columns = ["ID", "ID2"]

    # Build the time-window filter; filter() returns a new MLTable,
    # so reassign the result for the filter to take effect
    filter_str = (
        f"Date >= datetime({start_datetime.year}, {start_datetime.month}, {start_datetime.day})"
        f" and Date <= datetime({end_datetime.year}, {end_datetime.month}, {end_datetime.day})"
    )
    tbl = tbl.filter(filter_str)
    # Save it to the folder: note you need to give it a directory, not a file path
    tbl.save(path=preprocessed_input_data)

def run():
    """Compute data window and preprocess data from MDC."""
    # Parse arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data_window_start", type=str)
    parser.add_argument("--data_window_end", type=str)
    parser.add_argument("--input_data", type=str)
    parser.add_argument("--preprocessed_input_data", type=str)
    args = parser.parse_args()

    preprocess(
        args.data_window_start,
        args.data_window_end,
        args.input_data,
        args.preprocessed_input_data,
    )

if __name__ == "__main__":
    run()

Monitoring Data code:

# Assumed imports for this snippet
from azure.ai.ml import Input
from azure.ai.ml.constants import MonitorDatasetContext
from azure.ai.ml.entities import ProductionData, ReferenceData

# define the target dataset (production data)
production_data = ProductionData(
    input_data=Input(
        type="uri_folder",
        path="azureml:<path_to_prod_dir_asset>:1"
    ),
    data_context=MonitorDatasetContext.MODEL_INPUTS,
    pre_processing_component="azureml:project_monitoring_mltable:1"
)

# training data to be used as reference dataset
reference_data_training = ReferenceData(
    input_data=Input(
        type="uri_folder",
        path="azureml:<path_to_training_dir_asset>:1",
    ),
    data_context=MonitorDatasetContext.TRAINING,
    pre_processing_component="azureml:project_monitoring_mltable:1"
)
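For completeness, a rough sketch of how these data objects might be wired into a monitor schedule, assuming the monitoring entities from azure.ai.ml.entities; the signal choice, compute sizing, endpoint, and recurrence below are hypothetical:

from azure.ai.ml.entities import (
    DataDriftSignal,
    MonitorDefinition,
    MonitorSchedule,
    MonitoringTarget,
    RecurrenceTrigger,
    ServerlessSparkCompute,
)

# hypothetical drift signal comparing production inputs against training data
data_drift_signal = DataDriftSignal(
    production_data=production_data,
    reference_data=reference_data_training,
)

monitor_definition = MonitorDefinition(
    compute=ServerlessSparkCompute(
        instance_type="standard_e4s_v3",  # hypothetical sizing
        runtime_version="3.3",
    ),
    monitoring_target=MonitoringTarget(
        ml_task="classification",  # hypothetical task type
        endpoint_deployment_id="azureml:my-endpoint:my-deployment",  # hypothetical
    ),
    monitoring_signals={"data_drift": data_drift_signal},
)

monitor_schedule = MonitorSchedule(
    name="project_monitoring_schedule",  # hypothetical name
    trigger=RecurrenceTrigger(frequency="day", interval=1),
    create_monitor=monitor_definition,
)

ml_client.schedules.begin_create_or_update(monitor_schedule)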