kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
Apache License 2.0
10.03k stars 906 forks source link

[DataCatalog]: Catalog to config #4329

Open ElenaKhaustova opened 2 weeks ago

ElenaKhaustova commented 2 weeks ago


Implement KedroDataCatalog.to_config()method as a part of catalog serialization/deserialization feature




Solution description

We consider 3 different ways of loading datasets:

  1. Lazy datasets loaded from the config — in this case, we store the dataset configuration at the catalog level; the dataset object is not instantiated.
  2. Materialized datasets loaded from the config — we store the dataset configuration at the catalog level and use dataset.from_config() method to instantiate dataset which calls the underlying dataset constructor.
  3. Materialized datasets added to the catalog — instantiated datasets' objects are passed to the catalog, dataset configuration is not stored at the catalog level.

1 - can be solved at the catalog level 2 and 3 require retrieving dataset configuration from instantiated dataset object

Solution for 2 and 3 avoiding existing datasets' modifications (as per requirements)

  1. Use AbstractDataset.__init_subclass__ which allows to change the behavior of subclasses from inside the AbstractDataset:
  2. Create a decorator for child init
  3. Save the original child init
  4. Replace original child init with decorated init calling post init where we save call args:
  5. Save call args at the level of AbstractDataset in the _init_args field.
  6. Implement AbstractDataset.to_config() to retrieve configuration from the instantiated dataset object based on the object's _init_args.

Implement KedroDataCatalog.to_config

Once 2 and 3 are solved, we can implement a common solution at the catalog level. For that, we need to consider cases when we work with lazy and materialized datasets and retrieve configuration from the catalog or using AbstractDataset.to_config().

After the configuration is retrieved, we need to "unresolve" the credentials and keep them in a separate dictionary, as we did when instantiating the catalog. For that CatalogConfigResolver.unresolve_config_credentials() method can be implemented to undo the result of CatalogConfigResolver._resolve_config_credentials().

Excluding parameters and MemoryDatasets

We need to exclude MemoryDatasets as well as parameters

Not covered cases

Issues blocking further implementation

Tested with

How to test

astrojuanlu commented 1 week ago

Non-serializable objects or objects required additional logic implemented at the level of dataset to save/load them:

Wouldn't it be possible to force datasets to only have static, primitive properties in the __init__ method so that serialising them is trivial?

For example, rather than having

class GBQQueryDataset:
    def __init__(self, ...):
        self._credentials = google.oauth2.credentials.Credentials(**credentials)
        self._client =

    def _exists(self) -> bool:
        table_ref = self._client...

we do

class GBQQueryDataset(pydantic.BaseModel):
    credentials: dict[str, str]

    def _get_client(self) ->
        return bigquery.Client(credentials=google.oauth2.credentials.Credentials(**self.credentials))

    def _exists(self) -> bool:
        table_ref = self._get_client()...


(I picked Pydantic here given that there's prior art but dataclasses would work similarly)

ElenaKhaustova commented 1 week ago

Non-serializable objects or objects required additional logic implemented at the level of dataset to save/load them:

Wouldn't it be possible to force datasets to only have static, primitive properties in the __init__ method so that serialising them is trivial?

That would be an ideal option, as a common solution would work out of the box without corner cases. However, it would require more significant changes on the datasets' side.

As a temporal solution without breaking change, we can try extending parent AbstractDataset.to_config() at the dataset level for those datasets and serializing such objects. However, I cannot guarantee that we'll be able to cover all the cases.

ElenaKhaustova commented 1 week ago

Test example

from import KedroDataCatalog, Version
from kedro_datasets.pandas import ExcelDataset

config = {
    "cached_ds": {
        "type": "CachedDataset",
        "versioned": "true",
        "dataset": {
            "type": "pandas.CSVDataset",
            "filepath": "data/01_raw/reviews.csv",
            "credentials": "cached_ds_credentials",
        "metadata": [1, 2, 3]
    "cars": {
        "type": "pandas.CSVDataset",
        "filepath": "data/01_raw/reviews.csv"
    "{dataset_name}": {
        "type": "pandas.CSVDataset",
        "filepath": "data/01_raw/{dataset_name}.csv"
    "boats": {
        "type": "pandas.CSVDataset",
        "filepath": "data/01_raw/companies.csv",
        "credentials": "boats_credentials",
        "save_args": {
            "index": False
    "cars_ibis": {
        "type": "ibis.FileDataset",
        "filepath": "data/01_raw/reviews.csv",
        "file_format": "csv",
        "table_name": "cars",
        "connection": {
            "backend": "duckdb",
            "database": "company.db"
        "load_args": {
            "sep": ",",
            "nullstr": "#NA"
        "save_args": {
            "sep": ",",
            "nullstr": "#NA"

credentials = {
    "boats_credentials": {
        "client_kwargs": {
            "aws_access_key_id": "<your key id>",
            "aws_secret_access_key": "<your secret>"
    "cached_ds_credentials": {
        "test_key": "test_val"

version = Version(
    load="fake_load_version.csv",  # load exact version
    save=None,  # save to exact version

versioned_dataset = ExcelDataset(
    filepath="data/01_raw/shuttles.xlsx", load_args={"engine": "openpyxl"}, version=version

def main():
    catalog = KedroDataCatalog.from_config(config, credentials)
    _ = catalog["reviews"]
    catalog["versioned_dataset"] = versioned_dataset
    catalog["memory_dataset"] = "123"
    print("-" * 20, "Catalog", "-" * 20)
    print(catalog, "\n")
    print("-" * 20, "Catalog to config", "-" * 20)

    _config, _credentials, _load_version, _save_version = catalog.to_config()
    print(_config, "\n")
    print(_credentials, "\n")
    print(_load_version, "\n")
    print(_save_version, "\n")
    print("-" * 20, "Catalog from config", "-" * 20)

    _catalog = KedroDataCatalog.from_config(_config, _credentials, _load_version, _save_version)
    # Materialize datasets
    for ds in _catalog.values():
    print(_catalog, "\n")
    print("-" * 20, "Catalog from config to config", "-" * 20)

    _config, _credentials, _load_version, _save_version = _catalog.to_config()
    print(_config, "\n")
    print(_credentials, "\n")
    print(_load_version, "\n")
    print(_save_version, "\n")

if __name__ == "__main__":