kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0
9.53k stars 877 forks

Add `before/after_pipelines_registered` hook OR `before/after_pipeline_filtered` #3000

Open marrrcin opened 11 months ago

marrrcin commented 11 months ago

Description

It would be great to have a hook / a good place to attach some code after all pipelines have been registered but before the execution starts. I would like to do this for the sake of performing some validations on the pipeline definitions (w.r.t node tags).

Context

When translating a "Kedro pipeline" into an "orchestrator pipeline", e.g. for Airflow, there is a great performance optimization opportunity: "group" multiple Kedro nodes together and map them to a single orchestrator-level task instead of doing a 1:1 mapping, as is done e.g. by Kedro-AzureML / Kedro-VertexAI / Kedro-Airflow. Grouping the nodes by any mechanism (e.g. by Kedro tags) requires some additional validation of the pipeline structure: checking whether the pipeline will still be a DAG after grouping.
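To make the validation concrete, here is a minimal sketch of the "is it still a DAG after grouping?" check. It does not use Kedro's API: nodes are plain dictionaries, and the function name `grouping_keeps_dag` and the `group_of` mapping are made up for illustration.

```python
from graphlib import CycleError, TopologicalSorter


def grouping_keeps_dag(nodes: dict[str, dict], group_of: dict[str, str]) -> bool:
    """Return True if merging nodes into their groups still yields a DAG.

    nodes maps a node name to {"inputs": set, "outputs": set};
    group_of maps a node name to its group (ungrouped nodes keep their name).
    Both structures are hypothetical stand-ins, not Kedro objects.
    """
    # Which group produces each dataset.
    producer = {
        dataset: group_of.get(name, name)
        for name, spec in nodes.items()
        for dataset in spec["outputs"]
    }
    # Group-level dependency graph: group -> groups it depends on.
    deps: dict[str, set[str]] = {}
    for name, spec in nodes.items():
        group = group_of.get(name, name)
        deps.setdefault(group, set())
        for dataset in spec["inputs"]:
            upstream = producer.get(dataset)
            if upstream is not None and upstream != group:
                deps[group].add(upstream)
    try:
        # Consuming static_order() raises CycleError if the graph has a cycle.
        tuple(TopologicalSorter(deps).static_order())
    except CycleError:
        return False
    return True


# Three nodes in a chain: a -> b -> c.
nodes = {
    "a": {"inputs": {"raw"}, "outputs": {"x"}},
    "b": {"inputs": {"x"}, "outputs": {"y"}},
    "c": {"inputs": {"y"}, "outputs": {"z"}},
}
ok = grouping_keeps_dag(nodes, {"a": "g1", "b": "g1"})   # adjacent nodes
bad = grouping_keeps_dag(nodes, {"a": "g1", "c": "g1"})  # group skips over b
```

Grouping `a` with `c` fails because the merged task would both feed `b` and depend on it, which is exactly the kind of tagging mistake the validation is meant to catch.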

Possible Implementation

Add new hook specs like after_pipelines_registered and before_pipelines_registered to access the pipelines before they are executed. For starters they could be read-only, but it would be extra nice to be able to modify them on the fly too, especially if there were access to e.g. the Kedro Context (this slightly connects to the dynamic pipelines case; maybe it's too much for this issue anyway).
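A rough sketch of what the read-only variant could look like, written in plain Python rather than against Kedro's hook machinery. Only the two hook names come from the proposal; `RegistrationLogger` and `register_with_hooks` are hypothetical, and the pipelines are stand-in lists.

```python
from types import MappingProxyType
from typing import Callable, Mapping


class RegistrationLogger:
    """Hypothetical hook implementation; method names follow the proposal."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def before_pipelines_registered(self) -> None:
        self.events.append("before")

    def after_pipelines_registered(self, pipelines: Mapping[str, list]) -> None:
        self.events.append(f"after:{sorted(pipelines)}")


def register_with_hooks(register: Callable[[], dict], hooks: list) -> Mapping[str, list]:
    """Call the registry function, firing the hypothetical hooks around it."""
    for hook in hooks:
        hook.before_pipelines_registered()
    # Read-only view: hooks can inspect the registry but not add or replace entries.
    pipelines = MappingProxyType(register())
    for hook in hooks:
        hook.after_pipelines_registered(pipelines)
    return pipelines


logger = RegistrationLogger()
registered = register_with_hooks(lambda: {"__default__": [], "etl": []}, [logger])
```

A mutable variant would presumably pass the plain dictionary (or let the hook return a replacement) instead of the `MappingProxyType` view.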

Possible Alternatives

Right now, we're plugging the custom grouping/validation code in the register_pipelines which has some downsides:

Updated on 2023-09-12: I think an even better alternative would be to have before/after_pipeline_filtered hooks, as shown below. I would expect those hooks to access:

Screenshot 2023-09-12 at 12 37 24

datajoely commented 11 months ago

I think it would have to be injected here - but I'm not sure https://github.com/kedro-org/kedro/blob/0293dc15812b27330bba31a01c7b332b3165af2a/kedro/framework/project/__init__.py#L142

marrrcin commented 11 months ago

I think it would have to be injected here - but I'm not sure

https://github.com/kedro-org/kedro/blob/0293dc15812b27330bba31a01c7b332b3165af2a/kedro/framework/project/__init__.py#L142

This place unfortunately has no KedroContext 😞 But yeah, it's the initial place to have this.

noklam commented 11 months ago

Looks reasonable to me. What would be the minimal signatures for these two hooks? Do you just need to execute some code before/after, or do you need to mutate the pipeline itself?

marrrcin commented 11 months ago

Mutating the pipeline while having KedroContext at hand would be great :D Even if no mutation is possible, then having KedroContext and/or ConfigLoader at this point would do the job - not sure whether it fits Kedro's lifecycle though...

noklam commented 11 months ago

This place unfortunately has no KedroContext 😞 But yeah, it's the initial place to have this.

In case this diagram helps: https://docs.kedro.org/en/0.18.3/faq/architecture_overview.html

x-ref #1718

It's unclear to me how injecting this in _ProjectPipelines will work. I don't want to bring in the dynamic pipeline topic here yet since that would be a huge scope, though it's definitely related. It might help if you could show some examples/snippets of how your current workaround works.

marrrcin commented 11 months ago

The current workaround is just "execute the code in register_pipelines() of a project" with some hard-coded values (e.g. tag prefix strings by which we group the nodes) :)

marrrcin commented 10 months ago

New insights, see Updated on 2023-09-12 section in the first post.

datajoely commented 10 months ago

@marrrcin we have the click context at this point so I think we should include this in the hook in some workable form

Lasica commented 10 months ago

I've had the need for such functionality, as I was implementing grouping of node execution by tags using the kedro-airflow plugin. I wanted to validate whether the tagging was correct (so that no disjoint, co-dependent nodes were joined and created a cyclic dependency), so I hard-coded a special tag prefix to mark a tag as a grouping tag, i.e. "airflow:". With that, the tag "airflow:preprocessing" would translate to an Airflow node "preprocessing" containing all nodes tagged "airflow:preprocessing".

I wanted this tag validation to be done at the pipeline creation/validation step and to issue a warning about incorrect tagging, so I implemented it in the pipeline registry. Then I wanted to share this hard-coded prefix with the other parts of the code that were doing the grouping and translation, but there was no mechanism for that. I couldn't refer to parameters in the pipeline registry in any way. This hook would help with that.

TL;DR: I had a use case to refer to parameters during registering pipelines and validating custom behavior and was unable to do that, while this feature would solve that.
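For illustration, the prefix-based grouping described above could look roughly like this. It is a sketch over plain dictionaries rather than Kedro nodes; `group_nodes_by_tag` and the example node names are made up, and only the "airflow:" prefix comes from the description.

```python
import warnings

GROUP_PREFIX = "airflow:"  # hard-coded here; ideally it would come from parameters


def group_nodes_by_tag(
    node_tags: dict[str, set[str]], prefix: str = GROUP_PREFIX
) -> dict[str, list[str]]:
    """Map each orchestrator task name to the node names it contains."""
    groups: dict[str, list[str]] = {}
    for name, tags in node_tags.items():
        group_tags = sorted(tag for tag in tags if tag.startswith(prefix))
        if len(group_tags) > 1:
            # Ambiguous tagging: warn and leave the node out of the result.
            warnings.warn(f"Node {name!r} has several grouping tags: {group_tags}")
            continue
        # A node without a grouping tag becomes a task of its own.
        group = group_tags[0][len(prefix):] if group_tags else name
        groups.setdefault(group, []).append(name)
    return groups


node_tags = {
    "clean": {"airflow:preprocessing"},
    "impute": {"airflow:preprocessing", "slow"},
    "train": set(),
}
groups = group_nodes_by_tag(node_tags)
```

With a hook that has access to parameters, the prefix could be passed in as the `prefix` argument instead of being hard-coded in the registry.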