Galileo-Galilei / kedro-mlflow

A kedro-plugin for integration of mlflow capabilities inside kedro projects (especially machine learning model versioning and packaging)
https://kedro-mlflow.readthedocs.io/
Apache License 2.0

MLflow Child Runs per Pipeline #448

Open lemonhead94 opened 11 months ago

lemonhead94 commented 11 months ago

Description

I was wondering if there is a clever way to start a nested mlflow run per executed pipeline, with the parent run being "default".

Context

This would allow a clear separation of artifacts and metrics for the ETL, preprocessing, training, and evaluation stages inside MLflow.

Galileo-Galilei commented 11 months ago

Hi @lemonhead94, this is a good suggestion, but how do you expect kedro-mlflow to decide when to start and end a sub-run? Using namespaced pipelines?

Note that you can "hack" it with a custom hook:


import mlflow
from kedro.framework.hooks import hook_impl


class MlflowSubRunHook:

    @hook_impl
    def before_node_run(self, node, catalog, inputs, is_async, session_id) -> None:
        # Open a nested run just before the node that starts the sub-stage
        if node.name == "<your-node-name>":
            mlflow.start_run(nested=True)

    @hook_impl
    def after_node_run(self, node, catalog, inputs, outputs, is_async, session_id) -> None:
        # Close the nested run right after the node that ends the sub-stage
        if node.name == "<your-node-name>":
            mlflow.end_run()

and then in your settings.py:


from my_project.hooks import MlflowSubRunHook

HOOKS = (MlflowSubRunHook(),)

lemonhead94 commented 11 months ago

Hi @Galileo-Galilei, thanks for the prompt response!

Yes, what I tried was to cache the pipeline names in the pipeline registry file and then start nested mlflow runs based on node.namespace.
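
Roughly, the idea was a hook like this sketch (hypothetical NamespaceSubRunHook; it assumes each registered pipeline is namespaced under its own name and that a namespace's nodes run back-to-back):

import mlflow
from kedro.framework.hooks import hook_impl


class NamespaceSubRunHook:
    """Sketch: open one nested mlflow run per node namespace."""

    def __init__(self) -> None:
        self._current_namespace = None

    @hook_impl
    def before_node_run(self, node) -> None:
        # When execution crosses a namespace boundary, rotate the child run
        if node.namespace != self._current_namespace:
            if self._current_namespace is not None:
                mlflow.end_run()  # close the previous namespace's child run
            if node.namespace is not None:
                mlflow.start_run(run_name=node.namespace, nested=True)
            self._current_namespace = node.namespace

    @hook_impl
    def after_pipeline_run(self, run_params, pipeline, catalog) -> None:
        # Close the last open child run at the end of the kedro run
        if self._current_namespace is not None:
            mlflow.end_run()
            self._current_namespace = None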

However, I ran into two problems on my end. Firstly, your before_node_run hook was executed before my own, so the params were still logged to the parent mlflow run. What I ended up trying was hooking your hook, since you expose the instance here. This worked; however, for some reason, artifacts defined in the kedro catalog were still logged to the parent run. Secondly, I didn't know that kedro doesn't execute pipelines linearly, so I saw things like preprocessing - training - preprocessing - training - evaluate ..., which in hindsight makes sense from an efficiency perspective.

So, since kedro-mlflow looks for an existing mlflow run, what I will probably end up doing is simply using subprocesses, however ugly that is.

import mlflow
import subprocess

pipeline_names = ["preprocessing", "training", "evaluate", "inference"]

with mlflow.start_run(run_name="RUN_001") as parent_run:
    for pipeline_name in pipeline_names:
        # One nested child run per kedro pipeline
        with mlflow.start_run(run_name=pipeline_name, nested=True) as child_run:
            # Run the pipeline in a separate process; check=True fails fast on errors
            subprocess.run(f"kedro run --pipeline {pipeline_name}", shell=True, check=True)
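
A caveat for anyone copying this: the nested child run opened in the parent process is not automatically visible inside the subprocess. MLflow reads the MLFLOW_RUN_ID environment variable in mlflow.start_run(), so passing the child run id through the environment is one way to make the kedro subprocess attach to the intended run (a sketch, assuming the hook chain eventually calls mlflow.start_run() without an explicit run id):

import os

env = {**os.environ, "MLFLOW_RUN_ID": child_run.info.run_id}
subprocess.run(f"kedro run --pipeline {pipeline_name}", shell=True, check=True, env=env)
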
Galileo-Galilei commented 11 months ago

Your workaround works fine, but I understand it's a bit frustrating to have to tweak kedro for this!

lemonhead94 commented 11 months ago

Sorry for the late response, I didn't have time to play around with it again until today. I think I'm going with the hard-coded pipeline order solution; maybe I'll have time in the future to work on a neatly integrated solution.

For now, I'll just leave this here for anybody stumbling across this issue. This is a src/package_name/run.py file that simply runs all stages in order and produces the desired child-run behaviour:

import os
from pathlib import Path

import mlflow
from kedro.config import OmegaConfigLoader
from kedro.framework.project import configure_project
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.utils import load_obj


def run(pipeline: str) -> None:
    # Run a single pipeline inside its own KedroSession
    runner = load_obj("SequentialRunner", "kedro.runner")
    with KedroSession.create() as session:
        session.run(
            pipeline_name=pipeline,
            runner=runner(is_async=False),
        )


def main() -> None:
    bootstrap_project(project_path=Path.cwd())
    configure_project(package_name=os.path.basename(os.path.dirname(__file__)))
    config_loader = OmegaConfigLoader(
        conf_source=f"{os.getcwd()}/conf",
        config_patterns={"mlflow": ["mlflow*", "mlflow/**", "**/mlflow*"]},
    )
    mlflow.set_experiment(
        experiment_name=config_loader["mlflow"]["tracking"]["experiment"]["name"]
    )

    pipeline_names = ["preprocess", "train", "evaluate", "inference"]
    with mlflow.start_run():  # parent run
        for pipeline_name in pipeline_names:
            # One nested child run per pipeline; the context manager closes it
            with mlflow.start_run(run_name=pipeline_name, nested=True):
                run(pipeline_name)


if __name__ == "__main__":
    main()
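
Note that the script assumes it is executed from the project root: it resolves the project path from Path.cwd() and loads configuration from ./conf, so something like python src/<package_name>/run.py is the expected invocation.
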
Galileo-Galilei commented 11 months ago

It's nice that you achieved your goal, but it's a pity it comes at the price of redefining the run function and losing many kedro advantages (e.g. hooks).

I'll try to come up with a solution one day, so I'll leave the issue open. Feel free to open a PR if you come up with a neat integration!