Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.

Enable creation of StepSequence in AzureML SDK 2.0 #33288

Open edgBR opened 9 months ago

edgBR commented 9 months ago

Is your feature request related to a problem? Please describe.

Hi, one of my team members is working very hard to migrate pipelines from Azure Machine Learning SDK V1 to V2, but we are still missing quite a lot of features from SDK V1. Some of these features are in preview and others do not exist at all.

In V1, I just had to create a PythonScriptStep for each script, wrap them in a StepSequence, and deploy the pipeline. My scripts are simple: no inputs, no outputs. We store data in ADLS Gen2, which is why I don't have any inputs/outputs.

With V2, we need to create a "node" in a component that will be used by a pipeline. Currently the only way to enforce the order is a dummy output that the next step takes as its input:

step_1 = command(
    environment=azureml_env,
    command="python step1.py",
    code="./my_project_folder",
    outputs=dummy_output
)
step_2 = command(
    environment=azureml_env,
    command="python step2.py",
    code="./my_project_folder",
    inputs=dummy_output,
    outputs=dummy_output2
)

Describe the solution you'd like

Bring back StepSequence(steps)
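
To make the request concrete: a StepSequence is just a strictly linear chain, so each step depends on the one before it. A minimal sketch of that idea, assuming steps are any ordered objects (the helper name `sequence_to_edges` is hypothetical and not part of either SDK):

```python
from typing import Iterable, List, Tuple, TypeVar

T = TypeVar("T")


def sequence_to_edges(steps: Iterable[T]) -> List[Tuple[T, T]]:
    """Return (upstream, downstream) pairs for a strictly linear sequence."""
    ordered = list(steps)
    # Pair every step with its immediate successor.
    return list(zip(ordered, ordered[1:]))


if __name__ == "__main__":
    for upstream, downstream in sequence_to_edges(["step_1", "step_2", "step_3"]):
        print(f"{upstream} -> {downstream}")
```

Each pair could then be passed to something like the `add_edges` method of the workaround quoted in this issue, which is roughly what a built-in StepSequence would do for us.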

Describe alternatives you've considered

This amazing solution provided by https://stackoverflow.com/users/5462743/begreen:

from collections import OrderedDict
from pathlib import Path
from typing import List
import random
import string

from azure.ai.ml import Input, Output, command
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import BuildContext, Environment

class StepsGraph:
    def __init__(self):
        """
        Initialize a StepsGraph instance to manage step dependencies.

        The StepsGraph uses an ordered dictionary to store steps and their dependencies.
        """
        self.steps = OrderedDict()

    def add_edges(self, step_1, step_2):
        """
        Add a dependency relationship between two steps.

        Args:
            step_1: The first step.
            step_2: The step that depends on the first step.
        """
        if step_1 not in self.steps:
            self.steps[step_1] = []
        if step_2 not in self.steps:
            self.steps[step_2] = []
        self.steps[step_1].append(step_2)

    def get_dependency(self):
        """
        Get the steps in the order of their dependencies.

        Returns:
            List: A list of steps in the order they need to be executed to satisfy all dependencies.
        """

        def dfs(node, visited, result):
            visited[node] = True
            if node in self.steps:
                for neighbor in self.steps[node]:
                    if not visited[neighbor]:
                        dfs(neighbor, visited, result)
            result.append(node)

        visited = {step: False for step in self.steps}
        result = []

        for step in self.steps:
            if not visited[step]:
                dfs(step, visited, result)

        return result[::-1]

    def get_parents(self, step):
        """
        Get the steps that a given step depends on (its direct parents).

        Args:
            step: The step whose parents should be returned.

        Returns:
            List: The steps that must run before the given step.
        """
        parents = []
        for s, connections in self.steps.items():
            if step in connections:
                parents.append(s)
        return parents

    def print_steps(self):
        for step, edges in self.steps.items():
            print(f"Step {step} -> {edges}")

def create_input(step):
    """
    Create an input dictionary for a step.

    Args:
        step (str): The name of the step for which to create an input.

    Returns:
        dict: A dictionary representing the input for the specified step with the following structure:
            {step: Input(type="uri_folder", mode="rw_mount")}
    """
    return {f"{step}": Input(type="uri_folder", mode="rw_mount")}

def create_output(step):
    """
    Create an output dictionary for a step.

    Args:
        step (str): The name of the step for which to create an output.

    Returns:
        dict: A dictionary representing the output for the specified step with the following structure:
            {step: Output(type="uri_folder", mode="rw_mount")}
    """
    return {f"{step}": Output(type="uri_folder", mode="rw_mount")}

def create_pipeline(steps_graph, default_compute, name, experiment_name):
    """
    Create a pipeline with specified steps and dependencies.

    Args:
        steps_graph (Step or StepsGraph): A Step or StepsGraph object representing the steps and their dependencies in the pipeline.
            If a Step is provided, it will be treated as a standalone step.
        default_compute: The default compute target for the pipeline (or None for serverless execution).
        name (str): The name of the pipeline.
        experiment_name (str): The name of the experiment associated with the pipeline.

    Returns:
        PipelineJob: The pipeline job produced by calling the decorated pipeline function.

    Raises:
        ValueError: If `name` or `experiment_name` is not provided.

    Note:
        - The `steps_graph` argument can be a single Step or a StepsGraph object.
        - The pipeline's structure is determined by the dependencies defined in the `steps_graph`.
        - The pipeline is defined as a Python function, which is called once to build the job before returning.

    Example:
        # Create a pipeline with specific steps and dependencies

        steps_graph = StepsGraph()
        step_1 = Step(...)
        step_2 = Step(...)
        step_3 = Step(...)
        step_4 = Step(...)
        step_5 = Step(...)
        step_6 = Step(...)
        step_7 = Step(...)

        steps_graph.add_edges(step_1, step_2)
        steps_graph.add_edges(step_2, step_3)
        steps_graph.add_edges(step_2, step_4)
        steps_graph.add_edges(step_2, step_6)
        steps_graph.add_edges(step_4, step_5)
        steps_graph.add_edges(step_3, step_7)
        steps_graph.add_edges(step_6, step_7)
        steps_graph.add_edges(step_5, step_7)

        pipeline_job = create_pipeline(steps_graph, default_compute="my_compute", name="my_pipeline", experiment_name="my_experiment")
    """
    # default_compute = None => Serverless
    if not name:
        raise ValueError("Please provide a `name` for your pipeline.")
    if not experiment_name:
        raise ValueError("Please provide an `experiment_name` for your pipeline.")

    @pipeline(
        default_compute=default_compute,
        name=name,
        experiment_name=experiment_name,
    )
    def default_pipeline():
        if isinstance(steps_graph, Step):
            steps_graph.build()()
            return
        dependency_order = steps_graph.get_dependency()
        command_dict = {}
        parent_dict = {}

        for step, edges in steps_graph.steps.items():
            print(f"Step {step} -> {edges}")
            parent_dict[str(step)] = steps_graph.get_parents(step)

        print(f"parent_dict : {parent_dict}")
        print(f"dependency_order: {dependency_order}")
        for step in dependency_order:
            print(f"step : {step}")
            inputs_dict = {}
            step.update_link(
                outputs=create_output(step),
            )
            for parent_node in reversed(parent_dict[str(step)]):
                step.update_link(
                    inputs=create_input(parent_node),
                )
                custom_output = getattr(
                    command_dict[str(parent_node)].outputs, str(parent_node)
                )
                # Each step has exactly one output, so take its first key.
                input_name = list(parent_node.outputs.keys())[0]
                inputs_dict[input_name] = custom_output

            print(inputs_dict)

            for key, value in inputs_dict.items():
                print(key, value._port_name)

            print(step.inputs)
            command_dict[str(step)] = step.build()(**inputs_dict)

    return default_pipeline()

def generate_custom_uuid(length=8, parts=4):
    custom_uuid = ""
    for _ in range(parts):
        part = "".join(random.choices(string.ascii_letters + string.digits, k=length))
        custom_uuid += part + "_"

    custom_uuid = custom_uuid[:-1]
    return custom_uuid

class Step:
    """
    Represents a step in a StepsGraph.

    This class is used to define and manage the properties of a step,
    including its inputs and outputs. It provides methods for updating
    the input and output links and for building the step's command.

    Attributes:
        inputs (dict): A dictionary of input values for the step.
        outputs (dict): A dictionary of output values for the step.

    Methods:
        __init__(self, **kwargs): Initializes a Step object with optional
            keyword arguments to set initial properties.
        __str__(self): Returns a string representation of the step.
        update_link(self, inputs=None, outputs=None): Updates the input and
            output links with the provided dictionaries.
        build(self): Builds and returns the command for executing the step.

    Example usage:
    >>> my_step = Step(display_name="sample_step", inputs={"input_1": "value1"})
    >>> my_step.update_link(outputs={"output_1": "result"})
    >>> command = my_step.build()
    >>> # Then you need to call the command to build the inputs/outputs. Use `create_pipeline` for this.
    """

    def __init__(self, **kwargs):
        self.inputs = None
        self.outputs = None
        self.__dict__.update(kwargs)
        self.uuid = self.display_name + "_" + generate_custom_uuid()

    def __str__(self):
        return self.uuid

    def update_link(self, inputs=None, outputs=None):
        if self.inputs and inputs:
            self.inputs.update(inputs)
        elif inputs:
            self.inputs = inputs
        if self.outputs and outputs:
            self.outputs.update(outputs)
        elif outputs:
            self.outputs = outputs

    def build(self):
        return command(**self.__dict__)
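
One caveat with the DFS in `get_dependency` above: it never checks for cycles, so a circular dependency would silently produce some order. A minimal sketch of the same ordering with the standard-library `graphlib` module (Python 3.9+), which raises on cycles. The function name `execution_order` and the plain-dict input are assumptions for illustration; the dict has the same shape as `StepsGraph.steps` (each step mapped to the steps that depend on it):

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, Hashable, List


def execution_order(children_by_step: Dict[Hashable, List[Hashable]]) -> List[Hashable]:
    """Return the steps so that every step comes after all of its parents.

    Raises graphlib.CycleError if the dependencies are circular.
    """
    sorter = TopologicalSorter()
    for parent, children in children_by_step.items():
        sorter.add(parent)  # register steps that have no parents too
        for child in children:
            sorter.add(child, parent)  # child must run after parent
    return list(sorter.static_order())


if __name__ == "__main__":
    graph = {
        "step_1": ["step_2"],
        "step_2": ["step_3", "step_4"],
        "step_3": [],
        "step_4": [],
    }
    print(execution_order(graph))
    try:
        execution_order({"a": ["b"], "b": ["a"]})
    except CycleError as err:
        print(f"cycle detected: {err.args[1]}")
```

The relative order of independent steps (here `step_3` and `step_4`) is not something to rely on; only the parent-before-child guarantee is.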

Additional context

We should have feature parity from 1.x to 2.x

github-actions[bot] commented 9 months ago

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @Azure/azure-ml-sdk @azureml-github.

edgBR commented 9 months ago

Hi, any updates?

dnbmilano commented 1 month ago

It would be great if this were to be implemented!

cloga commented 1 month ago

Hi @edgBR, in v2 we rely on data dependencies to define the running order of steps. Please add a dummy data dependency between steps to define the running order.
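
For readers wondering what "data dependency defines the order" means in practice, here is a minimal pure-Python sketch of the idea, not the SDK's actual implementation: a step must run after whichever step produces each of its inputs. The dictionary layout and the function name `upstream_steps` are assumptions for illustration; the step and output names mirror the dummy-output example at the top of this issue.

```python
from typing import Dict, List, Set


def upstream_steps(steps: Dict[str, Dict[str, List[str]]]) -> Dict[str, Set[str]]:
    """Map each step to the set of steps that produce its inputs."""
    # Index every declared output by the step that produces it.
    producer = {
        output: name
        for name, spec in steps.items()
        for output in spec.get("outputs", [])
    }
    # A step depends on the producer of each input it consumes.
    return {
        name: {producer[i] for i in spec.get("inputs", []) if i in producer}
        for name, spec in steps.items()
    }


if __name__ == "__main__":
    steps = {
        "step_1": {"outputs": ["dummy_output"]},
        "step_2": {"inputs": ["dummy_output"], "outputs": ["dummy_output2"]},
    }
    print(upstream_steps(steps))  # {'step_1': set(), 'step_2': {'step_1'}}
```

With no declared inputs or outputs there are no edges at all, which is why steps without a dummy dependency carry no ordering relative to each other.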