Galileo-Galilei / kedro-mlflow

A kedro-plugin for integration of mlflow capabilities inside kedro projects (especially machine learning model versioning and packaging)
https://kedro-mlflow.readthedocs.io/
Apache License 2.0
203 stars, 34 forks

Stable run names #579

Open: lvijnck opened this issue 3 months ago

lvijnck commented 3 months ago

Description

We use Argo Workflows to orchestrate our pipeline, so each node is executed as a separate `kedro run -n NODE` invocation. With the vanilla kedro-mlflow setup, every node therefore gets a new MLflow run (with a new run id), and a single pipeline execution ends up scattered across many runs. This is highly undesirable.
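To make the problem concrete, here is a minimal, dependency-free simulation. It does not call MLflow: `InMemoryTrackingStore` and `run_single_node` are hypothetical names, the store is a stand-in for the tracking server, and the node names are made up. Because nothing is shared between the separate `kedro run -n NODE` processes, each one starts its own run.

```python
import uuid


class InMemoryTrackingStore:
    """Hypothetical stand-in for an MLflow tracking server."""

    def __init__(self) -> None:
        self.run_ids: list = []

    def create_run(self) -> str:
        # The server (not the caller) generates the run id.
        run_id = uuid.uuid4().hex
        self.run_ids.append(run_id)
        return run_id


def run_single_node(store: InMemoryTrackingStore, node: str) -> str:
    """Simulate one `kedro run -n NODE` process with the default setup."""
    # No state is shared between processes, so each one starts a fresh run.
    return store.create_run()


store = InMemoryTrackingStore()
for node in ["preprocess", "train", "evaluate"]:  # hypothetical node names
    run_single_node(store, node)

print(len(store.run_ids))  # 3 runs for one pipeline execution
```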

Context

Being able to run large pipelines in a distributed manner while still logging everything to a single MLflow run.

Possible Implementation

To overcome this limitation, we added a constraint that run names must be unique, and implemented a hook (code below) with the following logic:

  1. If a run name is defined, check whether a run with that name exists.
  2. If the run exists, set its run id.
  3. If it does not exist, create the run and set its run id.

```python
"""Kedro project hooks."""
import mlflow
from kedro.framework.hooks import hook_impl
from omegaconf import OmegaConf


class MLFlowHooks:
    """Kedro MLflow hook.

    MLflow supports the concept of run names, which are mapped
    to identifiers behind the scenes. However, a run name is not
    required to be unique, so multiple runs with the same name
    may exist. This hook ensures that a run name is mapped to a
    single identifier.
    """

    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialise the MLflow run.

        Looks up (or creates) the run matching the configured run name
        and stores its id in the mlflow config for other hooks to consume.
        """
        cfg = OmegaConf.create(context.config_loader["mlflow"])

        if cfg.tracking.run.name:
            # Set tracking uri
            mlflow.set_tracking_uri(cfg.server.mlflow_tracking_uri)
            experiment_id = self._create_experiment(cfg.tracking.experiment.name)
            run_id = self._create_run(cfg.tracking.run.name, experiment_id)

            # Update the mlflow config (not the catalog) with the resolved run id,
            # converted back to plain dicts before handing it to other hooks
            OmegaConf.update(cfg, "tracking.run.id", run_id)
            context.config_loader["mlflow"] = OmegaConf.to_container(cfg, resolve=True)

    @staticmethod
    def _create_run(run_name: str, experiment_id: str) -> str:
        """Return the id of the run named `run_name`, creating it if needed.

        Args:
            run_name: name of the run
            experiment_id: id of the experiment
        Returns:
            Identifier of the existing or newly created run
        """
        # Retrieve runs with this name, most recent first
        runs = mlflow.search_runs(
            experiment_ids=[experiment_id],
            filter_string=f"run_name='{run_name}'",
            order_by=["start_time DESC"],
            output_format="list",
        )

        if not runs:
            # No run with this name yet: create one (it is ended when the block exits)
            with mlflow.start_run(
                run_name=run_name, experiment_id=experiment_id
            ) as run:
                mlflow.set_tag("created_by", "kedro")
                return run.info.run_id

        # If several runs share the name, reuse the most recent one
        return runs[0].info.run_id

    @staticmethod
    def _create_experiment(experiment_name: str) -> str:
        """Return the id of the experiment, creating it if needed.

        Args:
            experiment_name: name of the experiment
        Returns:
            Identifier of the experiment
        """
        experiments = mlflow.search_experiments(
            filter_string=f"name = '{experiment_name}'"
        )

        if not experiments:
            return mlflow.create_experiment(experiment_name)

        return experiments[0].experiment_id
```
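The hook above resolves duplicate names by taking the most recent run (`order_by=["start_time DESC"]`), so the uniqueness of run names is assumed rather than enforced. A stricter variant could refuse to guess. The sketch below is dependency-free and hypothetical: `InMemoryTrackingStore`, `get_or_create_run` and `DuplicateRunNameError` are illustrative names, and the in-memory store stands in for `mlflow.search_runs` / `mlflow.start_run`.

```python
import uuid
from dataclasses import dataclass, field


class DuplicateRunNameError(RuntimeError):
    """Raised when the 'run names are unique' assumption is violated."""


@dataclass
class InMemoryTrackingStore:
    """Hypothetical stand-in for the MLflow tracking server: run_id -> run_name."""

    runs: dict = field(default_factory=dict)

    def search_by_name(self, run_name: str) -> list:
        # Plays the role of mlflow.search_runs(filter_string="run_name='...'").
        return [run_id for run_id, name in self.runs.items() if name == run_name]

    def create_run(self, run_name: str) -> str:
        # Plays the role of mlflow.start_run(run_name=...): the id is server-generated.
        run_id = uuid.uuid4().hex
        self.runs[run_id] = run_name
        return run_id


def get_or_create_run(store: InMemoryTrackingStore, run_name: str) -> str:
    """Return the id of the single run called `run_name`, creating it if absent."""
    matches = store.search_by_name(run_name)
    if not matches:
        return store.create_run(run_name)
    if len(matches) > 1:
        # Strict variant: fail loudly instead of silently picking the latest run.
        raise DuplicateRunNameError(f"{len(matches)} runs are named {run_name!r}")
    return matches[0]


store = InMemoryTrackingStore()
# Every node of the workflow resolves the same (hypothetical) name, hence the same run id.
ids = {node: get_or_create_run(store, "wf-abc123") for node in ["preprocess", "train"]}
print(len(set(ids.values())))  # 1
```

Note that neither the hook nor this sketch is atomic: if two nodes start concurrently before the run exists, both may create it. One way to avoid this is to create the run in a dedicated first step of the workflow, before the node pods fan out.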

My suggestion would be to add a flag to the mlflow configuration (`mlflow.yml`), e.g.:

```yaml
tracking:
  run:
    id: null
    name: "unique-run"
    stable_run_name: True  # proposed: log to the existing run with this name, if any
```
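The proposed `stable_run_name` flag does not exist in kedro-mlflow today. The sketch below only illustrates one way its semantics could be validated. `RunConfig` and `resolve_run_mode` are hypothetical names, and the rules (the flag requires a name and cannot be combined with a fixed `id`) are assumptions, not the plugin's behaviour.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunConfig:
    """Hypothetical mirror of the `tracking.run` section, including the proposed flag."""

    id: Optional[str] = None
    name: Optional[str] = None
    stable_run_name: bool = False  # proposed flag, not an existing kedro-mlflow option

    def __post_init__(self) -> None:
        # Assumed rules: a stable name must be given and must not clash with a fixed id.
        if self.stable_run_name and not self.name:
            raise ValueError("stable_run_name requires tracking.run.name to be set")
        if self.stable_run_name and self.id is not None:
            raise ValueError("stable_run_name cannot be combined with tracking.run.id")


def resolve_run_mode(cfg: RunConfig) -> str:
    """Describe which run a hook would use under these assumed semantics."""
    if cfg.id is not None:
        return "resume-by-id"
    if cfg.stable_run_name:
        return "get-or-create-by-name"
    return "new-run"


print(resolve_run_mode(RunConfig(name="unique-run", stable_run_name=True)))
# get-or-create-by-name
```

Rejecting the `id` + `stable_run_name` combination up front keeps the two ways of selecting a run from silently overriding each other.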

Possible Alternatives

Supplying a static run id is not possible, as this results in a `ResourceNotFoundError`. The API is also limited in that it is not possible to create a run with a specific run id.

Galileo-Galilei commented 1 month ago

Hi, I understand the need for such a feature, and it would be a great addition.

However, mlflow does not let external orchestrators define the run id themselves, and it seems really wrong to use the run name for this because, by design, it may not be unique. This would require a lot of custom logic on kedro-mlflow's side, and I am not sure this is the correct way to do it.

I think that before rushing into an implementation, we should investigate how people handle this with orchestrators like Airflow, and possibly make such a request directly in the mlflow repo. I won't close the issue because it's worth keeping track of this feature request, but I don't see it being implemented as is.

lvijnck commented 1 month ago

Hi, I absolutely see your point. I know it's an ugly workaround, but I could not think of any other way. We now use it on a daily basis, and the RUN_NAME is injected directly by Argo Workflows, which makes it unique.

I do, however, think that the plugin should allow a setup like this; otherwise it is useless for pipelines that run in a distributed fashion.
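The workaround relies on the orchestrator giving every node of one workflow the same run name. Below is a minimal sketch of that idea, assuming the name arrives as a `RUN_NAME` environment variable (for example set in the Argo template from `{{workflow.name}}`, which is the same for all pods of a workflow). The thread does not show how the variable is wired into `mlflow.yml`; `run_name_from_env` is a hypothetical helper.

```python
import os
from typing import Mapping, Optional


def run_name_from_env(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the workflow-wide run name injected by the orchestrator, if any.

    Hypothetical helper: assumes the orchestrator exports RUN_NAME (e.g. from
    Argo's {{workflow.name}}) identically to every node's container.
    """
    name = env.get("RUN_NAME", "").strip()
    # Without an injected name, fall back to the default behaviour (a new run).
    return name or None


# Two pods of the same workflow see the same value, so they share one run name.
pod_a = {"RUN_NAME": "training-pipeline-x7k2p"}
pod_b = {"RUN_NAME": "training-pipeline-x7k2p"}
print(run_name_from_env(pod_a) == run_name_from_env(pod_b))  # True
```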