getindata / kedro-azureml

Kedro plugin to support running workflows on Microsoft Azure ML Pipelines
https://kedro-azureml.readthedocs.io
Apache License 2.0

AzureMLAssetDataset's credentials are not passed correctly when used in a dataset factory #160

Open AlexandreOuellet opened 1 month ago

AlexandreOuellet commented 1 month ago

When used in a dataset factory, the Azure ML credentials are not set up correctly and the run crashes instead. For instance, the following entry crashes with missing credentials:

"{name}_csv":
    type: utils.azure_ml_dataset.AzureMLDataset
    azureml_dataset: raw
    root_dir: data/01_raw/
    dataset:
        type: pandas.CSVDataset
        filepath: "{name}.csv"

The issue is that when the catalog is created, dataset factory entries are not actual datasets but dataset patterns, which are only resolved later. By the time the after_catalog_created hook is called, there is still no actual dataset behind the pattern, so the plugin has nothing to attach the credentials to.
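
Roughly, the timing problem looks like this (a minimal sketch; "companies_csv" is a made-up name, and _get_dataset is a private Kedro API whose behaviour may differ between versions):

from kedro.io import DataCatalog

catalog = DataCatalog.from_config(
    {
        "{name}_csv": {
            "type": "pandas.CSVDataset",
            "filepath": "data/01_raw/{name}.csv",
        }
    }
)
print(catalog.list())              # [] -- the pattern is not a materialised dataset
print("companies_csv" in catalog)  # True -- membership checks also match patterns
ds = catalog._get_dataset("companies_csv")  # only now is the pattern resolved
# after_catalog_created fires before any pattern is resolved, so the plugin's hook
# has no dataset object on which to set the Azure ML configuration.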

I've discussed this with some people on Kedro's Slack channels, and they seem to agree that the best way to pass the credentials would be an after_context_created hook.

I'll open a pull request soon to fix this, with the following pattern instead:

"{name}_csv":
    type: utils.azure_ml_dataset.AzureMLDataset
    azureml_dataset: raw
    root_dir: data/01_raw/
    credentials: azureml # added a credentials key, defaulting to the azureml credentials entry
    dataset:
        type: pandas.CSVDataset
        filepath: "{name}.csv"
AlexandreOuellet commented 1 month ago

Here's my current workaround:

I create a custom dataset:

from typing import Any, Dict, Literal, Optional, Type, Union

from kedro.io.core import (
    AbstractDataset,
    Version,
)

from kedro_azureml.config import AzureMLConfig

from kedro_azureml.datasets.asset_dataset import AzureMLAssetDataset

AzureMLDataAssetType = Literal["uri_file", "uri_folder"]

class AzureMLDataset(AzureMLAssetDataset):
    def __init__(
        self,
        azureml_dataset: str,
        dataset: Union[str, Type[AbstractDataset], Dict[str, Any]],
        credentials: Optional[Dict[str, Any]] = None,  # new: the resolved credentials dict
        root_dir: str = "data",
        filepath_arg: str = "filepath",
        azureml_type: AzureMLDataAssetType = "uri_folder",
        version: Optional[Version] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(
            azureml_dataset,
            dataset,
            root_dir,
            filepath_arg,
            azureml_type,
            version,
            metadata,
        )

        # Configure azureml_config according to credentials (for dataset factories)
        self._azureml_config = AzureMLConfig(**credentials) if credentials else None
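
For reference, the credentials argument is just the dict that Kedro resolves from the credentials: azureml key of the catalog entry. Below is a minimal sketch of a direct instantiation (the dataset name and file path are made up; the azure section of the azureml config is what AzureMLConfig expects, exactly as used in the hook further down):

import yaml

# Reuse the azure section of the azureml config (generated by `kedro azureml init`,
# typically conf/base/azureml.yml) as the credentials dict.
with open("conf/base/azureml.yml") as f:
    azure_section = yaml.safe_load(f)["azure"]

ds = AzureMLDataset(
    azureml_dataset="raw",
    dataset={"type": "pandas.CSVDataset", "filepath": "example.csv"},
    credentials=azure_section,
    root_dir="data/01_raw/",
)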

I also have a custom hook:

from kedro.framework.hooks import hook_impl

from kedro_azureml.config import AzureMLConfig
from utils.azure_ml_dataset import AzureMLDataset
from kedro_azureml.runner import AzurePipelinesRunner

from kedro.io import DataCatalog
from kedro.pipeline import Pipeline

class AMLRunHook:

    @hook_impl
    def after_context_created(self, context) -> None:
        # Inject credentials into the context as a "credentials" key
        if "azureml" not in context.config_loader.config_patterns.keys():
            context.config_loader.config_patterns.update(
                {"azureml": ["azureml*", "azureml*/**", "**/azureml*"]}
            )

        self.azure_config = AzureMLConfig(**context.config_loader["azureml"]["azure"])

        azure_creds = {"azureml": self.azure_config.__dict__}

        context.config_loader["credentials"] = {
            **context.config_loader["credentials"],
            **azure_creds,
        }

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline: Pipeline, catalog: DataCatalog):
        """Hook implementation to change dataset path for local runs.
            Modified to handle when a dataset is a pattern dataset
        Args:
            run_params: The parameters that are passed to the run command.
            pipeline: The ``Pipeline`` object representing the pipeline to be run.
            catalog: The ``DataCatalog`` from which to fetch data.
        """
        for input in pipeline.all_inputs():
            if input in catalog:
                dataset = catalog._get_dataset(input)
                if isinstance(dataset, AzureMLDataset):
                    if AzurePipelinesRunner.__name__ not in run_params["runner"]:
                        # when running locally using an AzureMLAssetDataset
                        # as an intermediate dataset we don't want download
                        # but still set to run local with a local version.
                        if input not in pipeline.inputs():
                            dataset.as_local_intermediate()
                        # when running remotely we still want to provide information
                        # from the azureml config for getting the dataset version during
                        # remote runs
                    else:
                        dataset.as_remote()

                    catalog.add(input, dataset, replace=True)

aml_run_hook = AMLRunHook()

After that, I make sure the hook is registered correctly in settings.py:

from my_pipeline.hooks import aml_run_hook
HOOKS = (aml_run_hook,)

For usage, it looks as follows:

"{name}_csv":
  type: utils.azure_ml_dataset.AzureMLDataset
  azureml_dataset: imcd_raw
  root_dir: data/01_raw/
  credentials: azureml
  dataset:
      type: pandas.CSVDataset
      filepath: "{name}.csv"

After that, you should be able to do your regular kedro azureml run and get the expected results in Azure ML. The PR will take a while longer to come in, as I have not yet looked into properly adding unit tests (everything else is green, though), but it seems to work correctly in my use case, and at least there's a workaround for those wishing to use dataset factories.

AlexandreOuellet commented 1 month ago

And a bit more code for when you send the pipeline to Azure:

from __future__ import annotations

import re

from kedro.io import DataCatalog

class CustomList(list):
    """A list that delegates membership checks back to the catalog, so that
    ``name in catalog.list()`` also matches dataset patterns."""

    def __init__(self, catalog: DataCatalog, data) -> None:
        super().__init__(data)
        self._catalog = catalog

    def __contains__(self, data_set_name):
        """Check if an item is in the catalog as a materialised dataset or pattern"""
        return data_set_name in self._catalog

class CustomDataCatalog(DataCatalog):
    def list(self, regex_search: str | None = None) -> list[str]:
        if regex_search is None:
            return CustomList(self, self._data_sets.keys())

        if not regex_search.strip():
            self._logger.warning("The empty string will not match any data sets")
            return CustomList(self, [])

        try:
            pattern = re.compile(regex_search, flags=re.IGNORECASE)

        except re.error as exc:
            raise SyntaxError(
                f"Invalid regular expression provided: '{regex_search}'"
            ) from exc
        return CustomList(
            self,
            [dset_name for dset_name in self._data_sets if pattern.search(dset_name)],
        )

    def __contains__(self, data_set_name):
        """Check if an item is in the catalog as a materialised dataset or pattern"""
        matched_pattern = self._match_pattern(self._dataset_patterns, data_set_name)
        if data_set_name in self._data_sets or matched_pattern:
            # Materialise the (possibly pattern-based) dataset so that it is registered
            # in the catalog with its Azure ML configuration set.
            self._get_dataset(data_set_name)
            return True
        return False

The main issue is that, for some reason, generator.py insists on checking node_input in catalog.list() instead of node_input in catalog, which means some values never get set on the AzureMLAssetDataset. I use a custom catalog to reproduce exactly the same behaviour, except that list() returns a custom list whose membership check does what generator.py really needs: verifying that node_input is in the catalog, just in a roundabout way. There is probably a better way to change the code in kedro-azureml for this, but for now this works correctly for us.
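
For completeness, the custom catalog has to be registered as the project's catalog class, e.g. in settings.py. This assumes your Kedro version exposes the DATA_CATALOG_CLASS setting, and that the class above lives in a module such as my_pipeline.catalog; adjust the import to wherever you put it:

# settings.py (hypothetical module path; adjust to your project)
from my_pipeline.catalog import CustomDataCatalog

DATA_CATALOG_CLASS = CustomDataCatalog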