Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.

Ability to publish/deploy pipelines without submitting - AzureML Python SDK v2 #34745

Open obiii opened 5 months ago

obiii commented 5 months ago

Hi,

With SDK v1, we used to submit the pipeline once and then, in CI, we could simply publish it to test or prod. Now we are migrating to SDK v2, and with PipelineComponentBatchDeployment there doesn't seem to be a way to deploy without submitting the job. The pipelines we have are massive and take a lot of time to complete.

We are using a command component, as can be seen below in the custom_ppl function. PipelineComponentBatchDeployment accepts either job_definition (which works if I have submitted the pipeline first) or component (which works if the component is loaded, but we don't register components and therefore can't use this).

Here is the code we are using:

@pipeline()
def custom_ppl(arg1, arg2):
    logging.info("Getting registry client.")
    ml_client_registry = someFunction1()

    pipeline_job_env = someFunctToGetEnv3()

    # Creating command component
    preprocess_component = command(
        name="Prerocessing",
        display_name="Preprocessing",
        description="Performs ts preprocessing.",
        inputs={
            "someInputs": Input(type="string"),
            "someInputs2": Input(type="string")
        },
        code=e.sources_directory,
        command="python -m preprocess.preprocessor_ts",
        environment=pipeline_job_env,
    ).component(
        someInputs=arg1,
        someInputs2=arg2
    )
    return

def main(args):
    logging.info("Getting workspace client.")
    ml_client = someFunction2()
    someFunctToSetupCompute()

    pipeline_job = custom_ppl(
        arg1 = args.userInput1,
        arg2 = args.userInput2
    )
    pipeline_job.settings.default_datastore = e.default_datastore
    pipeline_job.settings.default_compute = e.compute_name_train_ts
    pipeline_job.settings.force_rerun = args.rerun

    if args.submit:
        logging.info(f"Submitting pipeline: {pipeline_job}")
        try:
            pipeline_run_job = ml_client.jobs.create_or_update(
                pipeline_job, experiment_name=e.experiment_name_preprocess_ts
            )
            ml_client.jobs.stream(name=pipeline_run_job.name)
        except Exception as excp:
            logging.error("Unable to complete the pipeline")
            raise excp

    if args.deploy:
        endpoint = create_endpoint(
            ml_client, e.endpoint_name_preprocess, "Preprocessing end point."
        )

        logging.info("Creating deployment profile.")
        deployment = PipelineComponentBatchDeployment(
            name=e.deployment_name_preprocess,
            description="bb preprocessing pipeline."
            "This deployment is created from a pipeline job.",
            endpoint_name=endpoint.name,
            job_definition=pipeline_run_job,  # type: ignore
            settings={
                "default_compute": e.compute_name_train_ts,  # type: ignore
                "default_datastore": e.default_datastore,
                "continue_on_step_failure": False,
            },
        )

        ml_client.batch_deployments.begin_create_or_update(deployment).result()
        logging.info(f"Deployment succeeded: {deployment}")

        logging.info("Updating default endpoint deployment.")
        endpoint = ml_client.batch_endpoints.get(endpoint.name)
        endpoint.defaults.deployment_name = deployment.name
        ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

        # How do we deploy without submitting the pipeline first?

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--userInput1""
    )
    parser.add_argument(
        "--userInput2"
    )
    parser.add_argument(
        "--submit", action="store_true", help="use to submit the pipeline experiment."
    )
    parser.add_argument(
        "--deploy", action="store_true", help="use to submit the pipeline experiment."
    )
    args = parser.parse_args()
    main(args)
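
For reference, create_endpoint and the someFunction* calls above are local helpers on our side. A minimal sketch of what create_endpoint does (an assumption, not the actual implementation), using the SDK v2 BatchEndpoint entity:

from azure.ai.ml.entities import BatchEndpoint

def create_endpoint(ml_client, endpoint_name, description):
    # Reuse the endpoint if it already exists; otherwise create it.
    try:
        return ml_client.batch_endpoints.get(endpoint_name)
    except Exception:
        endpoint = BatchEndpoint(name=endpoint_name, description=description)
        return ml_client.batch_endpoints.begin_create_or_update(endpoint).result()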

The question is: in the args.deploy branch, how do we deploy the pipeline without submitting it first? Our pipelines are massive and take a lot of time to complete, and we now have to submit them every time we deploy to dev, test, and prod. Previously, with SDK v1, we could submit once and then publish to dev, test, and prod without resubmitting.

Related post on Microsoft Q&A: https://learn.microsoft.com/en-us/answers/questions/1159059/publishing-pipeline-endpoints-with-sdk-v2

github-actions[bot] commented 5 months ago

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @Azure/azure-ml-sdk @azureml-github.

annatisch commented 5 months ago

Thanks for the report @obiii! I have looped in the ML team to take a closer look at your scenario.

obiii commented 5 months ago

Hi, is there any update on this please?

edgBR commented 5 months ago

Hi,

I am actually having the same problem.

Is there a way to find a workaround?

BR E

banibrata-de commented 5 months ago

@santiagxf would you be able to help here?

santiagxf commented 5 months ago

Hi @edgBR and @obiii. You don't need to run the job to be able to deploy it; this is fully supported. Instead of passing job_definition to the PipelineComponentBatchDeployment class, pass the component parameter. This example shows how to do it. If you want to avoid registering the pipeline as a component first, you can also follow this example.

In your code it would be as follows:

deployment = PipelineComponentBatchDeployment(
    name=e.deployment_name_preprocess,
    description="bb preprocessing pipeline.",
    endpoint_name=endpoint.name,
    # This will create a component out of your pipeline definition
    component=custom_ppl().component,
    settings={
        "default_compute": e.compute_name_train_ts,  # type: ignore
        "default_datastore": e.default_datastore,
        "continue_on_step_failure": False,
    },
)
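
Once the deployment is created, runs can be triggered through the endpoint instead of by re-submitting the pipeline job. A minimal sketch (pass any pipeline inputs via the inputs= argument of invoke):

job = ml_client.batch_endpoints.invoke(
    endpoint_name=endpoint.name,
    deployment_name=e.deployment_name_preprocess,
)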
santiagxf commented 5 months ago

Hi @obiii, sorry for the confusion. The error indicates that the component (pipeline component) doesn't exist. Batch Endpoints lets you manage the lifecycle of deployments, and to support that, pipeline components are versioned as assets you can manage. Pipeline components are reusable, and you can streamline your MLOps practice by using shared registries to move these components from one workspace to another.

You need to create the component before deploying it:

pipeline_component = ml_client.components.create_or_update(
    my_ppl().component
)

Then, in your arguments, you can change it as follows:

job_type_params = {
    "component": pipeline_component  # type: ignore
}

Here you have a full notebook showing the scenario.
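
Putting the two snippets together, the deployment creation would look like this (a sketch reusing the names from the code earlier in this thread):

pipeline_component = ml_client.components.create_or_update(
    my_ppl().component
)

deployment = PipelineComponentBatchDeployment(
    name=e.deployment_name_preprocess,
    endpoint_name=endpoint.name,
    component=pipeline_component,  # instead of job_definition=...
    settings={
        "default_compute": e.compute_name_train_ts,
        "default_datastore": e.default_datastore,
        "continue_on_step_failure": False,
    },
)
ml_client.batch_deployments.begin_create_or_update(deployment).result()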

obiii commented 5 months ago

Hi @santiagxf,

preprocess_component = command(
    name="cmp_preprocessing_sweden_backbook",
    display_name="PreprocessingComponent",
    description="Performs ts preprocessing.",
    inputs={
        "arg1": Input(type="string")
    },
    code=e.sources_directory,
    command="python -m preprocess.preprocessor_ts --arg1${{inputs.arg1}},
    environment=preprocess_env,
    tags={'build': e.target_env_tag}
)

@pipeline()
def my_ppl(arg1: str):
    preprocess_component(arg1=arg1)

ml_client_registry = get_client_registry(
    credentials, e.registry_name, e.registry_region
)
ppl_component = ml_client_registry.components.create_or_update(
    my_ppl().component
)

This results in:

Creating/updating registry component cmp_preprocessing_sweden_backbook with version 1 ..Done (0m 6s) azure.core.exceptions.ResourceNotFoundError: (UserError) Could not find asset with assetId azureml://registries/my-ml-registry/components/cmp_preprocessing_sweden_backbook/versions/1 in registry my-ml-registry in region westeurope.There is no pending update on this registry. Code: UserError Message: Could not find asset with assetId azureml://registries/my-ml-registry/components/cmp_preprocessing_sweden_backbook/versions/1 in registry my-ml-registry in region westeurope.There is no pending update on this registry.

And with this, I see a new command component registered in the shared ML registry, but with name = azureml_anonymous and a version in a different format, i.e. 4da46664-682e-1749-1930-22c41c527154. So it seems like the component gets registered under a different identity while the lookup targets the named one, resulting in ResourceNotFound? I am not sure.

However, it works fine if I use the workspace client instead. Can you please help? Thanks!
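
One idea I have not verified: set an explicit name and version on the pipeline component before registering it, so the registry does not store it as an anonymous asset:

pipeline_component = my_ppl().component
pipeline_component.name = "ppl_preprocessing_sweden_backbook"  # hypothetical name
pipeline_component.version = "1"

ppl_component = ml_client_registry.components.create_or_update(pipeline_component)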

obiii commented 5 months ago

Hi,

Is there any update on this, please? :) Thanks

santiagxf commented 5 months ago

Hi @obiii. Thanks for reporting this issue with registries and components. I'm creating a new issue for it, as it looks like it's not related to the original problem; that makes it easier for us to pull in the right people. Give us some time to route it to the pipeline components team.

#34891