kedro-org / kedro-viz


Refactor modular pipelines #1941

Closed · ravi-kumar-pilla closed this 14 hours ago

ravi-kumar-pilla commented 3 weeks ago

Description

Resolves #1899, #1814

Development notes

To ease the review process for https://github.com/kedro-org/kedro-viz/pull/1897, I created the PRs below.

QA notes

Example modular pipeline tree:

"modular_pipelines": {
        "__root__": {
            "id": "__root__",
            "name": "__root__",
            "inputs": [],
            "outputs": [],
            "children": [
                {
                    "id": "feature_engineering",
                    "type": "modularPipeline"
                },
                {
                    "id": "b5609df0",
                    "type": "parameters"
                },
                {
                    "id": "f6d9538c",
                    "type": "data"
                },
            …
            ]
        },
        "feature_engineering": {
            "id": "feature_engineering",
            "name": "feature_engineering",
            "inputs": [
                "abed6a4d",
                "f063cc82",
            …
            ],
            "outputs": [
                "23c94afb",
                "1e3cc50a"
            …
            ],
            "children": [
                {
                    "id": "8e4f1015",
                    "type": "data"
                },
                {
                    "id": "04ba733a",
                    "type": "task"
                },
            …
            ]
        },
        …
    }
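For reference, each entry in this tree maps onto a structure like the following (a hypothetical sketch for illustration only, not the actual Kedro-Viz response models):

from dataclasses import dataclass, field

@dataclass
class ModularPipelineChild:
    id: str    # hash for task/data/parameters nodes, name for modular pipelines
    type: str  # "task", "data", "parameters" or "modularPipeline"

@dataclass
class ModularPipelineNode:
    id: str
    name: str
    inputs: list[str] = field(default_factory=list)   # dataset node ids
    outputs: list[str] = field(default_factory=list)  # dataset node ids
    children: list[ModularPipelineChild] = field(default_factory=list)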

Current issues in constructing the modular pipeline tree:

  1. internal_inputs/outputs and external_inputs/outputs were determined from namespaces rather than from what Kedro returns. Since datasets do not have a namespace (only Kedro nodes and pipelines do), this made it hard to determine the actual inputs/outputs of a nested modular pipeline (see the sketch after this list).
  2. Input/output datasets were inherited by the parent modular pipeline when nested. This made a few datasets appear in the root modular pipeline even though they are not free output datasets.
  3. Readability/maintenance issues with nested modular pipelines, as there were no defined rules for adding a modular pipeline's children, inputs and outputs.
  4. On the UI, modular pipeline focus mode failed to highlight the associated inputs/outputs in the node menu: because dataset nodes do not have a namespace, their associated modular_pipelines were always empty.
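A minimal sketch of issue 1 (dataset names are illustrative): Kedro attaches namespaces to nodes, but a dataset mapped through a modular pipeline's inputs keeps its un-prefixed name, so the dataset alone cannot tell you which modular pipeline it belongs to.

from kedro.pipeline import node, pipeline

p = pipeline(
    [node(lambda x: x, inputs="a", outputs="b", name="n1")],
    namespace="ns",
    inputs={"a"},  # "a" is mapped through, so it keeps its un-prefixed name
)

print(p.nodes[0].namespace)  # "ns"   -> nodes carry a namespace
print(p.inputs())            # {"a"}  -> the dataset name reveals no namespace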

Incorrect rendering of nodes:

Issues raised by users:

How does this PR resolve the issues:

  1. Determines the inputs/outputs of a modular pipeline based on what Kedro returns.
  2. Removes the concept of internal/external input/output datasets for modular pipelines; a modular pipeline now has only inputs and outputs. (Thanks to @idanov)
  3. Creates helper functions with explicit rules for adding inputs/outputs and children to a modular pipeline.
  4. Populates the modular pipeline tree before creating task/data nodes, which eliminates the need to infer modular pipelines from namespaces while creating the nodes.

Core parts that changed:

  1. Added the helper methods populate_tree, add_children, _add_datasets_as_children and _add_children_to_parent_pipeline to ModularPipelinesRepository. (Thanks to @rashidakanchwala)
  2. While adding each KedroPipeline to the Kedro-Viz data repositories, DataAccessManager calls populate_tree to construct the modular_pipelines_tree for the registered pipeline.
  3. Inputs/outputs for a modular pipeline are calculated using the public APIs available via Kedro (inputs(), outputs(), all_outputs(), only_nodes_with_namespace()); see the sketch after this list.
  4. Children are now calculated according to a set of rules defined in the docstrings of add_children and the other helper functions.
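For illustration, here is one plausible way to derive a modular pipeline's inputs/outputs from Kedro's public Pipeline API. The helper name and the prefix-filtering rule are hypothetical simplifications of the PR's behaviour; the actual rules live in the docstrings of ModularPipelinesRepository.

from kedro.pipeline import Pipeline

def resolve_inputs_outputs(registered_pipeline: Pipeline, namespace: str):
    # All nodes under this namespace, including nested namespaces.
    sub = registered_pipeline.only_nodes_with_namespace(namespace)

    # Datasets mapped through the modular pipeline's inputs/outputs keep
    # their un-prefixed names, so they stay visible at this level; unmapped
    # (internal) datasets carry the "namespace." prefix and are filtered out.
    prefix = f"{namespace}."
    inputs = {ds for ds in sub.inputs() if not ds.startswith(prefix)}
    outputs = {ds for ds in sub.all_outputs() if not ds.startswith(prefix)}
    return inputs, outputs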

Code Flow doc:

Please find further information at Refactor_Modular_Pipelines.docx

Modular Pipelines UI Rendering:

UseCase 1: When a modular pipeline output (dataset_3) is used as an input to another node in the same modular pipeline.

from kedro.pipeline import Pipeline, node, pipeline

def create_pipeline(**kwargs) -> Pipeline:
    new_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_in",
                 outputs="dataset_1",
                 name="step1"),
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_2",
                 name="step2"),
            node(lambda x: x,
                 inputs="dataset_2",
                 outputs="dataset_3",
                 name="step3"),
            node(lambda x: x,
                 inputs="dataset_3",
                 outputs="dataset_out",
                 name="step4"
            )
        ],
            namespace="main_pipeline",
        inputs=None,
        outputs={"dataset_out", "dataset_3"}
    )
    return new_pipeline

Before:

[screenshot]

After:

[screenshot]



UseCase 2: When a nested modular pipeline output (dataset_3) is used as an input to the outer modular pipeline

from kedro.pipeline import Pipeline, node, pipeline

def create_pipeline(**kwargs) -> Pipeline:

    sub_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_2",
                 name="step2"),
            node(lambda x: x,
                 inputs="dataset_2",
                 outputs="dataset_3",
                 name="step3"),
        ],
        inputs={"dataset_1"},
        outputs={"dataset_3"},
        namespace="sub_pipeline"
    )
    new_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_in",
                 outputs="dataset_1",
                 name="step1"),
            sub_pipeline,
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_1_2",
                 name="step1_2"),
            node(lambda x: x,
                 inputs="dataset_3",
                 outputs="dataset_4",
                 name="step4"
            )
        ],
            namespace="main_pipeline",
        inputs=None,
        outputs={"dataset_3","dataset_4"}
    )
    return new_pipeline

Before:

[screenshot]

After:

[screenshot]


UseCase 3: When a nested modular pipeline output (dataset_3) is used as an input to the outer modular pipeline and also used as an input to another external modular pipeline

from kedro.pipeline import Pipeline, node, pipeline

def create_pipeline(**kwargs) -> Pipeline:

    sub_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_2",
                 name="step2"),
            node(lambda x: x,
                 inputs="dataset_2",
                 outputs="dataset_3",
                 name="step3"),
        ],
        inputs={"dataset_1"},
        outputs={"dataset_3"},
        namespace="sub_pipeline"
    )
    new_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_in",
                 outputs="dataset_1",
                 name="step1"),
            sub_pipeline,
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_1_2",
                 name="step1_2"),
            node(lambda x: x,
                 inputs="dataset_3",
                 outputs="dataset_4",
                 name="step4"
            )
        ],
            namespace="main_pipeline",
        inputs=None,
        outputs={"dataset_3","dataset_4"}
    )

    other = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_3",
                 outputs="dataset_5",
                 name="step5"
            )
        ],
        namespace="other_pipeline",
        inputs={"dataset_3"},
        outputs={"dataset_5"},
    )

    return new_pipeline + other

Before:

[screenshot]

After:

[screenshot]



UseCase 4: When an output of a namespaced node (dataset_7, dataset_9, using node-level namespaces) is an input to another node in the same namespace.

from kedro.pipeline import Pipeline, node, pipeline

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_1", "dataset_2"],
                outputs="dataset_3",
                name="first_node",
            ),
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_3", "dataset_4"],
                outputs="dataset_5",
                name="second_node",
            ),
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_5", "dataset_6"],
                outputs="dataset_7", 
                name="third_node",
                namespace="namespace_prefix_1",
            ),
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_7", "dataset_8"],
                outputs="dataset_9",
                name="fourth_node",
                namespace="namespace_prefix_1",
            ),
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_9", "dataset_10"],
                outputs="dataset_11",
                name="fifth_node",
                namespace="namespace_prefix_1",
            ),
        ]
    )

Before:

[screenshot]

After:

[screenshot]



UseCase 5: When an output of a nested modular pipeline (model_inputs) is an input to another nested modular pipeline

from kedro.pipeline import Pipeline, node, pipeline

def create_pipeline(**kwargs) -> Pipeline:
    data_processing_pipeline = pipeline(
        [
            node(
                lambda x: x,
                inputs=["raw_data"],
                outputs="model_inputs",
                name="process_data",
                tags=["split"],
            )
        ],
        namespace="uk.data_processing",
        outputs="model_inputs",
    )
    data_science_pipeline = pipeline(
        [
            node(
                lambda x: x,
                inputs=["model_inputs"],
                outputs="model",
                name="train_model",
                tags=["train"],
            )
        ],
        namespace="uk.data_science",
        inputs="model_inputs",
    )
    return data_processing_pipeline + data_science_pipeline
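Note that the nesting in UseCase 5 comes from the dotted namespaces: every node ends up under uk.data_processing or uk.data_science, which is what produces the nested uk entry in the tree. A quick check, assuming the create_pipeline above (expected output shown as a comment):

p = create_pipeline()
print(sorted({n.namespace for n in p.nodes}))
# ['uk.data_processing', 'uk.data_science']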

Before:

[screenshot]

After:

[screenshot]



UseCase 6: Nested namespace pipelines with single input (input_to_processing) and single output (output_from_processing)

from kedro.pipeline import Pipeline, node, pipeline

def _get_generic_pipe() -> Pipeline:
    return Pipeline([
        node(
            func=lambda x: x,
            inputs="input_df",
            outputs="output_df",
        ),
    ])

def create_pipeline(**kwargs) -> Pipeline:
    pipe = Pipeline([
        pipeline(
            pipe=_get_generic_pipe(),
            inputs={"input_df": "input_to_processing"},
            outputs={"output_df": "post_first_pipe"},
            namespace="first_processing_step",
        ),
        pipeline(
            pipe=_get_generic_pipe(),
            inputs={"input_df": "post_first_pipe"},
            outputs={"output_df": "output_from_processing"},
            namespace="second_processing_step",
        ),
    ])
    return pipeline(
        pipe=pipe,
        inputs="input_to_processing",
        outputs="output_from_processing",
        namespace="processing",
    )
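As a sanity check on UseCase 6, the nested pipeline() wrappers compose namespaces with dots, so the tree nests processing above the two steps. Assuming the create_pipeline above (expected output shown as a comment):

p = create_pipeline()
print(sorted({n.namespace for n in p.nodes}))
# ['processing.first_processing_step', 'processing.second_processing_step']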

Before:

[screenshot]

After:

[screenshot]



Modular Pipelines expand and collapse in action:

Before:

UseCase 1-4:

[gif: UseCase1-4]

UseCase 5-6:

[gif: UseCase5-6]

After:

UseCase 1-4:

[gif: UseCase1-4_after]

UseCase 5-6:

[gif: UseCase5-6_after]


ravi-kumar-pilla commented 2 weeks ago

Hi Team,

Please let me know if the split PRs are hard to review. I could not find a better way to split them, as the initial changes were interlinked. If the PR as a whole is easier to review, I am happy to move this information to #1897. Thank you.