kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

DataCatalog.shallow_copy() destroys any `CustomDataCatalog` type object #3857

Closed tzdanowicz closed 3 months ago

tzdanowicz commented 4 months ago

Description

The Kedro runner calls the data catalog's `shallow_copy()`, which always returns a plain `DataCatalog` object, discarding the type of any custom `DataCatalog` subclass being copied.
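
A minimal standalone sketch of the behaviour (my illustration, not project code; it assumes the kedro 0.19.6 implementation of shallow_copy quoted in the "Cause of the problem" section below):

from kedro.io import DataCatalog

class CustomDataCatalog(DataCatalog):
    pass

catalog = CustomDataCatalog()
copy = catalog.shallow_copy()
print(type(copy))  # <class 'kedro.io.data_catalog.DataCatalog'> - the CustomDataCatalog type is lost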

Context

I created a custom PickleDataCatalog class in order to dynamically handle multiple pickle objects in the after_node_run hook. My custom data catalog and hooks were properly set in src/my_project_name/settings.py:

from .hooks import ModelSavingHook  # noqa: F401
HOOKS = (ModelSavingHook(),)
...
from .pickle import PickleDataCatalog
DATA_CATALOG_CLASS = PickleDataCatalog

It turned out that the configured DATA_CATALOG_CLASS is lost during the pipeline lifecycle: the catalog param received by the hooks is of type DataCatalog instead of the expected custom PickleDataCatalog.

Steps to Reproduce

1) Create a simple custom PickleDataCatalog class (a standalone usage sketch follows after step 4):

import logging
import os
from typing import Any

from kedro.io import DataCatalog
from kedro_datasets.pickle import PickleDataset  # requires the kedro-datasets package

class PickleDataCatalog(DataCatalog):
    def __init__(self, pickle_directory: str = "data/models/", *args, **kwargs):
        super().__init__(*args, **kwargs)
        logging.info(f"PickleDataCatalog instance created, id: {id(self)}")
        self._pickle_directory = pickle_directory

    def save(self, name: str, data: Any) -> None:
        # Register a PickleDataset on the fly for unseen names, then delegate to DataCatalog.save
        if name not in self._datasets:
            self.add(name, PickleDataset(filepath=os.path.join(self._pickle_directory, f"{name}.pickle")))
        super().save(name, data)

2) Create hooks.py with a custom ModelSavingHook handler to properly maintain the files:

import logging
from typing import Any

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline.node import Node

from .pickle import PickleDataCatalog


class ModelSavingHook:
    """Hook to save models to disk after they are run."""

    @hook_impl
    def before_node_run(  # noqa: PLR0913
        self,
        node: Node,
        catalog: DataCatalog,
        inputs: dict[str, Any],
        is_async: bool,
        session_id: str,
    ):
        pass  # load everything that is required

    @hook_impl
    def after_node_run(self, catalog: PickleDataCatalog, outputs: dict[str, Any], node: Node, inputs: dict[str, Any]):
        logging.info(f"Catalog type in hook: {type(catalog)}, id: {id(catalog)}")
        if not isinstance(catalog, PickleDataCatalog):
            raise TypeError(f"Expected `PickleDataCatalog`, got {type(catalog)}")
        # iterate over outputs and save the objects via the passed `catalog` param

3) Update src/my_project_name/settings.py:

from .hooks import ModelSavingHook  # noqa: F401
HOOKS = (ModelSavingHook(),)
...
from .pickle import PickleDataCatalog
DATA_CATALOG_CLASS = PickleDataCatalog

4) kedro run
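
For reference, a standalone usage sketch of the custom catalog from step 1, run outside the Kedro runner (the directory and dataset name are illustrative only):

catalog = PickleDataCatalog(pickle_directory="data/models/")
catalog.save("my_model", {"weights": [1, 2, 3]})  # registers a PickleDataset on the fly and pickles the object under data/models/my_model.pickle
print(type(catalog))  # <class '...PickleDataCatalog'> - this is the type the hooks are expected to receive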

Expected Result

While investigating the pipeline lifecycle I can see that the custom DataCatalog is properly propagated through the earlier events (after_catalog_created, before_pipeline_run).

Unfortunately it is lost on before_node_run and after_node_run. Expected:

@hook_impl
def before_node_run(  # noqa: PLR0913
    self,
    node: Node,
    catalog: DataCatalog,
    inputs: dict[str, Any],
    is_async: bool,
    session_id: str,
):
    assert isinstance(catalog, PickleDataCatalog)

Actual Result

The catalog object received on before_node_run is different from the one seen on after_catalog_created and before_pipeline_run! Instead of the expected PickleDataCatalog, the default kedro.io.DataCatalog is passed (compare id(catalog) on all events).

Cause of the problem

It turned out that the runner in kedro.runner makes a shallow_copy call on the catalog:

def run(
    self,
    pipeline: Pipeline,
    catalog: DataCatalog,
    hook_manager: PluginManager | None = None,
    session_id: str | None = None,
) -> dict[str, Any]:
    ...
    catalog = catalog.shallow_copy()
    ...

where

def shallow_copy(
    self, extra_dataset_patterns: Patterns | None = None
) -> DataCatalog:
    """Returns a shallow copy of the current object.

    Returns:
        Copy of the current object.
    """
    ...
    return DataCatalog(
        datasets=self._datasets,
        dataset_patterns=dataset_patterns,
        load_versions=self._load_versions,
        save_version=self._save_version,
    )

In order to fix this bug, the copy needs to be constructed from the actual runtime type of the catalog object, i.e.:

def shallow_copy(
    self, extra_dataset_patterns: Patterns | None = None
) -> DataCatalog:
    """Returns a shallow copy of the current object.

    Returns:
        Copy of the current object.
    """
    ...
    return self.__class__(
        datasets=self._datasets,
        dataset_patterns=dataset_patterns,
        load_versions=self._load_versions,
        save_version=self._save_version,
    )
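
A minimal sketch of the expected behaviour with the self.__class__ based copy (assuming the patched method above is in place):

class CustomDataCatalog(DataCatalog):
    pass

catalog = CustomDataCatalog()
assert isinstance(catalog.shallow_copy(), CustomDataCatalog)  # subclass type is preserved

Constructing the copy via self.__class__ rather than a hard-coded DataCatalog keeps the runtime type of whatever subclass DATA_CATALOG_CLASS points to, so the hooks receive the expected object.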

Your Environment

merelcht commented 4 months ago

Hi @tzdanowicz, thank you for creating this issue. This sounds like a bug on our end, so I'll make sure we address it and put it on our backlog.

merelcht commented 3 months ago

@tzdanowicz this has been fixed in #3950 and will be released in our next release, 0.19.7.