kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.

Replacing a dataset in the context.io catalogue doesn't work #632

Closed hugo-quantmetry closed 3 years ago

hugo-quantmetry commented 3 years ago

Hello!

Description

I have a parameter in a YAML file: my_param: param_from_yml

Case 1: Trying to replace a dataset in the context.io catalog:

from kedro.framework.context import load_context
from kedro.io import MemoryDataSet

context = load_context(path_to_project)
dataset = MemoryDataSet(data="param_from_code")
context.io.add("params:my_param", dataset, replace=True)
print(context.io.load("params:my_param"))

Output : "param_from_yml" The replacement did not occur even though the logs says

WARNING - Replacing DataSet 'params:my_param'

Case 2: Trying to replace a dataset in a "copy" of (a reference to?) the context.io catalog:

context = load_context(path_to_project)
dataset = MemoryDataSet(data="param_from_code")
my_catalog = context.io
my_catalog.add("params:my_param", dataset, replace=True)
print(my_catalog.load("params:my_param"))

Output : "param_from_code"

Context

I want to use the Code API to run a pipeline that combines static datasets (defined through the yml files) with dynamic Python objects obtained at runtime.

I can do that using the code from "Case 2" above as follows:

from kedro.runner import SequentialRunner

my_pipeline = ...  # define the pipeline here
my_runner = SequentialRunner()
my_runner.run(pipeline=my_pipeline, catalog=my_catalog)

But ideally I would like to use the method context.run directly to benefit from functionality such as hooks. For that purpose I need the parameter/dataset values to be modified in context.io.

Is there a way to do that?


deepyaman commented 3 years ago

But ideally I would like to use the method context.run directly to benefit from functionality such as hooks.

Have you tried passing extra_params to the context? See https://kedro.readthedocs.io/en/latest/kedro.framework.context.KedroContext.html. This is what happens if you use kedro run --params ....

hugo-quantmetry commented 3 years ago

Just tried with the extra_params argument:

context = load_context(path_to_project, extra_params={"my_param": "param_from_extra"})
context.run(pipeline_name="my_pipeline")

Output : "param_from_extra" => It works :)

That solves the problem for parameters but how could I do that for a DataFrame? I can pass a DataFrame as extra_params but the original value in parameters.yml can't be a DataFrame oO

deepyaman commented 3 years ago

I'm not 100% sure I've understood your use case, but I believe adding it to the catalog with replace=True is the correct approach. If you just want this on run, you can do the replacements/additions in the before_pipeline_run hook (I do this in https://github.com/deepyaman/kedro-accelerator/blob/develop/src/kedro_accelerator/plugins/__init__.py myself). Would that satisfy your use case? If I'm misunderstanding your need, please let me know.
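
For illustration, a minimal sketch of such a hook (assuming a Kedro 0.16/0.17-era hook spec; the class name and the runtime_inputs mapping are made up) could look like:

from kedro.framework.hooks import hook_impl
from kedro.io import MemoryDataSet

class ReplaceCatalogEntriesHook:
    """Add or replace catalog entries just before the pipeline runs."""

    def __init__(self, runtime_inputs=None):
        # e.g. {"my_input_table": some_dataframe} -- illustrative names only
        self.runtime_inputs = runtime_inputs or {}

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        # catalog is the DataCatalog that the runner will use for this run
        for name, data in self.runtime_inputs.items():
            catalog.add(name, MemoryDataSet(data=data), replace=True)

The hook instance then has to be registered (e.g. via the hooks attribute of the project's ProjectContext in Kedro 0.16.x) so that context.run picks it up.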

hugo-quantmetry commented 3 years ago

Thanks for the reply, your solution works well for running the pipeline in batch mode.

In my use-case I would like to run a Kedro pipeline just like I would execute a Python function, using dynamic arguments/datasets (not known a priori, obtained at runtime).

I managed to do that using the Code API; however, I have the following questions:

  1. Can I use a mix of static datasets (defined through the yml files) and dynamic python objects as pipeline inputs?
  2. Can I use kedro hooks when running pipeline through the Code API?

1. Mixing static and dynamic datasets

I solved it as follows:

from kedro.framework.context import load_context
from kedro.io import MemoryDataSet
from kedro.runner import SequentialRunner

context = load_context(path_to_project)
dataset = MemoryDataSet(data="param_from_code")
my_catalog = context.io
my_catalog.add("params:my_param", dataset, replace=True)
print(my_catalog.load("params:my_param"))

# Run pipeline
my_pipeline = ...
SequentialRunner().run(my_pipeline, my_catalog)

2. Using hooks in the Code API

  • If I use context.run(pipeline_name) I do have hooks but I only manage to load the static datasets (the one defined in the yaml files). See Case 1 in my original post.

I hope this makes my use-case clearer.

deepyaman commented 3 years ago
  • If I use context.run(pipeline_name) I do have hooks but I only manage to load the static datasets (the one defined in the yaml files). See Case 1 in my original post.

For datasets, can you use this solution + before_pipeline_run hook to replace catalog entries?

Also, as a disclaimer, I'm not a core Kedro team member and may be missing a simple solution somebody more familiar with the framework has. :)

hugo-quantmetry commented 3 years ago

Thanks for your ideas, I'm moving forward step by step ^^

Summary of what I am trying to do

Have a pipeline that works both in batch and online mode:

  • in batch mode, the pipeline inputs come from the static datasets defined in the catalog.yml files;
  • in online mode, some inputs are Python objects obtained at runtime and passed as function arguments.

On top of that, I want my pipeline to run all the hooks. For example, I set some attributes in the before_pipeline_run hook and reuse them in after_node_run hooks; if before_pipeline_run is not executed, my pipeline fails.
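
To make that concrete, here is a hedged sketch (the attribute and the printed message are invented) of a hook that sets state in before_pipeline_run and reuses it in after_node_run:

from kedro.framework.hooks import hook_impl

class RunStateHook:
    """Share state between before_pipeline_run and after_node_run."""

    def __init__(self):
        self._run_tag = None  # populated once per run

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        # Illustrative: remember which pipeline this run belongs to
        self._run_tag = run_params.get("pipeline_name") or "__default__"

    @hook_impl
    def after_node_run(self, node, outputs):
        # Fails if before_pipeline_run was never executed -- the failure
        # mode described above when hooks are skipped
        if self._run_tag is None:
            raise RuntimeError("before_pipeline_run was not executed")
        print(f"[{self._run_tag}] finished node {node.name}")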

My solution

My solution basically mimics what context.run does, but uses a newly defined DataCatalog that takes datasets from both 1) function arguments and 2) the datasets defined in the catalog.yml files. This is fine, but being able to use context.run directly would make my code more resilient to Kedro updates (and less hacky).

from kedro.framework.hooks import get_hook_manager  # available in Kedro 0.16/0.17, removed in later versions
from kedro.io import DataCatalog, MemoryDataSet
from kedro.runner import ParallelRunner, SequentialRunner


def run_pipeline(pipeline_name, context, runner="sequential", arg_inputs=None):
    arg_inputs = arg_inputs or {}  # avoid a membership test on None below
    catalog = DataCatalog()
    yaml_catalog = context.io
    pipeline = context._get_pipeline(name=pipeline_name)
    hook_manager = get_hook_manager()
    pipeline_inputs = pipeline.inputs()

    if runner == "parallel":
        runner = ParallelRunner()
    else:
        runner = SequentialRunner()

    for input_name in pipeline_inputs:
        copy_mode = None

        if input_name in arg_inputs:
            input_data = arg_inputs[input_name]

            # SQL Alchemy engine needs "assign" mode
            if type(input_data).__name__ == "Engine":
                copy_mode = "assign"

        elif input_name in yaml_catalog.list():
            input_data = yaml_catalog.load(input_name)

        else:
            raise AttributeError(f"Missing input {input_name}")

        dataset = MemoryDataSet(data=input_data, copy_mode=copy_mode)
        catalog.add(input_name, dataset, replace=True)

    run_params = {
        "run_id": None,
        "env": context.env,
        "kedro_version": context.project_version,
        "tags": None,
        "from_nodes": None,
        "to_nodes": None,
        "node_names": None,
        "from_inputs": None,
        "load_versions": None,
        "pipeline_name": pipeline_name,
        "extra_params": context._extra_params,
    }

    hook_manager.hook.before_pipeline_run(
        run_params=run_params, pipeline=pipeline, catalog=catalog
    )
    try:
        run_result = runner.run(pipeline, catalog)
    except Exception as exc:
        hook_manager.hook.on_pipeline_error(  # pylint: disable=no-member
            error=exc,
            run_params=run_params,
            pipeline=pipeline,
            catalog=catalog,
        )
        raise exc
    hook_manager.hook.after_pipeline_run(  # pylint: disable=no-member
        run_params=run_params,
        run_result=run_result,
        pipeline=pipeline,
        catalog=catalog,
    )

    return run_result
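
For reference, a hypothetical call to this function (the pipeline name, dataset name, and DataFrame are made up) could look like:

import pandas as pd
from kedro.framework.context import load_context

context = load_context(path_to_project)  # path_to_project defined elsewhere
online_df = pd.DataFrame({"feature": [1.0, 2.0, 3.0]})  # obtained at runtime

result = run_pipeline(
    pipeline_name="my_pipeline",
    context=context,
    arg_inputs={"input_table": online_df},
)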

Comments

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.