kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

Dynamic Pipeline #2627

Open noklam opened 1 year ago

noklam commented 1 year ago

Introduction

We had a discussion about dynamic pipelines in #1993, also partly related to #1963. This issue summarises that discussion and lays out the work we need to do.

Related Issues:

A high-level, short overview of the problem(s) you are designing a solution for.

Background

Dynamic pipelines have been one of the most frequently asked-about topics. There are various solutions, but they are often case-by-case. As a result, the solutions come in all fashions, and it has been asked whether Kedro can provide a feature for this.

What is "Dynamic Pipeline"

When people refer to a "Dynamic Pipeline", they are often not talking about the same thing. We need to make a clear distinction between the different meanings before we start to build a solution.

We can roughly categorise them into two buckets:

  1. Dynamic construction of Pipeline
  2. Dynamic behavior at runtime

Dynamic construction of Pipeline (easier)

Examples of these are:

  1. Time series forecasting - a pipeline makes predictions for Day 1, and the next pipeline requires the Day 1 predictions as input.
  2. Hyperparameter tuning
  3. Combining a variable number of features - feature engineering combines N features into one DataFrame
  4. A list of countries - each needs to be saved as a catalog entry, and the data is then combined in a pipeline for further processing (see the sketch below)
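
As an illustration of this bucket, here is a minimal sketch for the "list of countries" case. The node and dataset names are hypothetical; the point is that the structure is still fixed by the time register_pipelines() returns, it is just built programmatically:

from kedro.pipeline import Pipeline, node, pipeline


def process_country(raw_data):
    # placeholder node function
    return raw_data


def create_pipeline(countries=("uk", "de", "fr")) -> Pipeline:
    per_country = [
        pipeline(
            [node(process_country, inputs="raw_data", outputs="processed_data")],
            namespace=country,  # yields e.g. "uk.raw_data" -> "uk.processed_data"
        )
        for country in countries
    ]
    return sum(per_country, Pipeline([]))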

Dynamic behavior at runtime (harder)

Examples of these are:

Why is it challenging for Kedro Users?

It needs experience with Kedro; often you need to combine advanced features, e.g. TemplatedConfig + Jinja in the catalog + doing some for loop in your pipeline.py.

In addition, each of the use cases needs a different solution, while part of Kedro's value proposition is standardisation. There is no well-known pattern for these solutions, and they are hard to reason about and debug with Jinja.

What's not in scope

These two types of pipelines are fundamentally different from the "data-centric" approach of Kedro

What are some possible solutions?

Follow-up

Reference

stichbury commented 1 year ago

Thanks for this @noklam -- I was looking back at the technical discussion recently and wanted to flag it again, so I'm pleased you did. Would you be comfortable writing this blog post? Maybe we can put it in a sprint in the next couple of upcoming ones?

noklam commented 1 year ago

@stichbury Yes! I am happy to write something about this.

desmond-dsouza commented 1 year ago

The Miro board linked above seems not to be viewable by the public?

noklam commented 1 year ago

@desmond-dsouza Sorry about that! You are right, the link is private, as this is still in draft and there is some internal discussion on the Miro board. I will try to post some screenshots here once we have discussed this.

Do you have any views on this? The board is mainly examples; I tried to go through all the Slack & Discord questions to summarise the user problems.

astrojuanlu commented 1 year ago

Another question on dynamic pipelines https://www.linen.dev/s/kedro/t/12647827/hello-thank-you-for-this-great-library-i-am-a-ds-working-in-#d170c710-8e6a-4c56-a623-058c3ec33da7

[etape 1] > [etape 2] > [if score_etape2 < X ] > [etape4]
                      > [if score_etape2 >= X ] > [etape5]
noklam commented 1 year ago

@astrojuanlu This sort of runtime-generated DAG is not currently supported. There are a few possible ways to get around this:

  1. Embed the if-else condition inside a node
  2. Use hooks or dynamic injection

We usually advise avoiding these kinds of conditional DAGs as much as possible, because they get very complicated once you have multiple switches and are difficult to debug. Having a conditional DAG is not much different from having an if-else node, i.e.

def conditional_node(score):
    # do_a / do_b are placeholder branches; the whole switch lives inside one node
    if score >= 0.5:
        return do_a()
    else:
        return do_b()

The challenge here is that the returned datasets may be different or not compatible at all.

I haven't done this before, but it should be more flexible to do it with the Python API. The code may look like this:

# deployment_script.py  (sketch - assumes the project has already been bootstrapped)
from kedro.framework.session import KedroSession

with KedroSession.create() as session:  # a session can only be run once
    result = session.run(pipeline_name="some_pipeline")  # dict of the run's free outputs

with KedroSession.create() as session:
    if result["score"] >= 0.5:
        session.run(pipeline_name="deploy")
    else:
        session.run(pipeline_name="retrain")

It does mean that you may lose certain features and cannot use the Kedro CLI to run this, so use it sparingly.

datajoely commented 1 year ago

For me, if we're going to do this properly we need some sort of

noklam commented 1 year ago

Cross-posting some solutions our users are using:

  1. Create an after_context_created / after_catalog_created hook
  2. Replace the register_pipelines() function with a custom register_dynamic_pipelines(catalog: DataCatalog) function
  3. If you can pass the catalog to create pipelines, you can access datasets and parameters to dynamically build your pipeline! Use create_pipeline(catalog: DataCatalog) functions to create your pipelines (a rough sketch follows below).
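
As a rough illustration of steps 2-3 only - the hook wiring itself is in the detailed solution linked below, and the node, dataset and parameter names here are made up:

from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, node, pipeline


def train(model_input):
    # placeholder node function
    return model_input


def register_dynamic_pipelines(catalog: DataCatalog) -> dict:
    # parameters are already in the catalog, so the pipeline structure can depend on them
    variants = catalog.load("params:model_variants")  # e.g. ["model_a", "model_b"]
    base = pipeline([node(train, inputs="model_input", outputs="model")])
    per_variant = sum(
        (pipeline(base, namespace=variant) for variant in variants), Pipeline([])
    )
    return {"__default__": per_variant}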

The same hook as above, but instead of creating pipelines by changing the parameter and dataset names directly, we create namespaced pipelines, along with dataset factories: https://docs.kedro.org/en/stable/data/data_catalog.html#example-3-generalise-datasets-using-namespaces-into-one-dataset-factory - have a look at the catalog.yml. This way, not only is the pipeline dynamic but so is your catalog!

A detailed solution can be found here.

Lodewic commented 1 year ago

@noklam To pitch in here, thank you for sharing my solution on this issue tracker! You asked for an example repository; you can find it here: https://github.com/Lodewic/kedro-dynamic-pipeline-hook-example

inigohidalgo commented 1 year ago
  • [not entirely sure] A way of accessing the catalog in the pipeline registry

@datajoely we are currently looking for a way to access parameters from the pipeline registry. In one of our projects (not sure exactly which Kedro version was current when it was implemented, definitely <0.16) we had this as a workaround:

from typing import Dict

from kedro.framework.context import KedroContext  # kedro.context in very old versions
from kedro.pipeline import Pipeline


class ProjectContext(KedroContext):
    """Users can override the remaining methods from the parent class here,
    or create new ones (e.g. as required by plugins), indicate in settings.py
    """

    def _get_pipelines(self) -> Dict[str, Pipeline]:
        # `create_pipelines` is the project's own pipeline factory, defined elsewhere
        params = {} if self._extra_params is None else self._extra_params
        return create_pipelines(**params)

We are looking for a current "kedroic" way of handling this. I am looking into @Lodewic's implementation to see how well it fits our use case, and it looks very promising, but a more out-of-the-box solution might be preferable, as this one requires a certain amount of knowledge about Kedro's inner workings which we probably shouldn't expect from general users.

datajoely commented 1 year ago

This is neat @inigohidalgo !

noklam commented 1 year ago

This looks like the 0.16.x or 0.17.x style of creating pipelines; I actually don't know what happened and why we moved away from it. It used to be possible to access parameters in create_pipeline, which is why the template is still create_pipeline(**kwargs).

astrojuanlu commented 11 months ago

Relevant: https://getindata.com/blog/kedro-dynamic-pipelines/ by @marrrcin

In other news, found this on the Dagster Slack:

Seems that the cost of repetitive tedious logic around i/o is much less than trying to piece together the dagster abstraction, given the examples/docs. Feels like every time I try to use frameworks like airflow, kedro, dagster, they all let me down as soon as I start doing anything dynamic.

https://discuss.dagster.io/t/13882234/hello-world-for-dynamic-partition-etl-is-a-nightmare-the-ina#78fea568-9198-48a0-a0ba-42a141f440be

astrojuanlu commented 11 months ago

After sharing the blog post on #1606, I was thinking that we should find a more descriptive name for the use case addressed in it. "Dynamic pipelines" seems to imply that the pipelines themselves have some data-dependent runtime behavior or data-dependent structure (the 2 buckets originally devised by @noklam), but taking a pipeline and reusing it with different configurations is hardly "dynamic". We should call this "pipeline reuse" or investigate how other projects (DVC Pipelines, MLFlow recipes, Airflow DAGs) call this concept.

datajoely commented 11 months ago

In the CI/CD world this sort of thing is often called a matrix job; in our examples we want to run something like an "experiment array".

We should note that Hydra calls this a multi-run, but it also has an integration with "Sweepers", which is closer to what we're doing here. The next question raised is how we can make something like Optuna work for our purposes.

inigohidalgo commented 11 months ago

The next question raised is how we can make something like Optuna work for our purposes.

When I've used Optuna within Kedro I've defined the search space from config, but the output was basically just a pickle of the finished Optuna run alongside a JSON with the optimal hyperparameters. Which of Optuna's features would you see as useful for Kedro?
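
For reference, a minimal sketch of that pattern, with made-up names and search-space keys - the study object is what gets pickled and the best params are what gets saved as JSON via the catalog:

import optuna


def tune_model(search_space: dict):
    """Run an Optuna study inside a single node; return the study and the best params."""

    def objective(trial: optuna.Trial) -> float:
        lr = trial.suggest_float("lr", *search_space["lr"], log=True)
        # ... train and evaluate a model with `lr` here; a dummy score keeps the sketch runnable
        return -(lr - 0.01) ** 2

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=search_space.get("n_trials", 20))
    return study, study.best_params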

datajoely commented 11 months ago

@inigohidalgo it's more that Hydra counts Optuna, Ax and Nevergrad in the 'sweeper' category

astrojuanlu commented 11 months ago

Today @datajoely recommended @marrrcin's approach as an alternative to Ray Tune for parameter sweep https://linen-slack.kedro.org/t/16014653/hello-very-much-new-to-the-ml-world-i-m-trying-to-setup-a-fr#e111a9d2-188c-4cb3-8a64-37f938ad21ff

Are we confident that the DX offered by this approach can compete with this?

from ray import tune  # `objective` is the user-defined trainable function

search_space = {
    "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
    "b": tune.choice([1, 2, 3]),
}

tuner = tune.Tuner(objective, param_space=search_space)
results = tuner.fit()
datajoely commented 11 months ago

No, but it does provide a budget version of it - this is what I'm saying about the lack of integration with dedicated "sweepers" in this comment.

astrojuanlu commented 11 months ago

I'm now convinced that we should cleanly separate the "dynamic pipelines" conversation as originally stated by @noklam from the parameter sweeping/experimentation/multi-run use case, which is conceptually way simpler and has very clear boundaries and expectations. I propose we continue in https://github.com/kedro-org/kedro/issues/1606

datajoely commented 11 months ago

Yup - I think there are two categories as Nok says at the top:

  1. "Deterministic pipeline structure generation" (sweeps fall into this, but selection may be the latter)
  2. "Runtime dynamic pipeline structure" (Conditional logic introduces combinatorial complexity and possibly makes Kedro turing complete)

    There is significant user validation in terms of demand and competitor validation since we see other tools in the space offering this functionality.

inigohidalgo commented 11 months ago

I've been a bit outside this discussion, although I'm super interested in the topic. To make sure I understand the two options, I have the following use case:

I have a pipeline which predicts total demand for a product in a given day, with the day specified as an input parameter to the pipeline.

Some days, due to data issues, the prediction will fail, but once the issues are resolved after the fact, we would like to see how the model would have performed. In order to do this, we have a backfill pipeline set up which loads the predictions dataset, checks for gaps, and launches a pipeline for each missing day. This pipeline, as I've described it, is more of an example of the second (harder) bucket, right? Since the structure of the final pipeline depends on the state of a dataset.

But if, on the other hand, I simply wanted to define a pipeline which loops through the last 10 days and runs the pipeline for each of those days, regardless of the status of the predictions dataset, would that be an example of 1? In that case I am just defining a pipeline in a for loop, potentially using code to construct that pipeline based on today's date and however many days backwards (10 in the example) I want to go, which I define through config.
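
A minimal sketch of that second reading, with hypothetical node and dataset names - the number of days and the anchor date would come from config rather than being hard-coded:

from datetime import date, timedelta
from typing import Optional

from kedro.pipeline import Pipeline, node, pipeline


def predict_for_day(features):
    # placeholder node function
    return features


def create_backfill_pipeline(n_days: int = 10, anchor: Optional[date] = None) -> Pipeline:
    # the structure is fixed at registration time: one namespaced copy per day
    anchor = anchor or date.today()
    days = [(anchor - timedelta(days=i)).isoformat() for i in range(n_days)]
    per_day = [
        pipeline(
            [node(predict_for_day, inputs="features", outputs="predictions")],
            namespace=day,  # e.g. "2023-11-01.features" -> "2023-11-01.predictions"
        )
        for day in days
    ]
    return sum(per_day, Pipeline([]))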

datajoely commented 11 months ago

Nice talk on how to do hyperparameter tuning and selection in Flyte https://www.youtube.com/watch?v=UO1gsXuSTzg (key bit starts around 12 mins in)

datajoely commented 11 months ago

Optuna + W&B https://colab.research.google.com/drive/1WxLKaJlltThgZyhc7dcZhDQ6cjVQDfil#scrollTo=sHcr30CKybN7

astrojuanlu commented 11 months ago

with your permission @datajoely I'm going to copy your comments to #1606, since they're relevant there, and mark them as off-topic here

stichbury commented 11 months ago

Docs/content update:

inigohidalgo commented 11 months ago

Another use case, which I'm not sure where it would fall:

I have a time-series problem where I compute a lot of lags, rolling statistics etc. When designing my training pipeline, I have a target number of days I want my master table to include.

Due to the way lags are carried out in pandas, we need to pad our initial queries by the maximum length of lag, as otherwise we would get nulls at the start. This maximum would then be an input to some initial nodes which filter sql tables.

Technically there is no "data" dependency, since it would purely be based on prespecified parameters, but there is a point where a "max" or something needs to be calculated.

datajoely commented 11 months ago

On this last point @inigohidalgo, a lot of users ask "can I run something like kedro run --params target_date:2023-11-01?", and whilst it's technically possible, it's not nice to feed runtime arguments into catalog definitions to dynamically change load behaviour.
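
A hedged sketch of the usual alternative - pass the runtime parameter into a node and filter there, instead of templating the catalog (the dataset and parameter names are hypothetical):

import pandas as pd
from kedro.pipeline import Pipeline, node, pipeline


def filter_to_target_date(raw_table: pd.DataFrame, target_date: str) -> pd.DataFrame:
    # the runtime param reaches the node as a regular input, e.g. via `kedro run --params ...`
    return raw_table[raw_table["date"] == target_date]


def create_pipeline() -> Pipeline:
    return pipeline(
        [
            node(
                filter_to_target_date,
                inputs=["raw_table", "params:target_date"],
                outputs="filtered_table",
            )
        ]
    )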

gitgud5000 commented 4 months ago

Hi everyone,

I wanted to share a somewhat hacky method we implemented for creating a dynamic pipeline. Our pipeline required reprocessing a dataset for previous dates based on the runtime parameter run_date. Here's a simplified representation of the process (see the attached diagram). I'll describe what we ended up doing below. I would appreciate any feedback or recommendations you might have.

Modular Pipelines and Namespaces

First, we leveraged modular pipelines and namespaces to create a dynamic reprocessing capability. The goal was to reprocess datasets for previous dates without rerunning certain parts of the pipeline (specifically the feature engineering boxes, labeled as FE1, FE2, and FE3).

The Reprocess pipelines were instantiated as follows:

from kedro.pipeline import pipeline

# check_requirements, shape_target and shape_master_table are the sub-pipelines
# shown in the diagram; each t-i namespace reprocesses them for a previous date
pipes = []
for i in range(1, 6):
    t_version = pipeline(
        pipe=check_requirements + shape_target + shape_master_table,
        namespace=f"t-{i}",
        tags=["delta_t"],
    )
    pipes.append(t_version)
t_n_pipelines = sum(pipes)

In this setup, each reprocessing pipeline (t-1 to t-5) is created with a unique namespace. This allows us to isolate the processing for different time periods. Notably, the feature engineering steps (FE1, FE2, FE3) do not run in the reprocess parts of the pipeline, as they are only relevant for the initial processing (t=0).

Next, we created these entries in the catalog.yml for the Δ_t versions of the dataset:

# A type of SQLQueryDataset used in "Some ETL" box
## t=0 / Original Version
EX_DATASET:
  type: "${_datasets.sqlscript}"
  credentials: oracle_credentials
  filepath: queries/EX_DATASET.sql
  query_args:
      run_date: ${runtime_params:run_date}
      use_case: ${runtime_params:use_case}
## t=Δ Version
"{namespace}.EX_DATASET":
  type: "${_datasets.sqlscript}"
  credentials: oracle_credentials
  filepath: queries/EX_DATASET.sql
  query_args:
      run_date: "{namespace}"
      use_case: ${runtime_params:use_case}

# The same for other types like D_n or resulting GenMT
## t=0 / Original Version
D1:
  type: "${_datasets.parquet}"
  filepath: "${_azure_base_path}/04_feature/${runtime_params:use_case}/${runtime_params:run_date}/D1.parquet"
  credentials: azure_credentials
## t=Δ Version
"{namespace}.D1":
  type: "${_datasets.parquet}"
  filepath: "${_azure_base_path}/04_feature/${runtime_params:use_case}/{namespace}/D1.parquet"
  credentials: azure_credentials

We initially thought this approach would suffice if we could somehow perform a nested interpolation of the namespace to its value. However, the resolution of the config happens when the config is loaded before a session is run. The dataset factory placeholders are resolved later when the pipeline is being executed (see Kedro issue 3086).

So Hooks🪝...

Since hooks are stateful objects (see Kedro issue 2690), we created a DatesDeltaToContextHook to handle the dynamic aspects. Here's what it does:

  1. after_context_created: Creates and stores the namespaced run_date parameters.
    def after_context_created(self, context) -> None:
        """
        Create t-0 -> t-5 of rundate and add to context to generate catalog.
        """
        run_date = context.params.get("run_date", None)
        self.delta_pattern = r"t-\d+"
        if run_date:
            # Calculate t-0 to t-5 of run_date to use in catalog generation
            self.formatted_dates = self._gen_time_delta(run_date)
            context.config_loader["parameters"] = {
                **context.config_loader["parameters"],
                **self.formatted_dates,
            }
  2. after_catalog_created: Modifies the dataset instances that match the namespace pattern.

    def after_catalog_created(self, catalog: DataCatalog) -> None:
        """
        Modify dataset filepaths/sql in the catalog based on the delta run_dates from the parameters.
        """
        _pipelines: Dict[str, Pipeline] = dict(pipelines)
    
        LOGGER.info("Enforcing data set pattern discovery...")
        # Capture all data set names from all pipelines
        data_set_names = {
            data_set_name
            for pipeline in _pipelines.values()
            for data_set_name in pipeline.datasets()
        }
        # filter based on match the pattern t-number (e.g. t-1, t-2, t-3 ...) excluding the ones with `params:`
        data_set_to_alter = {
            data_set_name
            for data_set_name in data_set_names
            if re.search(self.delta_pattern, data_set_name)
            and "params:" not in data_set_name
        }
    
        for data_set_name in data_set_to_alter:
            try:
                t_delta, _ = data_set_name.split(".")
                # Enforce data set pattern discovery
                dataset = catalog._get_dataset(data_set_name)  # pylint: disable=protected-access
                run_date = self.formatted_dates.get(f"{t_delta}.run_date")
                match dataset:
                    case SQLScriptDataset():
                        self._update_dataset_sql_query(dataset, run_date)
                    case MemoryDataset():
                        pass
                    case _:
                        self._update_dataset_filepath(dataset, run_date)
            except DatasetNotFoundError:
                continue

Feel free to provide any feedback or suggestions. Thank you!

datajoely commented 4 months ago

Thank you for such a clear write up @gitgud5000 - I'm so keen to make this use-case ergonomic and this is so helpful

inigohidalgo commented 4 months ago

This is a really cool use of filtering and namespaces thanks for sharing @gitgud5000

This is just a thought, not at all a practical change in your case as it only addresses a subset of the behavior you are building but:

one of the major "needs" you're solving with the after_catalog_created hook "forcing dataset pattern discovery" is providing the dynamically-generated run_date to the parametrised SQL queries, right? A more ergonomic implementation of this could go through the newly released Ibis TableDataset, but this only helps with whatever parametrised filtering you're doing on the SQL query side, not with the directory name for the parquet files.
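
For what it's worth, a rough sketch of what that could look like with the Ibis TableDataset, assuming the dataset hands the node a lazy table expression and that a run_date column exists (both assumptions on my part):

from ibis.expr.types import Table


def filter_to_run_date(ex_dataset: Table, run_date: str) -> Table:
    # the expression stays lazy; the filter is pushed down to the SQL backend when materialised
    return ex_dataset.filter(ex_dataset["run_date"] == run_date)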