kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

ThreadRunner Dataset DatasetAlreadyExistsError: Dataset has already been registered #4250

Closed · noklam closed 3 weeks ago

noklam commented 1 month ago

Description

Originated from https://github.com/kedro-org/kedro/pull/4210

Context

Upon investigation, I found that this error seems to be related to the dataset factory pattern only.

  1. If datasets are registered explicitly in the catalog, I can run without any error from 0.18.12 (didn't go earlier, since factories were not yet introduced) up to the current release.
  2. If a dataset factory is used, it starts breaking from 0.18.12.

The current conclusion is that this is not an error introduced recently. There seems to have been a partial fix previously, but it doesn't work for my test case.
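To make the race concrete, here is a hedged, self-contained sketch of the check-then-act pattern that factory resolution follows. `ToyCatalog`, `resolve` and the `before_add` hook are illustrative names, not Kedro's real `DataCatalog` API; the hook stands in for another thread registering the dataset between the membership check and the registration:

```python
class DatasetAlreadyExistsError(Exception):
    """Stand-in for kedro's DatasetAlreadyExistsError (illustration only)."""


class ToyCatalog:
    """Toy check-then-act model of dataset factory resolution.

    Hypothetical, not Kedro's real DataCatalog: `before_add` simulates
    another thread winning the race between the membership check and
    the registration step.
    """

    def __init__(self):
        self._datasets = {}

    def add(self, name, dataset):
        if name in self._datasets:
            raise DatasetAlreadyExistsError(
                f"Dataset '{name}' has already been registered"
            )
        self._datasets[name] = dataset

    def resolve(self, name, before_add=None):
        if name not in self._datasets:      # check ...
            if before_add is not None:
                before_add()                # another thread slips in here
            self.add(name, object())        # ... then act: raises
        return self._datasets[name]


catalog = ToyCatalog()
catalog.resolve("dummy_1")                  # single-threaded: always fine

# Simulate a second thread registering the same factory-resolved dataset
# in the window between the check and the add:
try:
    catalog.resolve(
        "output_1", before_add=lambda: catalog.add("output_1", object())
    )
except DatasetAlreadyExistsError as exc:
    print(exc)  # Dataset 'output_1' has already been registered
```

This is why pre-registering the datasets (or any form of warm-up before threads start) makes the failure disappear: the `name not in self._datasets` check then always takes the read-only path.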

Related:

Steps to Reproduce

Using a test similar to the one written in https://github.com/kedro-org/kedro/pull/4210 in benchmark_runner.py for ThreadRunner.

This is the snippet that I use:

# Write the benchmarking functions here.
# See "Writing benchmarks" in the asv docs for more information.

import time

import yaml

from kedro.io.data_catalog import DataCatalog
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline
from kedro.runner import ThreadRunner

# Simulate an I/O-bound task
def io_bound_task(input_data):
    time.sleep(2)  # Simulate an I/O wait (e.g., reading from a file)
    output = input_data
    return output

# Simulate a compute-bound task (matrix multiplication)
def compute_bound_task(input_data) -> str:
    # Simulate heavy compute that does not use multiple cores (not pandas/numpy etc.)
    ans = 1
    for i in range(1, 50000):
        ans = ans * i
    return "dummy"

def create_data_catalog():
    """
    Use dataset factory pattern to make sure the benchmark cover the slowest path.
    """
    catalog_conf = """

'output_{pattern}':
    type: pandas.CSVDataset
    filepath: benchmarks/data/'{pattern}.csv'

'numpy_{pattern}':
    type: pickle.PickleDataset
    filepath: benchmarks/data/'{pattern}.pkl'

'{catch_all}':
    type: pandas.CSVDataset
    filepath: benchmarks/data/data.csv

# dummy_1:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_8:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_2:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_3:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_4:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_5:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_6:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_7:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv
"""
    catalog_conf = yaml.safe_load(catalog_conf)
    catalog = DataCatalog.from_config(catalog_conf)
    return catalog

def create_io_bound_node(inputs=None, outputs=None, name=None):
    io_node = node(io_bound_task, inputs=inputs, outputs=outputs, name=name)
    return io_node

def create_io_bound_pipeline():
    dummy_pipeline = pipeline(
        [
            create_io_bound_node("dummy_1", "output_1"),
            create_io_bound_node("dummy_2", "output_2"),
            create_io_bound_node("dummy_3", "output_3"),
            create_io_bound_node("dummy_4", "output_4"),
            create_io_bound_node("dummy_5", "output_5"),
            create_io_bound_node("dummy_6", "output_6"),
            create_io_bound_node("dummy_7", "output_7"),
            create_io_bound_node("dummy_1", "output_8"),
            create_io_bound_node("dummy_1", "output_9"),
            create_io_bound_node("dummy_1", "output_10"),
        ]
    )
    return dummy_pipeline

if __name__ == "__main__":
    catalog = create_data_catalog()
    test_pipeline = create_io_bound_pipeline()
    runner_obj = ThreadRunner()
    runner_obj.run(test_pipeline, catalog=catalog)

Run this multiple times to confirm it fails (the failure is non-deterministic due to a race condition). Then uncomment the dummy_x datasets to pre-register them; the run then always passes.


noklam commented 1 month ago

We discussed this in private; the conclusion is that it's exactly the same issue users have reported before. https://github.com/kedro-org/kedro/pull/4093/ attempts to fix this, but it only fixes the path where the user starts a KedroSession, i.e. kedro run. This is why it breaks for the benchmark tests and for the unit tests that users have created.

We agreed the temporary fix should go into ThreadRunner. In the longer term, it may go into the catalog instead. In parallel there is a need for listing catalog entries by pattern, so it's something we need to consider for the Catalog & Runner re-design.
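A minimal sketch of that temporary fix, under stated assumptions: `warm_up`, `ToyCatalog` and its methods are hypothetical names, not Kedro's API. The idea is that the runner resolves every dataset sequentially on the calling thread before handing work to the pool, so factory patterns are materialized exactly once and worker threads only ever hit the read path:

```python
from concurrent.futures import ThreadPoolExecutor


class ToyCatalog:
    """Hypothetical catalog model; resolve() is unsafe if first called
    concurrently for the same name (check-then-register race)."""

    def __init__(self):
        self._datasets = {}

    def resolve(self, name):
        if name not in self._datasets:
            self._datasets[name] = object()  # register on first use
        return self._datasets[name]


def warm_up(catalog, dataset_names):
    # Sequential warm-up on the main thread: after this, every name is
    # already registered, so concurrent resolve() calls never write.
    for name in dataset_names:
        catalog.resolve(name)


catalog = ToyCatalog()
names = [f"dummy_{i}" for i in range(1, 8)]
warm_up(catalog, names)

# Worker threads now only read from the catalog; the race window is gone.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(catalog.resolve, names * 10))
print(len(results))  # 70
```

Putting this loop inside ThreadRunner keeps the change local, at the cost of each runner needing its own copy until the logic moves somewhere shared.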

ElenaKhaustova commented 1 month ago

Based on the solution proposed for lazy loading (https://github.com/kedro-org/kedro/issues/3935#issuecomment-2433144639), we suggest moving the warm-up to the AbstractRunner before we call _run(), making this logic common to all runners - https://github.com/kedro-org/kedro/blob/a5d9bb40380c598bf7d03cb16623026892844ed4/kedro/runner/runner.py#L115

Later, we will replace this logic with a lazy-loading warm-up, which will be common to all runners as well.
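The proposal above follows the template-method shape: the shared run() does the warm-up, and subclasses only implement _run(). A hedged sketch, with all class and method names (`ToyPipeline`, `ToyCatalog`, `NoopThreadRunner`) hypothetical rather than Kedro's actual signatures:

```python
from abc import ABC, abstractmethod


class ToyCatalog:
    """Hypothetical catalog stub with first-use registration."""

    def __init__(self):
        self._datasets = {}

    def resolve(self, name):
        if name not in self._datasets:
            self._datasets[name] = object()
        return self._datasets[name]


class ToyPipeline:
    """Hypothetical pipeline stub exposing its dataset names."""

    def __init__(self, names):
        self._names = names

    def datasets(self):
        return list(self._names)


class AbstractRunner(ABC):
    def run(self, pipeline, catalog):
        # Warm-up before dispatch: every dataset (including any that would
        # be resolved from a factory pattern) is registered once, here, on
        # the calling thread, so no subclass can race on registration.
        for name in pipeline.datasets():
            catalog.resolve(name)
        return self._run(pipeline, catalog)

    @abstractmethod
    def _run(self, pipeline, catalog): ...


class NoopThreadRunner(AbstractRunner):
    """Stand-in subclass: by the time _run() executes, all datasets exist."""

    def _run(self, pipeline, catalog):
        return [catalog.resolve(n) for n in pipeline.datasets()]


runner = NoopThreadRunner()
out = runner.run(ToyPipeline(["dummy_1", "output_1"]), ToyCatalog())
print(len(out))  # 2
```

Centralizing the warm-up this way means SequentialRunner, ThreadRunner and ParallelRunner all inherit the fix, and swapping it later for a lazy-loading warm-up touches only one place.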

ElenaKhaustova commented 3 weeks ago

Solved in https://github.com/kedro-org/kedro/pull/4262