Galileo-Galilei / kedro-mlflow

A kedro-plugin for integration of mlflow capabilities inside kedro projects (especially machine learning model versioning and packaging)
https://kedro-mlflow.readthedocs.io/
Apache License 2.0
194 stars 29 forks source link

Catalog entries with factory patterns are not recognized by PipelineML #516

Closed sebastiandro closed 5 months ago

sebastiandro commented 5 months ago

Description

Hello there! First off, I want to thank you for this great plugin :)

The problem

I'm using modular pipelines and dataset factories. When using the pipeline_ml_factory to deploy pipelines to MLFlow, I've encountered issues with KedroMLFlow not recognizing catalogue entries with factory patterns.

Use case example:

Let's say we have a model pipeline that we want to run on different datasets. To differentiate the datasets, we use namespaces.

If we have the current setup:

model_pipeline = pipeline(
    [
        node(
            func=train,
            inputs="data",
            outputs="model",
            tags=["training"]
        ),
        node(
            func=infer,
            inputs=["model", "model_input"]
            outputs="prediction",
            tags=["inference"]
        ),
    ]
)

model_pipeline_a = pipeline(
    pipe=model_pipeline,
    namespace="dataset_a",
)

model_pipeline_b = pipeline(
    pipe=model_pipeline,
    namespace="dataset_b",
)

Our catalogue contains the following entries to make sure we are persisting the data and model separated by namespace:

{namespace}.data:
  type: pandas.CSVDataset
  filepath: data/01_raw/{namespace}/data.csv

{namespace}.model:
  type: pickle.PickleDataset
  filepath: data/02_models/{namespace}/model.pkl

If we setup the PipelineML instance for pipeline a:

deploy_pipeline_a = pipeline_ml_factory(
        training=model_pipeline_a.only_nodes_with_tags("training"),
        inference=model_pipeline_a.only_nodes_with_tags("inference"),
        input_name="dataset_a.model_input",
        log_model_kwargs=dict(
            artifact_path="artifacts",
            conda_env={
                "python": 3.10,
                "dependencies": [f"my_project=={PROJECT_VERSION}"],
            },
            signature=None,
        ),
    )

When running the pipeline, KedroMLFlow does not recognise dataset_a.model as an artefact that should be uploaded. It instead throws an error:

kedro_mlflow.mlflow.kedro_pipeline_model.KedroPipelineModelError: The provided catalog must contains 'dataset_a.model' dataset since it is the input of the pipeline.

It works if I add the full name to the catalogue:

dataset_a.model:
  type: pickle.PickleDataset
  filepath: data/02_models/dataset_a/model.pkl

But then I lose the benefits of the nice naming patterns :)

Context

This is important since I am working on a project where we re-use the model pipelines across many datasets. We separate the datasets using namespaces. Hence, it would greatly help to use the naming patterns to reduce the work every time a new dataset is added.

Possible Implementation

This line raises the exception: https://github.com/Galileo-Galilei/kedro-mlflow/blob/master/kedro_mlflow/mlflow/kedro_pipeline_model.py#L119

It turns out that the DataCatalog passed to the KedroPipelineModel constructor in the after_pipeline_run hook does not contain catalogue entries with factory patterns: https://github.com/Galileo-Galilei/kedro-mlflow/blob/e0033c5072c929a4c26cfaeaf61fcedf93d36522/kedro_mlflow/framework/hooks/mlflow_hook.py#L353C1-L366

My workaround for now is to update the DataCatalog to resolve the factory patterns before sending it to KedroPipelineModel:

I use this approach from kedro catalog resolve. I add a helper function to mlflow_hooks.py:

def _trim_filepath(self, project_path: str, file_path: str) -> str:
    return file_path.replace(project_path, "", 1)

def _resolve_catalog(self, context: KedroContext, catalog: DataCatalog, pipeline: Pipeline):
    catalog_config = context.config_loader["catalog"]
    explicit_datasets = {
        ds_name: ds_config
        for ds_name, ds_config in catalog_config.items()
        if not catalog._is_pattern(ds_name)
    }

    datasets = pipeline.datasets()
    for ds_name in datasets:
        is_param = ds_name.startswith("params:") or ds_name == "parameters"
        if ds_name in explicit_datasets or is_param:
            continue

        matched_pattern = catalog._match_pattern(
            catalog._dataset_patterns, ds_name
        )
        if matched_pattern:
            ds_config_copy = copy.deepcopy(
                catalog._dataset_patterns[matched_pattern]
            )

            ds_config = catalog._resolve_config(
                ds_name, matched_pattern, ds_config_copy
            )

            ds_config["filepath"] = self._trim_filepath(
                str(context.project_path) + "/", ds_config["filepath"]
            )
            explicit_datasets[ds_name] = ds_config

    return DataCatalog.from_config(explicit_datasets)

And use it like this in the after_pipeline_run hook:

...
resolved_catalog = self._resolve_catalog(self.context, catalog, pipeline)

with TemporaryDirectory() as tmp_dir:
    # This will be removed at the end of the context manager,
    # but we need to log in mlflow before moving the folder
    kedro_pipeline_model = KedroPipelineModel(
        pipeline=pipeline.inference,
        catalog=resolved_catalog,
        input_name=pipeline.input_name,
        **pipeline.kpm_kwargs,
    )

...

Could this be a potential solution to the problem? Or is there a simpler way that I have totally missed :)

Galileo-Galilei commented 5 months ago

Glad to see you're using the pipeline_ml_factory, this is an underestimated feature of the plugin which is not well know I guess ;)

Good catch, I've seen a bunch of issues like this since the release of dataset factories. I think we can just call dataset.exists() for each datasets to force the catalog to materialize them, which should make your code much simpler, something like this :

https://github.com/takikadiri/kedro-boot/commit/ac758fce16ad81cb01b6ac5b549c104100a13bb0

I'll try to push a fix soon, but PR are welcome!

sebastiandro commented 5 months ago

Thank you for that suggestion, @Galileo-Galilei; that seemed to do the trick! I opened a PR: https://github.com/Galileo-Galilei/kedro-mlflow/pull/519, but unfortunately, I missed linking to this issue or set you as a reviewer 🙈 I can't seem to edit it now after I opened it.