uber/petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0

make_torch_dataloader using TransformSpec applies transformation on entire dataframe (not lazy loading) #799

Closed: davegabe closed this issue 10 months ago

davegabe commented 10 months ago

Since my dataset (the NSynth dataset by Magenta/Google) is very large because of the many audio files it contains and cannot be fully loaded into memory, I'm trying to load it lazily. Currently I load a JSON file into a dataframe containing the path to each audio file and some MIDI metadata (pitch, velocity and sample rate) used to synthesize audio from them. So I have a dataframe with the following schema:

note_str                    pitch  velocity  sample_rate
guitar_acoustic_01_C3.wav   89     50        16000
guitar_acoustic_02_F2.wav   41     127       16000
guitar_acoustic_03_A#5.wav  106    127       16000
...                         ...    ...       ...

So my idea is to use as the TransformSpec a function that loads the audio file, also synthesizes audio from the MIDI data, and returns four columns: the spectrogram of the original audio, the spectrogram of the synthesized audio, the phase of the original audio, and the phase of the synthesized audio (see the sketch below).
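
For reference, a minimal sketch of what such a TransformSpec could look like. The spectrogram shape constants and the _fake_spectrogram placeholder are illustrative stand-ins for the real audio loading and synthesis code, which is not shown here:

import numpy as np
import pandas as pd
from petastorm import TransformSpec

N_FREQ, N_FRAMES = 513, 128  # assumed spectrogram shape; adjust to the real STFT settings


def _fake_spectrogram(row):
    # Placeholder: the real code would load the .wav referenced by `note_str`,
    # synthesize audio from the MIDI fields, and compute STFT magnitude/phase.
    return np.zeros((N_FREQ, N_FRAMES), dtype=np.float32)


def _load_and_transform(pdf):
    # Petastorm calls this with a pandas DataFrame holding the decoded rows of a row group.
    return pd.DataFrame({
        "spec_orig": [_fake_spectrogram(r) for _, r in pdf.iterrows()],
        "spec_synth": [_fake_spectrogram(r) for _, r in pdf.iterrows()],
        "phase_orig": [_fake_spectrogram(r) for _, r in pdf.iterrows()],
        "phase_synth": [_fake_spectrogram(r) for _, r in pdf.iterrows()],
    })


def get_transform_spec(training=True):
    return TransformSpec(
        _load_and_transform,
        edit_fields=[
            ("spec_orig", np.float32, (N_FREQ, N_FRAMES), False),
            ("spec_synth", np.float32, (N_FREQ, N_FRAMES), False),
            ("phase_orig", np.float32, (N_FREQ, N_FRAMES), False),
            ("phase_synth", np.float32, (N_FREQ, N_FRAMES), False),
        ],
        removed_fields=["note_str", "pitch", "velocity", "sample_rate"],
    )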

The problem comes when I use:

with converter_train.make_torch_dataloader(transform_spec=get_transform_spec(training=True), batch_size=64, workers_count=1) as train_dataloader:
    train_dataloader_iter = iter(train_dataloader)
    trainer.fit(model, train_dataloader_iter)

Basically, when make_torch_dataloader runs it fills up the memory, and it seems like the TransformSpec is applied to the entire dataframe instead of lazily loading batches (in fact, the only way to keep it from filling up the memory and crashing is to reduce the number of rows in the original dataframe). Since I have tried different approaches and still get the same behaviour, I would like to understand the problem and find out whether my idea is sound or whether this is the expected behaviour.

selitvin commented 10 months ago

TransformSpec is applied to an entire row group at once. That is not the entire dataset, but it is likely to be pretty large, say hundreds of megabytes of compressed data.

davegabe commented 10 months ago

Thank you! I just realized that since my dataframe is small in size but grows a lot after the transformation, I have to decrease the row-group size so that memory is not exhausted afterwards. In fact, as you said, the transformation wasn't applied to the entire dataset but to the default row-group size, which was far too large in my case.

I share with you my solution in case it can be helpful to others in the future:

converter_train = make_spark_converter(df_train, parquet_row_group_size_bytes=avg_row_size*n)

I simply set the parquet_row_group_size_bytes parameter to the number of rows n I want per row group times the average row size (computed beforehand).
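
A minimal sketch of how the average row size could be estimated, assuming an active SparkSession named spark and that df_train is a PySpark DataFrame; the cache path, sample size and n below are arbitrary illustrative values:

from petastorm.spark import SparkDatasetConverter, make_spark_converter

# make_spark_converter needs a parent cache directory configured (path is an assumption).
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///tmp/petastorm_cache")

# Estimate the in-memory size of one row from a small sample. This is an
# approximation, not the exact on-disk Parquet size.
sample_pdf = df_train.limit(1000).toPandas()
avg_row_size = int(sample_pdf.memory_usage(deep=True).sum() / len(sample_pdf))

n = 16  # desired rows per row group; tune so a transformed row group fits in memory
converter_train = make_spark_converter(df_train, parquet_row_group_size_bytes=avg_row_size * n)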