kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

Extendable `DataCatalog` that can be imported into projects #4085

Open eduheise-andela opened 2 months ago

eduheise-andela commented 2 months ago

Description

We have different applications and we need to decouple the parameters. The query should be the same across two different environments.

To solve that, we built a library that stores the DataCatalog, and both applications should load it from there. The whole catalog was transformed into Python code, such as:

from kedro_datasets.pandas import SQLQueryDataset

# `credentials` is assumed to be defined elsewhere in the library
query_example = SQLQueryDataset(
    credentials=credentials,
    sql="""
    SELECT *
    FROM example_table""",
)

CATALOG = {"query_example": query_example}
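
Both applications then import this shared module; a minimal usage sketch of consuming a dataset directly through its Python instance (module path as used later in this thread):

from custom_library.catalog import CATALOG

df = CATALOG["query_example"].load()  # runs the query and returns a DataFrame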

And then we tried to load it in the settings.py file like this:

"""Project settings. There is no need to edit this file unless you want to change values
from the Kedro defaults. For further information, including these default values, see
https://kedro.readthedocs.io/en/stable/kedro_project_setup/settings.html."""

# Class that manages how configuration is loaded.
from kedro.config import OmegaConfigLoader
from kedro.io import DataCatalog
from omegaconf.resolvers import oc
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

from custom_library.catalog import CATALOG

CONFIG_LOADER_CLASS = OmegaConfigLoader

CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        "oc.env": oc.env,
    },
    "config_patterns": {
        "catalog": ["catalog*", "catalog*/**", "**/*catalog*"],
        "parameters": ["**/*parameters*"],
    },
}
DATA_CATALOG_CLASS = DataCatalog
DATA_CATALOG_ARGS = CATALOG

It turns out that Kedro is still trying to load catalogs, and failing in the process:

MissingConfigException: No files of YAML or JSON format found in 
/Users/user/kedro_application_01/conf or 
/Users/user/kedro_application_01/conf matching the glob 
pattern(s): ['catalog*', 'catalog*/**', '**/*catalog*']

Documentation page (if applicable)

https://docs.kedro.org/en/stable/data/advanced_data_catalog_usage.html
https://docs.kedro.org/en/stable/api/kedro.config.OmegaConfigLoader.html
https://docs.kedro.org/en/stable/kedro_project_setup/settings.html

Context

Both kedro_application_01 and kedro_application_02 should consume the data catalog from the custom_library.catalog.

noklam commented 2 months ago

Can you explain how you ran into that error? What script/command did you run?

DATA_CATALOG_CLASS = DataCatalog
DATA_CATALOG_ARGS = CATALOG

I am confused, as CATALOG seems to be catalog entries rather than arguments to the DataCatalog constructor. Are you trying to use a Kedro project where, instead of the YAML file, you define the catalog in Python code?
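
For reference, DATA_CATALOG_ARGS is expected to hold keyword arguments for the DataCatalog constructor rather than the entries themselves; a minimal sketch of the expected shape, assuming CATALOG maps names to instantiated datasets:

# Hypothetical: pass the entries under the constructor's `datasets` keyword
DATA_CATALOG_ARGS = {"datasets": CATALOG}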

eduheise-andela commented 2 months ago

The error happened when I tried to run a pipeline that consumes parameters from the catalog:

kedro run --pipeline=cache_queries

Regarding the parameters, I used them in the same format the documentation shows:

from kedro.io import DataCatalog
from kedro_datasets.pandas import (
    CSVDataset,
    SQLTableDataset,
    SQLQueryDataset,
    ParquetDataset,
)

catalog = DataCatalog(
    {
        "bikes": CSVDataset(filepath="../data/01_raw/bikes.csv"),
        "cars": CSVDataset(filepath="../data/01_raw/cars.csv", load_args=dict(sep=",")),
        "cars_table": SQLTableDataset(
            table_name="cars", credentials=dict(con="sqlite:///kedro.db")
        ),
        "scooters_query": SQLQueryDataset(
            sql="select * from cars where gear=4",
            credentials=dict(con="sqlite:///kedro.db"),
        ),
        "ranked": ParquetDataset(filepath="ranked.parquet"),
    }
)
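
A short usage sketch of this API-style catalog: datasets are then consumed programmatically via load and save (names taken from the example above):

df_bikes = catalog.load("bikes")   # reads ../data/01_raw/bikes.csv
catalog.save("ranked", df_bikes)   # writes ranked.parquet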

The final result should be a catalog shared between different Kedro applications. One of them will run in a production environment controlled by tags, and the other in development.

We don't need it as Python code; it could be YAML if that's easier. But we should be able to install it as a library into the application. I believed that transforming it into Python code would make this move easier.

eduheise-andela commented 2 months ago

The reason I was facing that error is that I had deleted the catalog in the process. I created a different, incomplete one, and it turns out that Kedro does not load the catalog entries defined in the settings file.

All of these entries are present in the CATALOG variable (I changed them due to confidentiality issues). They aren't present in the YAML catalog.yml file.

ValueError: Pipeline input(s) {'bikes', 'cars', 'cars_table', 
'scooters_query', 'ranked'} not found in the DataCatalog

eduheise-andela commented 2 months ago

For the record, I finally made it work, but it's sort of a bodge.

The solution was to create a class that inherits from OmegaConfigLoader and add the new catalog entries in the constructor:

from __future__ import annotations  # for `str | None` annotations on Python < 3.10

from typing import Callable, Any
from kedro.config import OmegaConfigLoader
from custom_library.catalog import CATALOG

class CustomConfigLoader(OmegaConfigLoader):

    def __init__(
        self,
        conf_source: str,
        env: str | None = None,
        runtime_params: dict[str, Any] | None = None,
        *,
        config_patterns: dict[str, list[str]] | None = None,
        base_env: str | None = None,
        default_run_env: str | None = None,
        custom_resolvers: dict[str, Callable] | None = None,
        merge_strategy: dict[str, str] | None = None,
    ):

        super().__init__(
            conf_source=conf_source,
            env=env,
            runtime_params=runtime_params,
            config_patterns=config_patterns,
            base_env=base_env,
            default_run_env=default_run_env,
            custom_resolvers=custom_resolvers,
            merge_strategy=merge_strategy,
        )
        self["catalog"] = {**self["catalog"], **CATALOG}

This class should be updated in the settings.py file as well:

"""Project settings. There is no need to edit this file unless you want to change values
from the Kedro defaults. For further information, including these default values, see
https://kedro.readthedocs.io/en/stable/kedro_project_setup/settings.html."""

# Class that manages how configuration is loaded.
from omegaconf.resolvers import oc
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

from custom_library.config_loader import CustomConfigLoader

CONFIG_LOADER_CLASS = CustomConfigLoader
CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        "oc.env": oc.env,
    },
    "config_patterns": {
        "catalog": ["catalog*", "catalog*/**", "**/*catalog*"],
        "parameters": ["**/*parameters*"],
    },
}

Now Kedro is loading from both the current project and the library, with the DataCatalog unified. Please add something about this to the documentation if there's a better way to do it. Adding a new entry to config_patterns.catalog might solve this as well, but since the patterns are relative to the project's conf source and the library will be installed as a package, it might cause some errors in the process.

noklam commented 2 months ago

@eduheise-andela I have updated the title, since I don't think this is related to coupling/de-coupling. The question here seems to be that you want to use Python-instantiated classes (or at least a mix of Python and YAML) for the DataCatalog.

The final result should be a catalog shared between different Kedro applications. One of them will run in a production environment controlled by tags, and the other in development.

We don't need it as Python code; it could be YAML if that's easier. But we should be able to install it as a library into the application. I believed that transforming it into Python code would make this move easier.

I don't understand this part, can you elaborate? Do you mean you want a shareable DataCatalog that can be imported into an existing project (and enriched)?

Just want to confirm: is CATALOG datasets, or definitions of datasets?

catalog = DataCatalog(
    {
        "bikes": CSVDataset(filepath="../data/01_raw/bikes.csv"),
        "cars": CSVDataset(filepath="../data/01_raw/cars.csv", load_args=dict(sep=",")),
        "cars_table": SQLTableDataset(
            table_name="cars", credentials=dict(con="sqlite:///kedro.db")
        ),
        "scooters_query": SQLQueryDataset(
            sql="select * from cars where gear=4",
            credentials=dict(con="sqlite:///kedro.db"),
        ),
        "ranked": ParquetDataset(filepath="ranked.parquet"),
    }
)

As you use

self["catalog"] = {**self["catalog"], **CATALOG}

the first operand is a dictionary of configuration entries (which are strings), while the second is a dictionary of instantiated dataset classes.
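
To illustrate the mismatch, a minimal sketch of the two shapes (entries are hypothetical):

# What OmegaConfigLoader returns for self["catalog"]: plain definition dicts
yaml_style = {
    "bikes": {"type": "pandas.CSVDataset", "filepath": "data/01_raw/bikes.csv"},
}

# What CATALOG originally contained: instantiated dataset objects
from kedro_datasets.pandas import CSVDataset

python_style = {
    "bikes": CSVDataset(filepath="data/01_raw/bikes.csv"),
}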

eduheise-andela commented 2 months ago

We don't necessarily need Python-instantiated datasets. I thought they would be easier to import, but in fact I found it quite difficult. I couldn't find documentation showing how to import Python-instantiated datasets into a Kedro project, only how to consume them through code with the .save() and .load() functions.

I don't understand this part, can you elaborate? Do you mean you want a shareable DataCatalog that can be imported into an existing project (and enriched)?

Exactly, we must find a way to add an external DataCatalog, shared between multiple applications, into our framework. The original catalog is coupled to Kedro's base files (conf/base/catalog.yml), and we need it decoupled into an external source (such as a Python module).

Just want to confirm: is CATALOG datasets, or definitions of datasets?

The first version was datasets, but I had to change it to definitions of datasets to make it work. Now it's definitions of datasets.
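
For completeness, definitions in that shape can also be materialised programmatically; a minimal sketch using DataCatalog.from_config, assuming CATALOG now holds definition dicts and the credentials value is illustrative:

from kedro.io import DataCatalog
from custom_library.catalog import CATALOG  # definition dicts, as above

catalog = DataCatalog.from_config(
    CATALOG,
    credentials={"db_credentials": {"con": "sqlite:///kedro.db"}},
)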