kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

Toposort fails when using transcoded datasets #3799

Closed: ElenaKhaustova closed this issue 2 months ago

ElenaKhaustova commented 3 months ago

Description

The new toposort does not seem to handle transcoded datasets correctly.

Context

Appeared in https://github.com/kedro-org/kedro-starters/pull/215
Possible change that caused it: https://github.com/kedro-org/kedro/pull/3728

Steps to Reproduce

When you have a node like:

node(
    func=load_shuttles_to_csv,
    inputs="test_data@excel",
    outputs="test_data@csv",
    name="load_shuttles_to_csv_node",
),

node_dependencies are resolved as:

{Node(load_shuttles_to_csv, 'test_data@excel', 'test_data@csv', 'load_shuttles_to_csv_node'): {Node(load_shuttles_to_csv, 'test_data@excel', 'test_data@csv', 'load_shuttles_to_csv_node')}} 

If we don't call _strip_transcoding, it resolves to:

{Node(load_shuttles_to_csv, 'test_data@excel', 'test_data@csv', 'load_shuttles_to_csv_node'): set()}

The problem disappears if we give the datasets different names before the @: instead of test_data@csv and test_data@excel, use test_csv_data@csv and test_excel_data@excel.
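
For reference, stripping the transcoding conceptually just drops everything after the @ separator, so both names collapse to the same base name. A minimal sketch of that behaviour (illustrative only, not Kedro's actual implementation):

def strip_transcoding(dataset_name: str) -> str:
    # Drop everything after the "@" transcoding separator
    return dataset_name.split("@")[0]

assert strip_transcoding("test_data@excel") == "test_data"
assert strip_transcoding("test_data@csv") == "test_data"
# Both names collapse to "test_data", so the node above looks like it
# produces its own input, i.e. a self-dependency.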

Expected Result

The error does not appear when using transcoded datasets.

noklam commented 3 months ago

Run failed: 2095e24241441f94d40fedbb2ee10f0e1e24940b

Run succeeded: 0fc8089b637a0679f71e2bddc91f0676fc2914a2


I can confirm the regression was introduced there.

noklam commented 3 months ago

Other than this error, if I try to print the pipeline I also get an error:

<repr-error "unsupported operand type(s) for +: 'Pipeline' and 'Pipeline'">

noklam commented 3 months ago

The above error is flaky; I don't know why it shows up only sometimes. Maybe ignore it for now.


I have a new theory that this bug is actually not new: pipeline.node_dependencies looks the same even when the pipeline can be run successfully (the above pipeline runs fine at https://github.com/kedro-org/kedro/commit/0fc8089b637a0679f71e2bddc91f0676fc2914a2).

I suspect we did not strip the transcoding consistently, and that is causing the odd behaviour here.

toposort also accepts those dependencies happily, without complaint.

noklam commented 3 months ago

At least with the old toposort, a self-dependency is not a problem. Technically it shouldn't cause any problem in the resolution order either, but it is indeed a bit weird. So I think node_dependencies isn't the problem here.


Here is the code that I used to test (you don't need a project, so it's easy to set breakpoints etc.):

from kedro.pipeline import node, pipeline
from toposort import toposort


def load_shuttles_to_csv(data):
    return data


def foo(data):
    return data


def bar(data):
    return data


# n1 transcodes its own input: both dataset names strip down to "shuttles"
n1 = node(
    func=foo,
    inputs="shuttles@excel",
    outputs="shuttles@csv",
    name="load_shuttles",
)
n2 = node(
    func=bar,
    inputs="shuttles@spark",
    outputs="preprocessed_shuttles",
    name="preprocess_shuttles",
)

p = pipeline([n1, n2])
# p2 = pipeline([n1])
toposort(p.node_dependencies)
print("Done")
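
Working through the stripping by hand (so expected rather than verified output): p.node_dependencies should come out as {n1: {n1}, n2: {n1}} - the self-dependency on n1 plus the legitimate hand-off dependency of n2 on n1 - and the old toposort still resolves the order as n1 before n2.
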
noklam commented 3 months ago

On the other hand, a self-dependency is not allowed with TopologicalSorter, which is why we are only seeing this error now.

ElenaKhaustova commented 3 months ago

A short test to confirm @noklam's finding that the problem is not in the topological sort itself: the only difference is that toposort accepts self-dependencies.

from graphlib import TopologicalSorter
from toposort import toposort


def main():
    # "C" depends on itself, mirroring the self-dependency produced by transcoding
    graph = {"D": {"B", "C"}, "C": {"C"}, "B": {"A"}}

    # Old library: self-dependencies are silently ignored
    old_ts = toposort(graph)
    print(list(old_ts))

    # Standard library: the self-edge is treated as a cycle
    ts = TopologicalSorter(graph)
    print(tuple(ts.static_order()))


main()
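
Expected behaviour, based on how the two libraries handle cycles: the toposort call prints [{'A', 'C'}, {'B'}, {'D'}], silently dropping the C -> C self-edge, while TopologicalSorter.static_order() raises graphlib.CycleError on the same graph.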

The actual problem lies in the way we get node_dependencies and _nodes_by_input.

When computing node_dependencies, we iterate through the nodes and, for each node, look through its outputs. If an output of node A is an input to some other node B, then we add A as a dependency of B, based on the _nodes_by_input dictionary.

dependencies: dict[Node, set[Node]] = {node: set() for node in self._nodes}
for parent in self._nodes:
    for output in parent.outputs:
        for child in self._nodes_by_input[_strip_transcoding(output)]:
            dependencies[child].add(parent)

return dependencies

We always call _strip_transcoding on dataset names: when calculating dependencies, when building _nodes_by_input, and when indexing nodes by name.

self._nodes_by_input: dict[str, set[Node]] = defaultdict(set)
for node in tagged_nodes:
    for input_ in node.inputs:
        self._nodes_by_input[_strip_transcoding(input_)].add(node)
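
Replaying the two snippets above with plain strings instead of Node objects (a sketch only, not Kedro code) shows how a node that transcodes its own input becomes its own dependency:

from collections import defaultdict

def strip_transcoding(name: str) -> str:
    # Illustrative stand-in for Kedro's _strip_transcoding
    return name.split("@")[0]

# A single node that reads "shuttles@exel" and writes "shuttles@csv"
node_inputs = {"preprocess_shuttles_node": ["shuttles@exel"]}
node_outputs = {"preprocess_shuttles_node": ["shuttles@csv"]}

nodes_by_input = defaultdict(set)
for name, inputs in node_inputs.items():
    for input_ in inputs:
        nodes_by_input[strip_transcoding(input_)].add(name)

dependencies = {name: set() for name in node_inputs}
for parent, outputs in node_outputs.items():
    for output in outputs:
        for child in nodes_by_input[strip_transcoding(output)]:
            dependencies[child].add(parent)

print(dependencies)
# -> {'preprocess_shuttles_node': {'preprocess_shuttles_node'}}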

So in the following example, we treat the output of preprocess_shuttles_node as its own input, because after stripping the transcoding from "shuttles@exel" or "shuttles@csv" we get the same string "shuttles".

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies@csv",
                outputs="preprocessed_companies@csv",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles@exel",
                outputs="shuttles@csv",
                name="preprocess_shuttles_node",
            ),
        ]
    )
#################### self._nodes_by_input ####################
{
'companies': {Node(preprocess_companies, 'companies@csv', 'preprocessed_companies@csv', 'preprocess_companies_node')},
'shuttles': {Node(preprocess_shuttles, 'shuttles@exel', 'shuttles@csv', 'preprocess_shuttles_node')}
}

#################### self.node_dependencies ####################
{
Node(preprocess_companies, 'companies@csv', 'preprocessed_companies@csv', 'preprocess_companies_node'): set(),
Node(preprocess_shuttles, 'shuttles@exel', 'shuttles@csv', 'preprocess_shuttles_node'): {Node(preprocess_shuttles, 'shuttles@exel', 'shuttles@csv', 'preprocess_shuttles_node')}
}
ElenaKhaustova commented 3 months ago

Just rolling back to the old toposort, which ignores self-dependencies, doesn't seem like the right solution, because even then we can still hit a case where the second node gets a false dependency on the first one, even though it only uses a free input:

node(
    func=preprocess_shuttles,
    inputs="shuttles@exel",
    outputs="shuttles@csv",
    name="preprocess_shuttles_node_1",
),
node(
    func=preprocess_shuttles,
    inputs="shuttles@exel",
    outputs="shuttles_test",
    name="preprocess_shuttles_node_2",
),
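
(Following the same logic as above, _nodes_by_input["shuttles"] contains both nodes, so the first node's stripped output "shuttles" marks both of them as its children: the first node depends on itself again, and the second node picks up a false dependency on the first, even though it only reads the free input.)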

@noklam, @astrojuanlu, @merelcht, @idanov, the solution here would be to differentiate inputs/outputs like shuttles@exel and shuttles@csv by simply not stripping the transcoding. But I wonder what the logic behind the stripping was and what use cases it is meant to serve?

noklam commented 3 months ago

Just rolling back to the old toposort that ignores self-dependency doesn't seem the right solution cause if

I agree, though I think it only generates a false dependency and still resolves to the correct execution order (it would be much worse if the order were wrong).

@ElenaKhaustova Do you already understand what a transcoded dataset is? https://docs.kedro.org/en/0.17.5/05_data/01_data_catalog.html#transcoding-datasets may help. Without transcoding, the resulting graph would be disconnected, so the stripping is what connects the nodes properly. It makes more sense if you look at the @spark / @pandas example. In other words, if the nodes are disconnected, the execution order can be wrong because Kedro doesn't understand that there is a dependency.
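
For illustration, the intended pattern is a hand-off between two different nodes; the function and dataset names below are made up, but the shape is the @spark / @pandas case:

from kedro.pipeline import node, pipeline

def clean_shuttles(data):        # hypothetical Spark-based step
    return data

def summarise_shuttles(data):    # hypothetical pandas-based step
    return data

# The producing node writes "shuttles@spark"; the consumer reads "shuttles@pandas".
# Stripping the "@..." suffix is what tells Kedro the two nodes share one dataset.
handoff = pipeline(
    [
        node(clean_shuttles, inputs="raw_shuttles", outputs="shuttles@spark",
             name="clean_shuttles_node"),
        node(summarise_shuttles, inputs="shuttles@pandas", outputs="shuttle_summary",
             name="summarise_shuttles_node"),
    ]
)
print(handoff.node_dependencies)
# Expected: summarise_shuttles_node depends on clean_shuttles_node; no self-dependencies.

Here the stripped name "shuttles" is what tells Kedro that the second node must run after the first; no single node both reads and writes the same base name.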

Maybe the bug is caused by stripping in the wrong place, or by not stripping it consistently?

ElenaKhaustova commented 3 months ago

Another problem related to the current implementation of transcoded datasets is that we can get circular dependencies where none actually exist.

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_shuttles,
                inputs="shuttles@exel",
                outputs="shuttles@csv",
                name="preprocess_shuttles_node_1",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles@csv",
                outputs="shuttles@whatether",
                name="preprocess_shuttles_node_2",
            ),
        ]
    )

In this case, _nodes_by_input will contain both nodes under the shuttles key, so each node ends up with two dependencies: itself and the other node.
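
Roughly, the resulting dependencies come out as (a sketch, not verbatim Kedro output):

{
    preprocess_shuttles_node_1: {preprocess_shuttles_node_1, preprocess_shuttles_node_2},
    preprocess_shuttles_node_2: {preprocess_shuttles_node_1, preprocess_shuttles_node_2},
}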

If we roll back to https://github.com/kedro-org/kedro/commit/0fc8089b637a0679f71e2bddc91f0676fc2914a2, we get the following error, which is raised before the topological sort is applied:

    raise OutputNotUniqueError(
kedro.pipeline.pipeline.OutputNotUniqueError: Output(s) ['shuttles'] are returned by more than one nodes. Node outputs must be unique.
ElenaKhaustova commented 3 months ago

Summary:

  1. Rolling back to the old toposort doesn't seem right. It solves the original case, but other cases can still appear.
  2. We have to think about transcoded datasets and how to make them work for all the cases, or at least update the docs to clarify the constraints.
  3. For now, we can update the starter to avoid the original error; for example, remove transcoding within the same node.

@lrcouto The question is whether we want to make a release before solving item 2.

lrcouto commented 3 months ago

I personally don't think we should release before solving this issue. We could do what @merelcht suggested yesterday and change the node names in the starters just to make the CI pass, but that would be a band-aid fix.

astrojuanlu commented 3 months ago

We agreed to timebox this to avoid the risk of rushing an improper solution. Targeting a release ~early next week.

idanov commented 2 months ago

The issue is indeed that we have uncovered something that shouldn't be allowed in the first place, namely using transcoding as a cheat to add circular dependencies to nodes:

            node(
                func=load_shuttles_to_csv,
                inputs="shuttles@excel",
                outputs="shuttles@csv",
                name="load_shuttles_to_csv_node",
            ),

Transcoding should always be stripped when resolving ordering issues and validating pipelines and nodes for cycles. So the correct action here is to fix the template. This should raise an error upon node or pipeline creation, depending on how we want to define the behaviour; either would be valid. Transcoding was only made to enable hand-offs between pandas and PySpark nodes, or similar use cases. The starter here has gotten it the wrong way around - it's not the format at rest, but the format in-flight that we need to specify here.

As far as I can see, there's no bug in Kedro now; there was one before, due to a peculiarity of toposort allowing self-references. Currently, the starters are the broken ones, and they have been for a long time, because the Kedro bug masked the bugs in the starters.

In order to make the error reporting more user-friendly, we might decide to add an extra check at node creation, but that doesn't have to happen before the release.

Well done to @noklam for doing all the research and tracing it back to toposort's bug.

ElenaKhaustova commented 2 months ago

Applied changes:

Further updates required: