pytorch / data

A PyTorch repo for data loading and utilities to be shared by the PyTorch domain libraries.
BSD 3-Clause "New" or "Revised" License

Make `IterToMap` loading more lazily #454

Open ejguan opened 2 years ago

ejguan commented 2 years ago

🚀 The feature

Currently, IterToMap starts loading all data from the prior IterDataPipe when the first __getitem__ is invoked, here: https://github.com/pytorch/data/blob/13b574c80e8732744fee6ab9cb7e35b5afc34a3c/torchdata/datapipes/iter/util/converter.py#L78

We can stop loading data from the prior IterDataPipe as soon as we find the requested index. We might also need to add a flag to prevent loading data multiple times.
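A minimal sketch of the idea, in plain Python rather than torchdata: the class name `LazyIterToMap` and its attributes are hypothetical and are not the library's converter. It assumes the source yields `(key, value)` pairs, pulls from the source only until the requested key appears, and uses an `_exhausted` flag so the source is never consumed twice.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Tuple


class LazyIterToMap:
    """Hypothetical lazy iter-to-map wrapper (not the torchdata implementation)."""

    def __init__(self, source: Iterable[Tuple[Any, Any]]) -> None:
        self._source = source
        self._iterator: Optional[Iterator[Tuple[Any, Any]]] = None
        self._cache: Dict[Any, Any] = {}
        self._exhausted = False  # flag that prevents reloading the source

    def __getitem__(self, key: Any) -> Any:
        # Serve from the cache if the key was already loaded.
        if key in self._cache:
            return self._cache[key]
        # Create the underlying iterator once, on first use.
        if self._iterator is None and not self._exhausted:
            self._iterator = iter(self._source)
        # Pull items only until the requested key shows up.
        while not self._exhausted:
            try:
                k, v = next(self._iterator)
            except StopIteration:
                self._exhausted = True
                self._iterator = None
                break
            self._cache[k] = v
            if k == key:
                return v
        raise KeyError(key)


def numbered_source(n: int, log: list) -> Iterator[Tuple[int, int]]:
    """Toy source that records which items were actually loaded."""
    for i in range(n):
        log.append(i)
        yield i, i * 10
```

With this sketch, sequential access (index 0, 1, 2, ...) loads one item per lookup, which is the IterDataPipe-like behavior described above. Random access to a late key still loads everything before it, and the cache still grows to hold every item seen so far.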

Motivation, pitch

This would improve performance when users simply iterate over the MapDataPipe, since we would not need to pre-load everything at the beginning of the iteration. It would essentially simulate the behavior of IterDataPipe.

Alternatives

No response

Additional context

No response

linminhtoo commented 2 years ago

This would be very much appreciated for my use-case. I also would like to ask a related question: I start with a .csv/.parquet file which contains the unique ID of each data sample as well as some other metadata. I then apply a series of maps & filters as needed by my use-case. Finally, I need to generate the tensors needed by my model for training. The last step is expensive in both compute & memory.

I am using IterDataPipe for all the operations until the last step. For the last step, I have tried 3 cases: 1) define my own IterDataPipe 2) define my own MapDataPipe 3) use my defined IterDataPipe but run .to_map_datapipe() to convert it to MapDataPipe AFTER generating the tensors.

I found, somewhat to my surprise, that 3) gives the fastest training speed (in terms of number of samples per second) compared to 1) and even 2). In fact, 3) could be 2-4 times faster than 1). However, 3) is very memory intensive and simply infeasible even for a relatively small dataset (100k samples), since we need to load the whole IterDataPipe into memory in self._load_map().

I also found that 2) is slower than 3), but still faster than 1). The main problem with 2) is that handling errors during tensor generation in __getitem__ is much more complicated than just not yielding in __iter__ for 1). For example, I could return None if an error happened, but I have to tackle this None in collate_fn, and I may also encounter empty batches if the whole batch is None.
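The None-handling described above could be sketched as a collate wrapper. This is an illustration only: `make_skip_none_collate` is a hypothetical helper, and the inner collate function is passed in so the sketch runs without PyTorch (in a real pipeline one might pass `torch.utils.data.default_collate`).

```python
from typing import Any, Callable, List, Optional, Sequence


def make_skip_none_collate(
    inner_collate: Callable[[List[Any]], Any],
) -> Callable[[Sequence[Optional[Any]]], Optional[Any]]:
    """Wrap a collate function so that failed (None) samples are dropped."""

    def collate(batch: Sequence[Optional[Any]]) -> Optional[Any]:
        # Drop samples whose feature generation failed and returned None.
        kept = [sample for sample in batch if sample is not None]
        if not kept:
            # Whole batch failed: signal this so the training loop can skip it.
            return None
        return inner_collate(kept)

    return collate


# Using `list` as a stand-in for a real collate function.
collate_fn = make_skip_none_collate(list)
```

The training loop still has to check for a `None` batch and skip it, and batches can end up smaller than `batch_size`, which is the extra complexity mentioned for 2).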

I would have expected 1) to be about as fast as 3), perhaps just a little slower (e.g. collating an iterator into batches may add some overhead). Please help me understand if I am missing anything here. I have tried num_workers = 0 and num_workers > 0, but this did not change my conclusions.

Also, should I open a new Issue for this?

ejguan commented 2 years ago

@linminhtoo Thanks for providing feedback on your use case. Could you please share the operation you are using for the last step? I am trying to understand why generating Tensor would require a custom DataPipe.

In terms of speed, could you please also share how you measure it? In terms of overall time, I think 3) should be the same as 1). If we are talking about speed per iteration, 3) should be faster, since all the loading logic happens at the beginning of the epoch, whereas 1) loads data lazily on each iteration.

linminhtoo commented 2 years ago

Thanks for your very prompt reply. Before the last step, the samples only contain the identity and metadata, such as paths. I'm in the cheminformatics space, where we generate various features for a given molecule or protein. We may, for example, generate Morgan fingerprints of dimension 2048, or a large 3D surface mesh of the protein. These can be expensive in both compute and memory. I define a new class myself because there are many different features we want to generate for each sample.

As for the timing, I'm simply measuring the time taken for one epoch of training a very simple model (so that the bottleneck is more likely to shift to data loading) over about 100k samples. 3) can be as fast as 30 seconds for the 1st epoch, and even faster, at 20 seconds, from the 2nd epoch onwards. But it almost maxes out my 32 GB of RAM and does not scale. 1) keeps my RAM usage nice and low, but can take 2 to 3 minutes per epoch. I realized I have not tried 2) on 100k samples (on a smaller set of 3k samples, it was faster than 1)), but I can double-check that.

I may be misunderstanding something. Whether I define the computationally expensive step as 1) or 2), there should be little difference in speed and memory, right? 3) is not really usable in practice, but its sheer speedup relative to 1) is a little concerning.

linminhtoo commented 2 years ago

Hi, I have run 2) on the larger 100k samples dataset and I discovered the root cause why it is faster than 1).

For 2): MapDataPipe [timing output not preserved]

For 1): IterDataPipe [timing output not preserved]

As you can see, IterDataPipe is clearly not utilizing num_workers properly. In fact, it is slowed down by multiprocessing. In contrast, MapDataPipe can leverage multiprocessing. With num_workers = 0, both take about the same time per epoch, which is in line with expectations.

To be clear, I am doing exactly the same last step (the expensive step) in both cases. The only difference is whether it is implemented in __iter__ or __getitem__, and whether the class inherits from IterDataPipe or MapDataPipe, respectively.

For sharding the IterDataPipe, I am simply following the instructions from the latest docs. The worker_init_fn is taken from https://github.com/pytorch/data/pull/458

This is a summary of 1), where everything remains an IterDataPipe:

# load data & parse as dict
dp = dp.parse_csv_as_dict()
# my maps & filters
...
# shuffle
dp = dp.shuffle()
# my expensive step, still as an IterDataPipe
dp = dp.get_features_as_iter(my_args)
# prepare for sharding
dp = dp.sharding_filter()
# make dataloader with custom worker_init_fn
dataloader = DataLoader(dp, num_workers=8, batch_size=48, pin_memory=True, worker_init_fn=worker_init_fn)

For 2), as a MapDataPipe, the difference is here:

# dp is still IterDataPipe here
dp = dp.shuffle()
# I define a custom IterDataPipe that assigns an integer key for .to_map_datapipe(), similar to dp.add_index()
dp = dp.assign_integer_as_key()
# no RAM blow-up as I have not generated any feature tensors yet
dp = dp.to_map_datapipe()
# my expensive step, as a MapDataPipe
dp = dp.get_features_as_map(my_args)
# make dataloader
dataloader = DataLoader(dp, num_workers=8, batch_size=48, pin_memory=True)

linminhtoo commented 2 years ago

Sorry for another comment. I had a weird intuition to modify the order of operations for 1). I changed from

# shuffle
dp = dp.shuffle()
# my expensive step, still as an IterDataPipe
dp = dp.get_features_as_iter(my_args)
# prepare for sharding
dp = dp.sharding_filter()  # <--------
# make dataloader with custom worker_init_fn
dataloader = DataLoader(dp, num_workers=8, batch_size=48, pin_memory=True, worker_init_fn=worker_init_fn)

to

# shuffle
dp = dp.shuffle()
# prepare for sharding 
dp = dp.sharding_filter()  # <--------
# my expensive step, still as an IterDataPipe
dp = dp.get_features_as_iter(my_args)
# make dataloader with custom worker_init_fn
dataloader = DataLoader(dp, num_workers=8, batch_size=48, pin_memory=True, worker_init_fn=worker_init_fn)

Now, for 2): MapDataPipe [timing output not preserved]

For 1): IterDataPipe [timing output not preserved]

By placing the .sharding_filter() op BEFORE the expensive step, it can finally utilize the multiple cores properly. However, it is still slower than MapDataPipe (1.5 min vs 1 min).
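The effect of the ordering can be illustrated with a small plain-Python simulation. This is only a sketch under assumptions: `shard` is a hypothetical stand-in for a sharding filter that keeps every num_workers-th element, and each "worker" is simulated by running its own copy of the pipeline, which is how the behavior above suggests the workers operate.

```python
from typing import Iterable, Iterator, List


def shard(source: Iterable[int], worker_id: int, num_workers: int) -> Iterator[int]:
    """Keep every num_workers-th element (stand-in for a sharding filter)."""
    for i, item in enumerate(source):
        if i % num_workers == worker_id:
            yield item


def count_expensive_calls(shard_first: bool, n_samples: int, num_workers: int) -> List[int]:
    """Return how many times each simulated worker runs the expensive step."""
    per_worker = []
    for worker_id in range(num_workers):
        calls = 0

        def expensive(items: Iterable[int]) -> Iterator[int]:
            nonlocal calls
            for item in items:
                calls += 1  # one expensive feature computation
                yield item * 2

        if shard_first:
            # Shard, then compute: each worker only processes its own share.
            pipeline = expensive(shard(range(n_samples), worker_id, num_workers))
        else:
            # Compute, then shard: each worker processes everything and discards most of it.
            pipeline = shard(expensive(range(n_samples)), worker_id, num_workers)

        for _ in pipeline:
            pass
        per_worker.append(calls)
    return per_worker
```

With 96 samples and 8 workers, sharding last makes every simulated worker run the expensive step 96 times, while sharding first reduces that to 12 per worker. This matches the observation that the original ordering did not benefit from num_workers.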

ejguan commented 2 years ago

> By placing the .sharding_filter() op BEFORE the expensive step, it can finally utilize the multiple cores properly.

Placing shuffle and sharding as early as possible is definitely the right way to get rid of redundant operations. We should add a few words about this to our tutorial: https://pytorch.org/data/beta/tutorial.html#working-with-dataloader

> However, it is still slower than MapDataPipe (1.5min vs 1min).

When you measure the total time per epoch, do you include the time spent creating the DataLoader iterator?
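One way to separate the two costs is to time the iterator creation and the iteration itself independently. The helper below is a hypothetical sketch that accepts any iterable. With a multi-worker DataLoader, iterator creation is typically where worker start-up cost would show up, though that is an assumption to verify for a given setup.

```python
import time
from typing import Iterable, Tuple


def time_epoch(loader: Iterable) -> Tuple[float, float, int]:
    """Return (iterator creation seconds, iteration seconds, number of batches)."""
    start = time.perf_counter()
    iterator = iter(loader)  # creation cost is measured on its own
    created = time.perf_counter()

    n_batches = 0
    for _ in iterator:
        n_batches += 1
    end = time.perf_counter()

    return created - start, end - created, n_batches


# Works with any iterable; a DataLoader could be passed in the same way.
setup_s, iterate_s, n = time_epoch(range(1000))
```

Reporting both numbers per epoch makes it clear whether a gap between two pipelines comes from start-up or from the per-batch work.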

linminhtoo commented 2 years ago

>> By placing the .sharding_filter() op BEFORE the expensive step, it can finally utilize the multiple cores properly.

> It is definitely the right way to place shuffle and sharding as early as possible to get rid of redundant operations. We should add a few words on our tutorial https://pytorch.org/data/beta/tutorial.html#working-with-dataloader

>> However, it is still slower than MapDataPipe (1.5min vs 1min).

> When you measure the total time per epoch, do you include the time creating iterator of DataLoader?

I went back to my code to run more extensive and more rigorous speed benchmarks. As the thread is getting a little long, I decided to create a new Issue and link it to this one.