Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.

StreamError sharing inputs from Sweep Job to Command Job - AzureML SDKv2 #33298

Open obiii opened 11 months ago

obiii commented 11 months ago

Describe the bug Unable to connect steps in Azure ML. A StreamError is raised for connected inputs/outputs.

To Reproduce Steps to reproduce the behavior:

  1. A pipeline with at least two command component steps, where the first one is a sweep job. The first step produces an output that is used as the input to the second step.

In this case, we have a three-step pipeline: (image)

@pipeline()
def train_with_hyperparameters(reg_targets, clf_targets):
    ml_client_registry = get_client_registry()
    ml_client = get_client_workspace(..params...)
    pipeline_job_env = get_env_from_shared_registry(...params..)

    try:
        # Creating command component
        train_component = command(
            name="train",
            display_name="Bayesian HPO",
            description="Performs training and produces model",
            inputs={
                "reg_targets": Input(type='string'),
                "clf_targets": Input(type='string')
                ...other inputs here...
            },
            code=e.sources_directory,
            command="python -m train.main_portfolio",
            environment=pipeline_job_env,
            outputs=dict(
                auxilary_output_training = Output(type='uri_file')
            ),
        ).component(
            reg_targets=reg_targets,
            clf_targets=clf_targets,
            ml_datafile=ml_datafile,
            preprocessing_config_file=preprocessing_config_file,
            port_agg_col=port_agg_col,
            tgt_period_col=tgt_period_col,
            gpu_flag=gpu_flag,
            dataset_deploy_flag=dataset_deploy_flag
        ) # type: ignore

        search_space = {
            ...lot of parameters here...
        }
        sweep_step = train_component.sweep(
            primary_metric="mean_cv",
            ..settings here...
        )
        sweep_step.set_limits(...limits settings here...)

        #image creator component
        image_creator_component = command(
            name="image-creator",
            display_name="Image Creator",
            description="Image creator step",
            inputs={
                "port_agg_col": Input(type='string'),
                ..other inputs here...,
                "auxilary_link":Input(type='uri_file')
            },
            outputs=dict(
                auxilary_output_image_creator = Output(type='uri_file')
            ),
            code=e.image_creator_sources_directory,
            command="python -m image_creator.image_creator",
            environment=pipeline_job_env,
        ).component(
            port_agg_col=port_agg_col,
            tgt_period_col=tgt_period_col,
            auxilary_link=sweep_step.outputs.auxilary_output_training
        )#type:ignore

        #model selector component
        model_selector_component = command(
            name="sweden-backbook-model-selector",
            display_name="Model Selector",
            description="Model selector step",
            inputs={
                "model_name": Input(type='string'),
                ...other inputs here...,
                "auxilary_link":Input(type='uri_file')
            },
            code=e.best_model_sources_directory,
            command="python -m best_model_selector.register_best_model",
            environment=pipeline_job_env,
        ).component(
            model_name=model_name, #'sweden_backbook_model_day' + snapshot doesnt work, why?
            ..other values for inputs...,
            auxilary_link=image_creator_component.outputs.auxilary_output_image_creator
        )#type:ignore
        return
    except Exception as excp:
        logging.error('Unable to create command component.')
        raise excp
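
One thing that may be worth checking, since the command strings above are shown without arguments: in SDK v2 the usual way to pass inputs and outputs to the script is to reference them in the command string with `${{inputs.<name>}}` and `${{outputs.<name>}}` expressions. Below is a minimal sketch of building such a string. The helper `build_command` is hypothetical (it is not part of azure-ai-ml), and the real commands may already do this in the elided parts.

```python
def build_command(module, input_names, output_names):
    """Return a command string that forwards each input/output as a CLI flag.

    Hypothetical helper for illustration only; not part of azure-ai-ml.
    """
    parts = ["python -m " + module]
    for name in input_names:
        # Resolved by Azure ML to the input's value or local path at run time.
        parts.append("--" + name + " ${{inputs." + name + "}}")
    for name in output_names:
        # Resolved by Azure ML to the path the script should write to.
        parts.append("--" + name + " ${{outputs." + name + "}}")
    return " ".join(parts)


if __name__ == "__main__":
    print(
        build_command(
            "image_creator.image_creator",
            ["auxilary_link"],
            ["auxilary_output_image_creator"],
        )
    )
```

The resulting string could then be passed as the `command=` argument of `command(...)`, so that the flag names match the argparse arguments in each script.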

The train script has an argparse argument like this to output a file:

parser.add_argument('--auxilary_output_training', type=str,
                        help="Auxiliar output for following steps in aml pipeline")
joblib.dump(final_fit_zir, args.auxilary_output_training)
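
For reference, a self-contained sketch of that output-writing pattern. It assumes the value of `--auxilary_output_training` is the file path provided for the `uri_file` output, and it uses `pickle` from the standard library as a stand-in for `joblib` so the example has no third-party dependencies. The helper names `parse_args` and `write_output` are hypothetical.

```python
import argparse
import pickle
from pathlib import Path


def parse_args(argv=None):
    """Parse the single output argument used by the train step."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--auxilary_output_training",
        type=str,
        help="Auxiliary output for following steps in the AML pipeline",
    )
    return parser.parse_args(argv)


def write_output(obj, output_path):
    """Serialize obj to output_path, creating parent folders if needed."""
    path = Path(output_path)
    # The parent folder of the output path may not exist yet.
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("wb") as handle:
        pickle.dump(obj, handle)  # stand-in for joblib.dump
    return path
```

In the train script, this would be called as `write_output(final_fit_zir, parse_args().auxilary_output_training)` once training has finished.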

The image creator step has arguments like this to handle the incoming output as its input (auxilary_output_image_creator is the output passed from the image creator to the next step, model selection):

parser.add_argument('--auxilary_link',
                        type=str,
                        help="""Just a dummy input to connect steps in AML pipeline""")
parser.add_argument('--auxilary_output_image_creator',
                        type=str,
                        help="""Auxiliar output for following steps in aml pipeline""")
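
Because `auxilary_link` is declared as `uri_file`, it can help to log what the step actually receives before trying to read it. The following is a minimal diagnostic sketch, assuming the input arrives as a local path (mounted or downloaded). The names `describe_path` and `check_uri_file_input` are hypothetical, and this does not by itself explain the StreamError; it only narrows down whether the path is a file, a folder, or missing.

```python
import os


def describe_path(path):
    """Classify what the runtime handed to the step: file, directory, or missing."""
    if os.path.isfile(path):
        return "file"
    if os.path.isdir(path):
        return "directory"
    return "missing"


def check_uri_file_input(path):
    """Raise a descriptive error if a uri_file input is not a readable file path."""
    kind = describe_path(path)
    if kind != "file":
        raise ValueError(
            "Expected a file for a uri_file input, got " + kind + ": " + str(path)
        )
    return path
```

Calling `check_uri_file_input(args.auxilary_link)` right after argument parsing would surface a type mismatch (for example a folder where a file is expected) in the step's own logs.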

The sweep job produces the output correctly: (image)

The input of the image creator step is also present: (image)

The data asset ID of the outputs in the sweep job (parent as well as child runs) and the ID of the input in the image creator step are the same.

Expected behavior The steps should be connected through these inputs and outputs. Instead, we get a StreamError: (image)

Additional context SDK v2 has no built-in way to order pipeline steps, unlike SDK v1, where we had StepSequence. In any case, linking these inputs and outputs should work.

pvaneck commented 11 months ago

Thanks for the feedback.

@azureml-github - would any of you be able to provide assistance?

github-actions[bot] commented 11 months ago

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @Azure/azure-ml-sdk @azureml-github.

cloga commented 3 months ago

Hi @obiii, could you help double-check the type of auxilary_output_training from train_component in your job in Azure Machine Learning?