kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

`PartitionedDataset` caching prevents finding files generated by prior nodes with `ParallelRunner` #4164

Open gtauzin opened 1 week ago

gtauzin commented 1 week ago

Description

I have a pipeline in which the first node generates files that are then picked up by a follow-up node using a partitioned dataset. If I run this pipeline with ParallelRunner, the partition file list is created before the whole pipeline is run, which makes it impossible to find the files created by the prior node.

Context

I have a pipeline with two nodes that is applied per source bucket using namespaces. I would like to run it in parallel (one process per source).

The way I achieve this with Kedro is:

Node 1 also returns a boolean that node 2 takes as an input, so that the resulting DAG has a dependency link from node 1 to node 2.

Steps to Reproduce

Here is what the pipeline code looks like:

from kedro.pipeline import Pipeline, node, pipeline

# SOURCES and the node functions concatenate_increment / concatenate_partition
# are defined elsewhere in the project.
def create_pipeline(**kwargs) -> Pipeline:
    def get_pipeline(namespace: str):
        template_pipeline = pipeline(
            [
                node(
                    concatenate_increment,
                    inputs="data_increment",
                    outputs=["concatenated_data_increment", "data_increment_concatenated"],
                    name="concatenate_increment",
                    confirms=f"{namespace}.data_increment", # This is needed as the incremental dataset is namespaced
                ),
                node(
                    concatenate_partition,
                    inputs=[
                        "partitioned_concatenated_data",
                        "data_increment_concatenated",
                    ],
                    outputs="extracted_data",
                    name="concatenate_partition",
                ),
            ],
        )

        return template_pipeline

    pipelines = pipeline(
        pipe=get_pipeline(namespace=SOURCES[0]),
        namespace=SOURCES[0],
    )
    for source in SOURCES[1:]:
        pipelines += pipeline(
            pipe=get_pipeline(namespace=source),
            namespace=source,
        )

    return pipelines

And the catalog:

"{source}.data_increment":
  type: partitions.IncrementalDataset
  path: data/01_raw/{source}/
  dataset:
    type: pandas.CSVDataset
  filename_suffix: ".csv"

"{source}.data_increment_concatenated":
  type: MemoryDataset

"{source}.concatenated_data_increment":
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/{source}/concatenated_data.pq
  versioned: true

"{source}.partitioned_concatenated_data":
  type: partitions.PartitionedDataset
  path: data/02_intermediate/{source}/concatenated_data.pq/
  dataset:
    type: pandas.ParquetDataset
  load_args:
    withdirs: true
    max_depth: 1
  filename_suffix: ".pq"

"{source}.extracted_data":
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/{source}/extracted_data.pq

Putting even a single Parquet file for a single source in data/01_raw/source_1 and creating the pipeline from the template pipeline method with the namespace set to source_1 is enough to reproduce the bug. For the sake of clarity, I did not provide the concatenate_increment and concatenate_partition node functions, but I can provide them if needed. They basically just call pd.concat.
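
For reference, here is a minimal sketch of what the two node functions might look like; this is a hypothetical reconstruction based on the description above, assuming IncrementalDataset loads partitions eagerly while PartitionedDataset returns load callables:

import pandas as pd

def concatenate_increment(data_increment: dict[str, pd.DataFrame]) -> tuple[pd.DataFrame, bool]:
    # IncrementalDataset (assumed eager here) yields a dict of partition id -> DataFrame.
    concatenated = pd.concat(data_increment.values(), ignore_index=True)
    # The boolean output exists only to create a DAG edge from node 1 to node 2.
    return concatenated, True

def concatenate_partition(partitions: dict, increment_done: bool) -> pd.DataFrame:
    # PartitionedDataset loads lazily: values are callables returning DataFrames.
    return pd.concat([load() for load in partitions.values()], ignore_index=True)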

Expected Result

The pipeline runs successfully, and the results of running this pipeline with SequentialRunner or ParallelRunner are identical.

Actual Result

The pipeline runs fine with SequentialRunner, but when run with ParallelRunner it complains that there are no files in the partition:

kedro.io.core.DatasetError: No partitions found.
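
For completeness, the two runs can also be compared programmatically; this is a sketch using the session API (the CLI equivalents are kedro run and kedro run --runner=ParallelRunner):

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.runner import ParallelRunner, SequentialRunner

bootstrap_project(Path.cwd())

with KedroSession.create() as session:
    session.run(runner=SequentialRunner())  # completes

with KedroSession.create() as session:
    session.run(runner=ParallelRunner())    # raises DatasetError: No partitions found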

Your Environment

gtauzin commented 1 week ago

From @deepyaman on the Kedro Slack:

This seems quite possible, though, as _list_partitions is cached, so anything that attempts to access it may have hit this code: https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py#L257-L264. There is an exists check, introduced in https://github.com/kedro-org/kedro/pull/3332, that is triggered before the pipeline run and can populate this cache. @Merel may have more familiarity, since she worked on #3332.
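
To see why a cache that gets populated before the run never refreshes, here is a standalone illustration of the cachetools.cachedmethod pattern used in the linked code (the class and names below are illustrative, not Kedro's):

import operator

from cachetools import Cache, cachedmethod

class Lister:
    def __init__(self):
        self._cache = Cache(maxsize=1)
        self.files = []

    @cachedmethod(operator.attrgetter("_cache"))
    def list_files(self):
        return list(self.files)

lister = Lister()
print(lister.list_files())      # [] -- memoized during the pre-run exists check
lister.files.append("part.pq")  # an upstream node writes a new partition
print(lister.list_files())      # still [] -- the cached result is returned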

He also suggested creating a custom PartitionedDataset that removes the caching decorator on `_list_partitions` as a workaround. I can confirm that this workaround worked for me.
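
For anyone hitting the same issue, here is a minimal sketch of such a workaround, assuming the _partition_cache attribute used by the caching decorator in the linked source; rather than removing the decorator, it clears the memoized listing before each load:

from kedro_datasets.partitions import PartitionedDataset

class NonCachingPartitionedDataset(PartitionedDataset):
    """PartitionedDataset variant that re-lists partitions on every load."""

    def _load(self):
        # Drop the memoized _list_partitions result so files written by
        # upstream nodes during this run become visible.
        self._partition_cache.clear()
        return super()._load()

It can then be referenced in the catalog by its full import path in place of partitions.PartitionedDataset.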