kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

Allow returning modified dataset in `after_dataset_loaded` and `before_dataset_saved` hook #3890

Open mjspier opened 4 months ago

mjspier commented 4 months ago

Description

It would be nice if we could modify the data in the `after_dataset_loaded` and `before_dataset_saved` hooks and return it, so that the modified data is used when it arrives in the node or is saved.

This is already possible with the `before_node_run` hook, where we can return a dict with dataset names and modified data. (It is not possible to modify the output data in the `after_node_run` hook.)
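For reference, a minimal sketch of that existing mechanism (assuming pandas DataFrame inputs; the hook class name and the transformation are illustrative, and the hook would need to be registered in `settings.py`):

```python
from typing import Any

from kedro.framework.hooks import hook_impl


class ModifyInputsHook:
    """Minimal sketch: a dict returned from before_node_run overrides node inputs."""

    @hook_impl
    def before_node_run(self, inputs: dict[str, Any]) -> dict[str, Any]:
        # Any {dataset_name: new_data} entries returned here replace the
        # corresponding inputs before the node function is called.
        return {name: data.dropna() for name, data in inputs.items()}
```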

Context

An example use case would be to validate the data and convert it according to a schema in the hook, so that the converted dataframe is propagated to the node step or to the save step.
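To make the request concrete, here is a hypothetical sketch of what such a hook could look like if its return value were honoured. This is the proposed behaviour, not current Kedro: today the return value of `after_dataset_loaded` is discarded, and `convert_dtypes()` merely stands in for a real schema-based conversion.

```python
from typing import Any

import pandas as pd
from kedro.framework.hooks import hook_impl


class SchemaConvertHook:
    """Hypothetical sketch of the proposed behaviour; not current Kedro."""

    @hook_impl
    def after_dataset_loaded(self, dataset_name: str, data: Any) -> Any:
        # Proposed: the value returned here would replace `data` before it
        # reaches the node. Today this return value is ignored.
        if isinstance(data, pd.DataFrame):
            return data.convert_dtypes()  # stand-in for schema-based conversion
        return data
```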

Possible Implementation

Possible Alternatives

merelcht commented 4 months ago

Hi @mjspier, thanks for creating this issue. Just to understand this a bit better, couldn't you just create another node to do validation or conversion? From my point of view, it seems that modifying data through a hook instead would make it harder to track what's happened and reproduce the behaviour.

mjspier commented 4 months ago

Hi @merelcht, thanks for the reply. Yes, indeed, I think this feature should be used carefully. But I'm currently working on a hook which validates datasets with pandera, and I think it would be nice if the validated dataset could be returned, with the data types converted to the types defined in the pandera schema. This would allow a configuration similar to this in the catalog.yml:

"data":
  type: pandas.CSVDataset
  filepath: data/01_raw/data.csv
  metadata:
    pandera:
        schema: ${pa.python:data.schema}
        convert: True

The nodes that load the data would then receive the dataset converted according to the schema types.

I think for this case it would be nice to have the ability to return the converted dataset directly from the hook, without the need to create a node in a pipeline.
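For illustration, a rough sketch of how this could be approximated today with `before_node_run`, reading the pandera block from the catalog metadata above. The metadata access via `catalog._get_dataset` is an assumption about Kedro internals and may vary across versions:

```python
from typing import Any

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog


class PanderaConvertHook:
    """Sketch: validate and convert inputs in before_node_run (whose return
    value is honoured) instead of after_dataset_loaded (whose value is not)."""

    @hook_impl
    def before_node_run(
        self, catalog: DataCatalog, inputs: dict[str, Any]
    ) -> dict[str, Any]:
        converted = {}
        for name, data in inputs.items():
            # Assumption: the dataset object exposes the catalog `metadata`
            # block; catalog._get_dataset is internal API and may change.
            dataset = catalog._get_dataset(name)
            meta = (getattr(dataset, "metadata", None) or {}).get("pandera", {})
            if meta.get("convert") and meta.get("schema") is not None:
                # pandera coerces dtypes on validate() when the schema is
                # defined with coerce=True.
                converted[name] = meta["schema"].validate(data)
        return converted  # only these datasets are replaced in the node inputs
```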

merelcht commented 4 months ago

@mjspier have you seen https://github.com/Galileo-Galilei/kedro-pandera by any chance? Also tagging @noklam who might have some thoughts on this.

noklam commented 4 months ago

It looks like @mjspier is already using kedro-pandera. I do not think modifying data in hooks is a good idea in general.

Can you give an example of this?

> This is already possible with the `before_node_run` hook, where we can return a dict with dataset names and modified data. (It is not possible to modify the output data in the `after_node_run` hook.)

I am not sure if this statement is true. Here is how the hooks are called:

```python
hook_manager.hook.before_dataset_loaded(dataset_name=dataset_name, node=node)
return_ds = catalog.load(dataset_name)
hook_manager.hook.after_dataset_loaded(
    dataset_name=dataset_name, data=return_ds, node=node
)
```

It doesn't return any data, so how would you modify data and inject it into the pipeline? (This is something Kedro tries very hard to avoid; immutability is important.)

mjspier commented 4 months ago

@merelcht Yes, indeed, I was actually working on a PR for the kedro-pandera plugin.

@noklam I fully understand your point. I will think of another way to incorporate data type conversion. It would be nice if that feature could be enabled with a plugin and a configuration in the catalog, without the need to create nodes in the pipeline. If you have any ideas, let me know.

About the possibility of changing the input data in a hook: it is possible with the `before_node_run` hook, not the `before_dataset_loaded` hook.

There, the `hook_response` is later used to update the input data:

```python
hook_response = hook_manager.hook.before_node_run(
    node=node,
    catalog=catalog,
    inputs=inputs,
    is_async=is_async,
    session_id=session_id,
)
```
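For context, the runner then merges any dicts returned by the registered hook implementations back into the node inputs, roughly along these lines (an illustrative paraphrase, not verbatim Kedro source):

```python
# hook_response is a list with one entry per registered hook implementation;
# each entry is either None or a {dataset_name: new_data} dict.
additional_inputs: dict = {}
for response in hook_response:
    if response is not None:
        additional_inputs.update(response)
inputs.update(additional_inputs)  # the modified data now flows into the node
```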