kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.
Apache License 2.0
94 stars 89 forks

PartitionedDataset - Allow for parallelization when saving and allow logging of exceptions #928

Open crash4 opened 2 years ago

crash4 commented 2 years ago

Description

Lately I have been working with PartitionedDataset a lot in a setting where I am processing many small files (think 30k+ files), more than 30 GB in total. Processing them sequentially in a node would require loading each one into memory, processing it, and then keeping the result in memory while all the other files are processed, only to return everything in a dict at the end so that all files are saved at once.

To solve this problem, I am only returning functions which are called when saving the dataset (to avoid memory problems). Since by definition, files in a PartitionedDataset should be independent (i.e. processing of one file should not influence the processing of others), we can save several at one time rather than saving them sequentially as is done right now in PartitionedDataset.
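A minimal sketch of the lazy pattern described above, in plain Python with no Kedro dependency. The names `process_file` and `make_lazy_partitions` and the sample data are hypothetical; the point is only that the node returns a dict of zero-argument callables, so nothing is processed until each callable is invoked at save time.

```python
from functools import partial
from typing import Callable, Dict, List

calls: List[str] = []  # records which partitions were actually processed


def process_file(partition_id: str, raw: str) -> str:
    """Hypothetical per-file processing step."""
    calls.append(partition_id)
    return raw.upper()


def make_lazy_partitions(raw_partitions: Dict[str, str]) -> Dict[str, Callable[[], str]]:
    """Return one zero-argument callable per partition instead of processed data."""
    # functools.partial binds the arguments immediately; a bare lambda in the
    # comprehension would capture the loop variables late.
    return {pid: partial(process_file, pid, raw) for pid, raw in raw_partitions.items()}


lazy = make_lazy_partitions({"a.txt": "hello", "b.txt": "world"})
assert calls == []  # nothing has been processed yet
result_a = lazy["a.txt"]()  # processing happens only when the callable is invoked
```

Only one processed partition needs to be in memory at a time, because each result can be written out and discarded before the next callable runs.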

Another pain point is that processing the files this way (returning only functions that do the processing at save time) doesn't allow me to drop a file if its processing fails (imagine an assert inside the processing function). Right now, if this happens, the whole save fails for every file that has not yet been processed. Instead, we could wrap the call to the processing function in a try-except block that attempts the processing and, if it fails, logs the exception.
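The try-except idea could look roughly like the sketch below. It is sequential and independent of Kedro: `save_lazy_partitions` is a hypothetical helper, and `save` is a stand-in for whatever writes a single partition. A partition whose callable raises is logged and skipped, and the remaining partitions are still saved.

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


def save_lazy_partitions(
    data: Dict[str, Callable[[], Any]],
    save: Callable[[str, Any], None],
) -> List[str]:
    """Process and save partitions one at a time; return the ids that failed."""
    failed: List[str] = []
    for partition_id, func in sorted(data.items()):
        try:
            value = func()
        except Exception:
            # logger.exception records the traceback together with the message
            logger.exception("Dropping partition %s: processing failed", partition_id)
            failed.append(partition_id)
            continue  # do not save anything for a failed partition
        save(partition_id, value)
    return failed


def bad_partition() -> int:
    raise ValueError("validation failed")


saved: Dict[str, Any] = {}
failed = save_lazy_partitions({"good": lambda: 1, "bad": bad_partition}, saved.__setitem__)
```

Returning the failed ids, rather than only logging them, leaves the caller free to retry or report them later.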

Context

This change would significantly speed up processing of PartitionedDatasets and handle several pain points I am having (described above).

Possible Implementation

I believe all of this can be done just in the PartitionedDataset class. I have "hacked" my own implementation using joblib, which kind of works. Unfortunately, joblib doesn't work well with the logging module, so it breaks logging functionality (the global logger is not propagated to child workers when using joblib).

A minimal working example (a reimplementation of the PartitionedDataset._save function):


```python
import logging
from copy import deepcopy
from typing import Any, Dict

from joblib import Parallel, delayed

logger = logging.getLogger(__name__)


# Both functions are meant to be methods on a PartitionedDataset subclass.
def _save_partition(self, partition_data, partition_id):
    kwargs = deepcopy(self._dataset_config)
    partition = self._partition_to_path(partition_id)
    # join the protocol back since tools like PySpark may rely on it
    kwargs[self._filepath_arg] = self._join_protocol(partition)
    dataset = self._dataset_type(**kwargs)  # type: ignore
    if callable(partition_data):
        try:
            partition_data = partition_data()
        except Exception:
            # Log the failure and skip this partition instead of saving it.
            logger.exception("Processing failed for partition %s", partition_id)
            return
    dataset.save(partition_data)


def _save(self, data: Dict[str, Any]) -> None:
    if self._overwrite and self._filesystem.exists(self._normalized_path):
        self._filesystem.rm(self._normalized_path, recursive=True)
    # n_jobs=1 runs sequentially; raise it to save partitions in parallel.
    Parallel(n_jobs=1, verbose=10)(
        delayed(self._save_partition)(partition_data, partition_id)
        for partition_id, partition_data in sorted(data.items())
    )
    self._invalidate_caches()
```

The n_jobs parameter sets how many jobs joblib runs concurrently, which in practice means how many CPU cores are used (n_jobs=1, as above, runs sequentially; n_jobs=-1 uses all available cores). As I mentioned before, joblib breaks the logging functionality and this would have to be solved (I have only tried joblib; multiprocessing or other libraries may work better).

Also: the dataset should only be saved when partition_data = partition_data() doesn't fail.

Possible Alternatives

Using other parallelism libraries, such as the standard-library multiprocessing module.
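One alternative that could be tried is the standard library's `concurrent.futures.ThreadPoolExecutor`. Threads share the process's logging configuration, so records logged from workers reach the same handlers, which may sidestep the propagation problem described for joblib. The trade-off is that CPU-bound processing is limited by the GIL, so this mainly helps when saving is I/O-bound. The sketch below is not Kedro code: `save_one` and `save_partitions_threaded` are hypothetical names, and `save` is a stand-in for whatever writes a single partition.

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


def save_one(partition_id: str, partition_data: Any, save: Callable[[str, Any], None]) -> bool:
    """Resolve and save one partition; return False if processing or saving failed."""
    try:
        if callable(partition_data):
            partition_data = partition_data()  # lazy partition: process it now
        save(partition_id, partition_data)
    except Exception:
        logger.exception("Skipping partition %s", partition_id)
        return False
    return True


def save_partitions_threaded(
    data: Dict[str, Any],
    save: Callable[[str, Any], None],
    max_workers: int = 4,
) -> List[str]:
    """Save partitions concurrently in threads; return the ids that failed."""
    ids = sorted(data)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the same order as `ids`
        outcomes = list(pool.map(lambda pid: save_one(pid, data[pid], save), ids))
    return [pid for pid, ok in zip(ids, outcomes) if not ok]


def failing() -> int:
    raise ValueError("bad input")


store: Dict[str, Any] = {}
failed_ids = save_partitions_threaded({"a": lambda: 1, "b": 2, "c": failing}, store.__setitem__)
```

In a real dataset the `save` callback would build the underlying dataset for the partition path and call its `save` method, as `_save_partition` does in the joblib example.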

datajoely commented 2 years ago

Hi @crash4 I completely get your reasoning here and also like your solution. In general parallelism in Python can be a pain and my fear is that it would be really difficult to mix this with the ParallelRunner.

For now I think implementing a custom dataset is exactly the right thing, and thank you for sharing your approach with the community. Our view is that in cases where users need to go a little off-piste from the 'general case', Custom/Derived datasets are absolutely the right call. From the Kedro core side this feels like something we won't implement centrally unless lots of people start asking for it on this issue!

roumail commented 2 years ago

+1 for having an option to enable parallelism for partitioned datasets.

edhenry commented 1 month ago

Just ran into this issue over the last few weeks (again) and just want to give this a +1. :)

Galileo-Galilei commented 1 month ago

As @datajoely said, it feels unlikely that this ends up in the central codebase in the short run, but we would definitely accept a contribution to kedro_datasets as an experimental dataset. The contribution process is much lighter, and I think your code can almost be released "as is". This would be shipped quickly and we could gather feedback before considering making it more "official".

EDIT: just saw that the issue is 3 years old ^^' but the comment still stands if someone wants to contribute the code above.

astrojuanlu commented 1 week ago

My understanding is that the ask is very specific.

I'm adding this to our Inbox so that we decide whether this is something we'll do ourselves or let the community do it. In the meantime, as @Galileo-Galilei says, a contribution as an experimental dataset is more than welcome.