kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

DatasetAlreadyExistsError thrown when using ThreadRunner, dataset factories #3739

Closed: melvinkokxw closed this issue 3 months ago

melvinkokxw commented 4 months ago

Description

Using ThreadRunner with dataset factories leads to a DatasetAlreadyExistsError

Context

I have a pipeline with two nodes that use the same input, and that input should be resolved via a dataset factory. When I run the pipeline with ThreadRunner, Kedro throws a DatasetAlreadyExistsError.

Steps to Reproduce

Here is a minimal reproducible example:

import yaml
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, node
from kedro.runner import ThreadRunner

catalog_yml = """
"{name}":
  type: MemoryDataset
"""

catalog = yaml.safe_load(catalog_yml)

io = DataCatalog.from_config(catalog)

def return_dataframe(input_df):
    return input_df.head(1)

pipeline = Pipeline(
    [
        node(
            func=return_dataframe, inputs="input_df", outputs="output_df1", name="node1"
        ),
        node(
            func=return_dataframe, inputs="input_df", outputs="output_df2", name="node2"
        ),
    ]
)
runner = ThreadRunner()
runner.run(pipeline, io)

Expected Result

The pipeline should run successfully with no errors

Actual Result

Full error logs here:

```
Traceback (most recent call last):
  File "<input>", line 4, in <module>
    runner.run(pipeline, io)
  File "/Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/runner/runner.py", line 103, in run
    self._run(pipeline, catalog, hook_manager, session_id)
  File "/Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/runner/thread_runner.py", line 133, in _run
    node = future.result()
  File "/Users/user/.pyenv/versions/3.9.18/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/Users/user/.pyenv/versions/3.9.18/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/user/.pyenv/versions/3.9.18/lib/python3.9/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/runner/runner.py", line 331, in run_node
    node = _run_node_sequential(node, catalog, hook_manager, session_id)
  File "/Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/runner/runner.py", line 414, in _run_node_sequential
    inputs[name] = catalog.load(name)
  File "/Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/io/data_catalog.py", line 500, in load
    dataset = self._get_dataset(name, version=load_version)
  File "/Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/io/data_catalog.py", line 414, in _get_dataset
    self.add(data_set_name, data_set)
  File "/Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/io/data_catalog.py", line 608, in add
    raise DatasetAlreadyExistsError(
DatasetAlreadyExistsError: Dataset 'input_df' has already been registered
```
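
The traceback suggests a check-then-register race: under ThreadRunner, both nodes load input_df concurrently, both threads find the factory-resolved dataset missing from the catalog, and both try to add it, so the slower thread hits DatasetAlreadyExistsError. The sketch below is a simplified standalone illustration of that interleaving, not Kedro's actual code; registry, add, and get_dataset are made-up names, and the barrier forces both threads past the membership check before either registers.

```python
import threading

registry = {}  # stand-in for the catalog's dataset registry

def add(name, dataset):
    # Mirrors the "refuse to register the same name twice" behaviour
    if name in registry:
        raise RuntimeError(f"Dataset '{name}' has already been registered")
    registry[name] = dataset

def get_dataset(name, barrier):
    if name not in registry:   # both threads can pass this check...
        barrier.wait()         # ...the barrier forces exactly that interleaving...
        add(name, object())    # ...so the slower add() raises
    return registry[name]

barrier = threading.Barrier(2)
threads = [
    threading.Thread(target=get_dataset, args=("input_df", barrier))
    for _ in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
# One thread registers 'input_df'; the other raises
# "RuntimeError: Dataset 'input_df' has already been registered".
```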

Your Environment

melvinkokxw commented 4 months ago

Here is a temporary workaround: create all the required catalog entries before the pipeline starts running.

from kedro.framework.hooks import hook_impl


class ResolveDatasetsHooks:
    @hook_impl
    def before_pipeline_run(self, pipeline, catalog):
        # Collect every dataset name the pipeline will touch
        data_sets = set()
        for node in pipeline.nodes:
            data_sets.update(node.outputs)
            data_sets.update(node.inputs)

        # Resolve each name up front (materialising any factory patterns),
        # so no two threads race to register the same dataset later
        for ds in data_sets:
            catalog._get_dataset(ds)
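
To apply this workaround in a project, the hook would be registered in settings.py, which is Kedro's standard hook-registration mechanism. A minimal sketch, assuming the class above lives in a hooks module of your project package (your_package is a placeholder):

```python
# src/your_package/settings.py -- "your_package" is a placeholder name
from your_package.hooks import ResolveDatasetsHooks

# Kedro instantiates and registers every hook listed in this tuple
HOOKS = (ResolveDatasetsHooks(),)
```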
noklam commented 4 months ago

Hey @melvinkokxw, I love that you provided a clean script instead of a scaffolded project; it made this very easy for me to run. I appreciate your effort a lot ✨!

I suspect this is related to:

Can you try changing {name} -> {abc}? I tried changing the runner to SequentialRunner and it still fails, so maybe there is something wrong in the script. After I changed {name}, I got a different error message, and that may solve your problem already.
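
For reference, that change would just rename the factory placeholder in the repro script; the placeholder name itself is arbitrary:

```python
catalog_yml = """
"{abc}":
  type: MemoryDataset
"""
```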

noklam commented 4 months ago

I managed to run this successfully, so it is more a problem with your script. Was it copied from an old version of Kedro? Can you explain a little what you are trying to do? That would give us more context to come up with a better solution.

There are a few problems, which the fixed script below addresses:

  1. The catalog format is wrong; the type key is not indented properly.
  2. You need to wrap things under if __name__ == '__main__', which is a Python requirement when dealing with multiple processes/threads.
  3. input_df doesn't exist; you cannot use a default dataset with no data, so I monkeypatched MemoryDataset._load with a lambda function to bypass the MemoryDataset check.

import yaml
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, node
from kedro.runner import ThreadRunner
from kedro.runner.parallel_runner import ParallelRunner
from kedro.runner.sequential_runner import SequentialRunner

if __name__ == "__main__":
    catalog_yml = """
    "{name}":
        type: MemoryDataset
    """

    from kedro.io.memory_dataset import MemoryDataset
    MemoryDataset._load = lambda x: print("lambda!")
    catalog = yaml.safe_load(catalog_yml)

    io = DataCatalog.from_config(catalog)

    def return_dataframe(input_df):
        return "return!"

    pipeline = Pipeline(
        [
            node(
                func=return_dataframe, inputs="input_df", outputs="output_df1", name="node1"
            ),
            node(
                func=return_dataframe, inputs="input_df", outputs="output_df2", name="node2"
            ),
        ]
    )
    runner = ThreadRunner()
    runner.run(pipeline, io)
noklam commented 3 months ago

I am closing this issue due to inactivity. I tried to reproduce this last time and it worked as expected. Please reopen with a valid example.