kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

IncrementalDataSet lazy loading #2456

Closed PtrBld closed 1 year ago

PtrBld commented 1 year ago

Description

We are using IncrementalDataSets since we receive new data every week and only want to process the new data. However, the data is very large: each week brings multiple new partitions, which together are too large to be loaded into RAM at once. As far as we understand, there is no way to use IncrementalDataSets while still loading each partition on its own, similar to the PartitionedDataSet.

Possible Implementation

Possibly this could be solved by not calling the load function in https://github.com/kedro-org/kedro/blob/05915901d43351766669eca50c3a2fcd6246bffb/kedro/io/partitioned_dataset.py#L532-L534, but returning the function instead. This would be the same functionality as provided in the PartitionedDataSet and would also simplify switching between PartitionedDataSet and IncrementalDataSet without changing the node code.
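To make the proposal concrete, here is a minimal sketch of the difference between the two behaviours. The helper names and the partition dictionary are illustrative stand-ins for what the real `IncrementalDataSet` builds internally; only the call-vs-return distinction is shown.

```python
from typing import Any, Callable, Dict


def eager_load(partitions: Dict[str, Callable[[], Any]]) -> Dict[str, Any]:
    # Current IncrementalDataSet behaviour: invoke every loader immediately,
    # so all new partitions end up in memory at once.
    return {pid: load for pid, load in ((p, f()) for p, f in partitions.items())}


def lazy_load(partitions: Dict[str, Callable[[], Any]]) -> Dict[str, Callable[[], Any]]:
    # Proposed behaviour: return the callables themselves, matching
    # PartitionedDataSet, so the node decides when each partition is loaded.
    return dict(partitions)


partitions = {"2023-01": lambda: [1, 2], "2023-02": lambda: [3, 4]}
assert eager_load(partitions)["2023-01"] == [1, 2]
assert callable(lazy_load(partitions)["2023-01"])
```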

astrojuanlu commented 1 year ago

Hi @PtrBld, thanks for opening this issue! Do you think https://github.com/kedro-org/kedro/issues/2374 is close enough to your use case?

PtrBld commented 1 year ago

Hi @astrojuanlu, thanks for the very quick response. I think we could create something like a LazyLoadIncrementalDataSet that inherits from IncrementalDataSet and overrides the load function to not call load, if this is what you are hinting at. I was just wondering whether there was a specific design decision that led to the eager loading of the data in the IncrementalDataSet?

As I understand Kedro, the different datasets should be as close as possible in their behaviour, especially since IncrementalDataSet directly inherits from PartitionedDataSet.

astrojuanlu commented 1 year ago

I don't have a good answer to that, will defer to the Kedro engineering team to address that question.

People with access to the old repository can refer to https://github.com/quantumblacklabs/private-kedro/issues/208

HOZHENWAI commented 1 year ago

Hi, I also have the same issue, and the same question about the different behaviour; there is probably a use case, but I fail to see it.

noklam commented 1 year ago

@PtrBld Would you be happy to make a PR to fix this? Off the top of my head, I think it should have the same behaviour as PartitionedDataSet, and I can't remember why it doesn't. It could be an oversight, but I am not sure.

PtrBld commented 1 year ago

I am happy to create a PR. However, this would be a breaking change to the current behaviour. Maybe we could add this as an option to both the partitioned and incremental datasets?

jmholzer commented 1 year ago

Tech Design 21/06/23:

Intro to problem (5 mins)

PartitionedDataSet and IncrementalDataSet are datasets used to handle data that is split across multiple files.

PartitionedDataSet: This class loads and saves data split across multiple files. These files can exist locally or remotely (handled by fsspec, like our other datasets). When loaded, the PartitionedDataSet returns a dictionary where the keys are the partitions (i.e., individual file identifiers) and the values are callables that load data from these partitions. This is useful for working with large datasets that can be processed in chunks, or when you have data that is naturally partitioned, like time-series data stored by date.
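To illustrate the lazy-loading contract described above, here is a sketch of how a node typically consumes a `PartitionedDataSet`. The partition ids and loader lambdas are hypothetical stand-ins for what Kedro would pass in; the point is that each partition is materialised one at a time.

```python
from typing import Any, Callable, Dict


def process_partitions(partitions: Dict[str, Callable[[], Any]]) -> Dict[str, int]:
    """Process a PartitionedDataSet-style input: a dict of partition loaders."""
    results = {}
    for partition_id, load_partition in sorted(partitions.items()):
        data = load_partition()  # only this partition is in memory right now
        results[partition_id] = len(data)
        del data  # free memory before loading the next partition
    return results


fake_partitions = {"part-a": lambda: [1, 2, 3], "part-b": lambda: [4, 5]}
print(process_partitions(fake_partitions))  # {'part-a': 3, 'part-b': 2}
```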

IncrementalDataSet: The IncrementalDataSet is a subclass of PartitionedDataSet. It adds the functionality of checkpoints to keep track of already processed partitions. The information about the last processed partition is stored in a so-called checkpoint file, so that subsequent pipeline runs will only load new partitions past the checkpoint. This is useful for large, frequently updated datasets where only the new partitions are processed, saving compute resources.
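The checkpoint idea can be sketched as a simple filter over sorted partition ids: only partitions ordered after the stored checkpoint value are considered new. This is a simplified illustration, not the actual `IncrementalDataSet` implementation.

```python
from typing import List, Optional


def partitions_after_checkpoint(all_partitions: List[str],
                                checkpoint: Optional[str]) -> List[str]:
    """Return the partitions that sort strictly after the checkpoint value."""
    if checkpoint is None:
        # No checkpoint file yet: every partition counts as new.
        return sorted(all_partitions)
    return sorted(p for p in all_partitions if p > checkpoint)


weekly = ["2023-01-01", "2023-01-08", "2023-01-15"]
assert partitions_after_checkpoint(weekly, "2023-01-08") == ["2023-01-15"]
assert partitions_after_checkpoint(weekly, None) == weekly
```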

PartitionedDataSet handles the division of a dataset into separate, loadable partitions, while IncrementalDataSet further allows for tracking and loading of only the unprocessed partitions in subsequent runs.

Additional context (5 mins)

In PartitionedDataSet, the _load method prepares a dictionary where each key is a partition identifier, and the corresponding value is a callable (a function) that, when invoked, loads data from the respective partition. Data is not actually loaded during the _load method execution in PartitionedDataSet -- it prepares the ground to load data when needed.

In IncrementalDataSet, on the other hand, the _load method works differently. Similar to the PartitionedDataSet, it prepares a dictionary with partition identifiers as keys. But the values are not callables; instead, they are the actual loaded data from the respective partitions. It only does this for partitions that are 'new', i.e., those that are not yet processed and are beyond the last saved checkpoint.

So, in essence, while PartitionedDataSet's _load method prepares a dictionary of functions to load data later (lazy loading), IncrementalDataSet's _load method provides a dictionary of already loaded data but only for new, unprocessed partitions (eager loading).

This is a problem, because eager loading of the data in IncrementalDataSet can cause the instance running Kedro to run out of memory if the amount of new data is greater than the available RAM. Moreover, this is inconsistent with the approach in PartitionedDataSet, which uses lazy loading by default.

Clarification questions on 1. and 2. (10 mins)

Investigate DEFAULT_CHECKPOINT_TYPE = "kedro.extras.datasets.text.TextDataSet" in the definition of IncrementalDataSet. Lazy loading was added later as an OSS contribution; it could be that the contributor was only using PartitionedDataSet, OR that, because there was no way in Kedro to save each increment separately, all partitions would have had to be combined anyway.

Introduction to suggested solutions (5 mins)

Solution 1 (breaking change): Modify IncrementalDataSet to use lazy loading by default. The advantage is that both datasets will have consistent behaviour. The disadvantage is that this change is breaking and will have to wait for the 0.19 release.

Solution 2 (non-breaking change): Modify IncrementalDataSet to accept an optional flag for whether loading should be lazy or eager, with the same default behaviour as before. The advantage is that this change would not be breaking and could be released before 0.19. The disadvantage is that its behaviour would then be inconsistent with PartitionedDataSet. Optionally, we could add the same optional argument to PartitionedDataSet.
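Solution 2 could be sketched roughly as follows. The class and parameter names here are illustrative only, not the final API; the point is that a single flag selects between the two loading modes while keeping the eager default.

```python
from typing import Any, Callable, Dict, Union


class IncrementalLoader:
    """Illustrative stand-in for an IncrementalDataSet with a lazy-loading flag."""

    def __init__(self, partitions: Dict[str, Callable[[], Any]],
                 lazy: bool = False) -> None:
        self._partitions = partitions
        self._lazy = lazy  # default False preserves the current eager behaviour

    def load(self) -> Dict[str, Union[Any, Callable[[], Any]]]:
        if self._lazy:
            # Lazy mode: return callables, as PartitionedDataSet does.
            return dict(self._partitions)
        # Eager mode: materialise every partition immediately.
        return {pid: fn() for pid, fn in self._partitions.items()}


parts = {"p1": lambda: "data1"}
assert IncrementalLoader(parts).load() == {"p1": "data1"}
assert callable(IncrementalLoader(parts, lazy=True).load()["p1"])
```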

Discussion on solution and alternatives (20 mins)

Summary and next steps (5 mins)

Four steps:

PtrBld commented 1 year ago

I have one minor additional comment. I think making lazy loading the default is good. However, if you want to keep both options and add an optional flag for that, it would make sense to not return the dictionary from the _load function but rather a class wrapper. This way the lazy and eager version of the dataset could have the same interface and the versions could be easily swapped (no more callables but rather a function to access the values)
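The wrapper idea above could look something like this minimal sketch: a mapping-like object whose values are materialised on access, so node code uses the same dictionary-style interface whether the underlying dataset is lazy or eager. The class name is illustrative, not a proposed API.

```python
from collections.abc import Mapping
from typing import Any, Callable, Dict, Iterator


class LazyPartitions(Mapping):
    """Dict-like view over partition loaders; loading happens on item access."""

    def __init__(self, loaders: Dict[str, Callable[[], Any]]) -> None:
        self._loaders = loaders

    def __getitem__(self, partition_id: str) -> Any:
        # Materialise the partition here, on access, not at dataset load time.
        return self._loaders[partition_id]()

    def __iter__(self) -> Iterator[str]:
        return iter(self._loaders)

    def __len__(self) -> int:
        return len(self._loaders)


parts = LazyPartitions({"p1": lambda: [1, 2, 3]})
for pid in parts:
    assert parts[pid] == [1, 2, 3]  # node code never sees a callable
```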

noklam commented 1 year ago

@PtrBld we have just discussed this and I believe there will be a summary later.

In short, we will move this to kedro-datasets; this allows us to make more flexible changes without breaking anything.

jmholzer commented 1 year ago

> I have one minor additional comment. I think making lazy loading the default is good. However, if you want to keep both options and add an optional flag for that, it would make sense to not return the dictionary from the _load function but rather a class wrapper. This way the lazy and eager version of the dataset could have the same interface and the versions could be easily swapped (no more callables but rather a function to access the values)

Hey @PtrBld, thanks for this. Our decision in tech design was to move these DataSets from kedro.io into kedro-datasets and to have lazy loading be the default and only option for both of these.

As such, I'll close your PR in the kedro repository, but please re-open it on kedro-plugins (or let us know if you'd like us to do it for you 🙂). If you decide to open it yourself, note that you'll have to move the whole file (and tests) to kedro-datasets, inside a new subpackage. We'll work with you to get the PR ready for release.

We'll separately open another PR on kedro (dev branch) that removes these DataSets.

jmholzer commented 1 year ago

Closing this issue in favour of the new issues: