pytorch / data

A PyTorch repo for data loading and utilities to be shared by the PyTorch domain libraries.

Notes on shuffling, sharding, and batchsize #302

Open NicolasHug opened 2 years ago

NicolasHug commented 2 years ago

(I'm writing this down here to have a written trace, but I'm looking forward to discussing this with you all in our upcoming meetings :) )

I spent some time porting the torchvision training recipes to use datapipes, and I noticed that the model I trained on ImageNet with DPs was much less accurate than the one with regular datasets. After a lot of digging I came to the following conclusion:

  1. the datapipe must be shuffled before it is sharded
  2. the DataLoader does not behave in the same way with a datapipe and with a regular indexable dataset, in particular when it comes to size of the last batches in an epoch. This has a dramatic effect on accuracy (probably because of batch-norm).

Details below. Note: for sharding, I used this custom torchvision sharder which takes DDP and dataloader workers into account, + the TakerIterDataPipe below it.


Shuffle before shard

First, some quick results (training a resnext50_32x4d for 5 epochs with 8 GPUs and 12 workers per GPU):

  • Shuffle before shard: Acc@1 = 47% -- this is on par with the regular indexable dataset version (phew!!)
  • Shuffle after shard: Acc@1 = 2%

One way to explain this is that if we shuffle after we shard, then only sub-parts of the dataset get shuffled. Namely, each of the 8 * 12 = 96 dataloader workers receives ~1/96th of the dataset, and each of these parts gets shuffled independently. That means the shuffling is far from uniform, and for datasets in which the layout is all_samples_from_class1, all_samples_from_class2, ... all_samples_from_classN, it's possible that some class i is never in the same batch as class j.
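A minimal pure-Python sketch of why the order matters. It assumes round-robin sharding (sample i goes to worker i % num_workers) and a toy 12-sample dataset with 3 workers; the helper names are made up for illustration and this is not the torchvision sharder. When sharding comes first, the set of samples owned by a worker is identical in every epoch, so two samples that live in different shards can never meet in the same batch.

```python
import random

def round_robin_shard(items, num_shards, shard_id):
    """Keep every num_shards-th item, starting at shard_id."""
    return [x for i, x in enumerate(items) if i % num_shards == shard_id]

def shard_then_shuffle(n, num_shards, rng):
    """Each worker shuffles only its own fixed slice of the dataset."""
    shards = []
    for shard_id in range(num_shards):
        shard = round_robin_shard(list(range(n)), num_shards, shard_id)
        rng.shuffle(shard)
        shards.append(shard)
    return shards

def shuffle_then_shard(n, num_shards, rng):
    """The whole dataset is shuffled once, then split across workers."""
    data = list(range(n))
    rng.shuffle(data)
    return [round_robin_shard(data, num_shards, s) for s in range(num_shards)]

rng = random.Random(0)
n, num_shards, epochs = 12, 3, 5

# Which samples does worker 0 own in each epoch (ignoring their order)?
sets_shuffle_after_shard = {
    frozenset(shard_then_shuffle(n, num_shards, rng)[0]) for _ in range(epochs)
}
sets_shuffle_before_shard = {
    frozenset(shuffle_then_shard(n, num_shards, rng)[0]) for _ in range(epochs)
}

print("shuffle after shard :", [sorted(s) for s in sets_shuffle_after_shard])
print("shuffle before shard:", [sorted(s) for s in sets_shuffle_before_shard])
```

With shuffle-after-shard, worker 0 only ever sees samples 0, 3, 6 and 9, whereas shuffle-before-shard gives it a different subset in each epoch.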

So it looks like we need to shuffle before we shard. Now, if we shuffle before sharding, we still need to make sure that all of the 96 workers shuffle the dataset with the same RNG. Otherwise we risk sampling a given sample in more than one worker, or not at all. For that to happen, one can set a random seed in worker_init_fn, but that causes a second problem: the random transformations of each worker will also be the same, and this will lead to slightly less accurate results; on top of that, all epochs will start with the same seed, so the shuffling is the same across all epochs. I do not know how to solve this problem yet.
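A small simulation of the same-RNG requirement, assuming every worker shuffles the full dataset itself and then keeps its round-robin shard. `worker_view` is a hypothetical helper and the seeds are arbitrary.

```python
import random

def worker_view(n, num_workers, worker_id, seed):
    """Shuffle the full dataset with this worker's seed, then keep its shard."""
    data = list(range(n))
    random.Random(seed).shuffle(data)
    return data[worker_id::num_workers]

n, num_workers = 20, 4

# Same seed in every worker: the shards form an exact partition of the dataset.
same = [x for w in range(num_workers) for x in worker_view(n, num_workers, w, seed=123)]

# A different seed per worker: each worker shards a different permutation,
# so some samples show up in several workers and others in none.
diff = [x for w in range(num_workers) for x in worker_view(n, num_workers, w, seed=w)]

print("same seed  ->", len(set(same)), "distinct samples out of", n)
print("diff seeds ->", len(set(diff)), "distinct samples out of", n)
```

The sketch only covers the shuffling side. It says nothing about how to keep the random transformations different across workers, which is the open problem described above.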

Note that TF shuffles the dataset before storing it. We might do something similar, but that would still not solve the issue for custom users datasets.


Size of the batches at the end of an epoch

Some quick results (same experiment as above):

  • with drop_last=True: Acc@1 = 47%
  • with drop_last=False: Acc@1 = 11%

Near the end of the epoch, the dataloader with DP will produce a lot of batches with size 1 if drop_last is False. See the last batches of an epoch on indices from [0, len(imagenet)) with a requested batch size of 32: https://pastebin.com/wjS7YC90. In contrast, this does not happen when using an indexable dataset: https://pastebin.com/Rje0U8Dx.

I'm not too sure of why this has such a dramatic impact, but it's possible that this has to do with batch-norm, as @fmassa pointed out offline. Using drop_last will make sure that the 1-sized batches are eliminated, producing a much better accuracy.

I guess the conclusion here is that it's worth unifying the behaviour of the DataLoader for both DPs and regular indexable datasets regarding the batch size, because with indexable datasets and drop_last=False we still get ~47% acc.

ejguan commented 2 years ago

Thank you for adding the detailed notes.

So it looks like we need to shuffle before we shard. Now, if we shuffle before sharding, we still need to make sure that all of the 96 workers shuffle the dataset with the same RNG. Otherwise we risk sampling a given sample in more than one worker, or not at all.

I totally understand this problem. This is one thing we want to solve during this half. Each shuffler should contain its own RNG rather than using a global RNG. As for the seed, I am actually not sure we want the same seed across processes. I believe you want the same seed to make sure all data are covered in a single epoch. In the long run, if the seed for each shuffler differs from the others, the performance may be similar. (Need an experiment)

Near the end of the epoch, the dataloader with DP will produce a lot of batches with size 1 if drop_last is False. See the last batches of an epoch on indices from [0, len(imagenet)) with a requested batch size of 32: https://pastebin.com/wjS7YC90. In contrast, this does not happen when using an indexable dataset, where the dataloader will produce batches of size 18: https://pastebin.com/Rje0U8Dx.

Dataset produces 8 batches with size 18. IterDataPipe produces 96 batches with size 1 at the last iteration. This should be some discrepancy over sampling mechanism in DataLoader. Will take a look

ejguan commented 2 years ago

Dataset produces 8 batches with size 18. IterDataPipe produces 96 batches with size 1 at the last iteration. This should be some discrepancy over sampling mechanism in DataLoader. Will take a look

I went through the example again and found the reason for this discrepancy. For Dataset, the DataLoader relies on the sampler in the main process to distribute the workload. For the last batch, the main process sends a batch of indices to a single worker process per distributed process. That's why each rank yields 1 batch with size 18 at the end. For IterDataPipe, each worker process owns an even 1/96 share of the dataset and takes care of its last batch by itself -> 1 sample. Then, each rank (distributed process) returns 12 batches with size 1. In order to solve it, IMHO, we have two options:

  1. Make sure batching only happens on a single process a.k.a. non-shardable DataPipe
  2. At the end of the pipeline, create a non-shardable DataPipe to aggregate samples from workers to generate batches. (This would be similar to the internal data source structure -> making sure batching only happens at the end.)
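The batch sizes quoted above can be checked with a bit of arithmetic. This sketch rests on assumptions that are not stated in the thread: the ImageNet-1k training set size of 1,281,167 images, a sampler that pads each rank to ceil(N / world_size) samples, and datapipe workers that each end up with floor(N / 96) samples (for instance after truncation by something like the TakerIterDataPipe mentioned in the first post).

```python
import math

def last_batch_size(num_samples, batch_size):
    """Size of the final batch when drop_last=False."""
    remainder = num_samples % batch_size
    return remainder if remainder else batch_size

N = 1_281_167          # assumed: number of ImageNet-1k training images
world_size = 8         # GPUs (DDP ranks)
workers_per_rank = 12  # DataLoader workers per GPU
batch_size = 32

# Map-style dataset: the sampler in the main process of each rank forms
# the batches, so only the rank's total sample count matters.
per_rank = math.ceil(N / world_size)
map_style_last = last_batch_size(per_rank, batch_size)

# IterDataPipe: every worker batches its own shard independently.
per_worker = N // (world_size * workers_per_rank)
datapipe_last = last_batch_size(per_worker, batch_size)

print("map-style: last batch per rank   =", map_style_last)
print("datapipe:  last batch per worker =", datapipe_last)
```

Under these assumptions the last batch has 18 samples per rank for the map-style dataset and 1 sample per worker for the datapipe, which is consistent with the 8 batches of size 18 and 96 batches of size 1 reported above.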

cc: @VitalyFedyunin

ejguan commented 2 years ago

Do you think adding a batch_size to DataLoaderV2 to guarantee batching at the end can be an option? Then, users can still do batching/unbatching in the pipeline. cc: @VitalyFedyunin

datumbox commented 2 years ago

I believe you want the same seed to make sure all data are covered by a single epoch. In the long run, if the seed for each shuffler is different with each other, the performance may be similar. (Need an experiment)

@ejguan I think ensuring that each image is processed only once per training epoch is a very common and reasonable assumption that we should maintain. Note that there are techniques such as Repeated Augmentation that rely on all processes agreeing on a single subset of data and then generating augmentations on top of it. Though there might be a way to go around it with your new API/approach, this technique shows that using different seeds across processes has a material effect on accuracy and can't be assumed to give similar results.

I'm not too sure of why this has such a dramatic impact, but it's possible that this has to do with batch-norm, as @fmassa pointed out offline.

@NicolasHug This is a very reasonable guess, especially since you have shown that many batches end up with a single sample. This is very likely to make the mini-batch statistics extremely noisy and break training. There is a way to confirm that this is the problem: you can replace BN with SyncBatchNorm to synchronize the statistics across the batches of the different processes. It's going to be slower (so it's not a viable solution to your problem) but it will give you evidence on whether BN is to blame here. Not sure how useful this would be for you though, as I agree with you that in any case we should consider aligning the behaviour with the old DataLoader.
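A minimal sketch of that diagnostic, using a tiny stand-in model rather than the actual resnext50_32x4d. `torch.nn.SyncBatchNorm.convert_sync_batchnorm` swaps every BatchNorm layer for a SyncBatchNorm one; the converted model is then meant to be wrapped in DistributedDataParallel and trained on GPUs, which is not shown here.

```python
import torch
from torch import nn

# Stand-in model (assumption): any module containing BatchNorm layers works.
model = nn.Sequential(
    nn.Conv2d(3, 8, kernel_size=3, padding=1),
    nn.BatchNorm2d(8),
    nn.ReLU(),
)

# Replace every BatchNorm*d layer with SyncBatchNorm, which computes the
# batch statistics over all processes in the group instead of per process.
sync_model = nn.SyncBatchNorm.convert_sync_batchnorm(model)

print(sync_model)
```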

ejguan commented 2 years ago

@datumbox Thanks for your insights on the effect of the random seed. I agree the shuffler should share the same seed across processes to make sure all images are processed in an epoch.

Here is my thinking about the random seed. Any random operation before sharding should use the same random seed to make sure the whole dataset is the same across processes. I would like to gather your opinion on random operations after sharding, like random augmentation. Do you expect them to use different random seeds? For example, we have two shards:

datumbox commented 2 years ago

@ejguan Thanks for getting back to me so promptly.

I would expect that shard1 and shard2 have different augmentation strategies, similar to what the current DataLoader is doing. As Nicolas said earlier, systematically having the same augmentations across batches can introduce biases and thus lead to less accurate results. I think many of the assumptions of the current DataLoader have become common assumptions in research, and moving away from them can lead to unexpected results.

abhi-mosaic commented 2 years ago

I think ensuring that each image is processed only once per training epoch is a very common and reasonable assumption that we should maintain.

@datumbox Hi! I'm an external user and was just following this thread as I am also very interested in sharding/shuffling behavior with DataPipes. I noticed your comment above and just wanted to clarify one thing:

The assumption of "each image is processed only once per training epoch" is not guaranteed true when DDP is used along with drop_last: False, even for indexed datasets. PyTorch requires equal device batch sizes at each step, and so at the end of the epoch, if some GPUs have a smaller final batch than others, they repeat some samples to get a full final batch.

You can see the codepath in DistributedSampler here: https://github.com/pytorch/pytorch/blob/9d05ce602e1610e43f2aaa904b34d6946f8e7a1a/torch/utils/data/distributed.py#L104-L110

Dataset with 10 samples, 4 GPUs, device_batch_size=2, global_batch_size=8, shuffle=False, drop_last=False:
Step 1: device batches are {0: [0, 4], 1: [1, 5], 2: [2, 6], 3: [3, 7]}
Step 2: device batches are {0: [8], 1: [9], 2: [0], 3: [1]}

Devices 2 and 3 have to repeat some samples on Step 2 in order for PyTorch DDP to work. So within a single epoch, a total of 12 samples have been processed, not 10. Note that if only 1 or 2 GPUs had been used, then exactly 10 samples would have been processed.
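The example above can be reproduced with a pure-Python sketch that mirrors the padding logic of the linked DistributedSampler code for this exact setup. It does not import torch, and `distributed_indices` and `batches` are hypothetical helpers.

```python
import math

def distributed_indices(num_samples, num_replicas, rank):
    """Indices seen by `rank` with shuffle=False and drop_last=False."""
    per_rank = math.ceil(num_samples / num_replicas)
    total_size = per_rank * num_replicas
    indices = list(range(num_samples))
    # Pad by wrapping around to the start so that every rank gets `per_rank`
    # samples (assumes the padding is no longer than the dataset itself).
    indices += indices[: total_size - num_samples]
    return indices[rank:total_size:num_replicas]

def batches(indices, batch_size):
    """Split a list of indices into consecutive batches."""
    return [indices[i : i + batch_size] for i in range(0, len(indices), batch_size)]

per_device = {r: batches(distributed_indices(10, 4, r), 2) for r in range(4)}
for step in range(2):
    print(f"Step {step + 1}:", {r: b[step] for r, b in per_device.items()})

total = sum(len(batch) for device in per_device.values() for batch in device)
print("samples processed in one epoch:", total)
```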

Anyways, just wanted to share. The difference in behavior is pretty small, especially when shuffle=True, since the 2 repeated samples are then drawn randomly each epoch. But basically I think it would be OK for DataPipes to also repeat some samples per-device in DDP mode, as we never had that guarantee to begin with.

datumbox commented 2 years ago

@abhi-mosaic Thanks for the input.

Though I agree with the technical details you provided, it's important to note how the two points being made here differ:

  1. Currently it's true that Batch padding is very common and at the end of the epoch more samples are drawn if drop_last=False. In this case as you correctly noted above, it's going to be a very small number of images as the number of duplicate images can't exceed the total batch size. This is not likely to affect training much but still creates issues on the inference side (@NicolasHug has this documented nicely at https://github.com/pytorch/vision/issues/4559).
  2. This discussion focuses primarily on whether the seed on the shuffler should be the same across all processes. If it is not, then the number of duplicate images sampled within the epoch will be much larger than in the case above.

So the assumption that each image is processed only once per training epoch is in practice true at the moment, with the exception of the batch padding corner case listed above, and that's what I'm advocating we should maintain. If we don't keep the seed the same, we risk being unable to reproduce past results using the new API, introducing large systematic biases in training, and being unable to properly implement techniques similar to Repeated Augmentation.

I think it would be OK for DataPipes to also repeat some samples per-device in DDP mode

Agreed and that's why I think we should make sure that the new solution behaves similarly and predictably to the old implementation.

NicolasHug commented 2 years ago

@ejguan I think your suggestion of keeping the seed constant across workers up until the sharding is reasonable. As @datumbox reiterated, we need different seeds after that. Let's discuss this more in depth in today's meeting, along with a robust user-friendly API, following up on https://github.com/pytorch/data/issues/352#issuecomment-1091758594

franchesoni commented 2 years ago

The user friendly API would be welcome :smile:

My understanding was that DataLoader could be freed from shuffling if using datapipes, but this is not the case as shown in the example below.

I'm trying to implement a quite simple datapipe (which involves only shuffling and mapping) and it doesn't really work. In the following simplified example, I can't get the shuffling behavior (nor the expected worker behavior, but this was already reported in #352)

from torch.utils.data.datapipes.iter import IterableWrapper
from torch.utils.data import DataLoader

def dummy_dp(n=10):
    dp = IterableWrapper(range(n))
    dp = dp.shuffle()
    return dp

dp = dummy_dp(10)
dl = DataLoader(dp, batch_size=None, shuffle=False, num_workers=1, pin_memory=False, drop_last=False)
print([e for e in dl])

would give

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

I would really appreciate a simple working example of the above, that is, with shuffling in the datapipe only and num_workers>1, or an explanation of what to do before DataLoaderV2. Thank you :smile:

franchesoni commented 2 years ago

Answering my own question, setting shuffle=False in the DataLoader will stop the shuffling in the IterDataPipe. Setting shuffle=None will let things be as they should be. This is now the default.

What is non intuitive to me is the false separation between the datapipe and the dataloader. The dataloader still has control over the shuffling! I leave this only as an anecdote, you know way better than me the reasons behind this choice

franchesoni commented 2 years ago

Find here one example showing how things are not yet solved :cry: I would love to know if you came up with a solution in your meeting

from typing import Iterator, Any
from torch.utils.data.datapipes.iter import IterableWrapper
from torch.utils.data import DataLoader
import torch
from torch.utils.data.datapipes.datapipe import IterDataPipe
import torch.distributed as dist

class SharderDataPipe(torch.utils.data.datapipes.iter.grouping.ShardingFilterIterDataPipe):
    def __init__(self, source_datapipe: IterDataPipe) -> None:
        super().__init__(source_datapipe)
        self.rank = 0
        self.world_size = 1
        if dist.is_available() and dist.is_initialized():
            self.rank = dist.get_rank()
            self.world_size = dist.get_world_size()
        self.apply_sharding(self.world_size, self.rank)

    def __iter__(self) -> Iterator[Any]:
        num_workers = self.world_size
        worker_id = self.rank
        worker_info = torch.utils.data.get_worker_info()
        if worker_info is not None:
            worker_id = worker_id + worker_info.id * num_workers
            num_workers *= worker_info.num_workers
        self.apply_sharding(num_workers, worker_id)
        yield from super().__iter__()

def dummy_dp_0(n, num_workers):
    dp = IterableWrapper(range(n))
    dp = dp.shuffle()
    dp = SharderDataPipe(dp)
    return dp

def dummy_dp_1(n, num_workers):
    dp = IterableWrapper(range(n))
    dp = SharderDataPipe(dp)
    dp = dp.shuffle()
    return dp

def dummy_dp_2(n, num_workers):
    dp = IterableWrapper(range(n))
    dp = dp.shuffle(buffer_size=n)
    dp = SharderDataPipe(dp)
    dp = dp.shuffle()
    return dp

num_workers = 2
for dp in [dummy_dp_0(n=10, num_workers=num_workers),
           dummy_dp_1(n=10, num_workers=num_workers),
           dummy_dp_2(n=10, num_workers=num_workers)]:
    torch.manual_seed(0)
    dl = DataLoader(dp, batch_size=None, shuffle=None, num_workers=num_workers, pin_memory=False, drop_last=False)
    print([e for e in dl])

gives

[3, 7, 0, 1, 5, 6, 6, 0, 9, 4]  # shuffled but with repeated items
[2, 3, 4, 7, 0, 5, 6, 1, 8, 9]  # shuffled but with even number at even indices
[5, 7, 9, 0, 3, 4, 0, 6, 6, 1]  # shuffled but with same repeated items as in the first one

What seems intuitive to me is that each worker has access to a different local seed and to a global seed. Then in .shuffle the user could be able to define which seed to use. I know managing random numbers is not easy, but this could be useful for other purposes (e.g. if we want to apply the same random operation over the whole batch). It would be nice to know how you solved this

ejguan commented 2 years ago

@franchesoni Thank you for exploring DataPipe and DataLoader.

dl = DataLoader(dp, batch_size=None, shuffle=False, num_workers=1, pin_memory=False, drop_last=False)

Answering my own question, setting shuffle=False in the DataLoader will stop the shuffling in the IterDataPipe. Setting shuffle=None will let things be as they should be. This is now the default.

Yeah. Setting either shuffle=True or shuffle=None would turn on shuffling in the DataPipe graph. I think the reason that the default value is shuffle=None (enable shuffle by default) is that it should reflect the same behavior as when users iterate directly over the DataPipe rather than using DataLoader.

ejguan commented 2 years ago

@franchesoni

As a workaround for now, you have to provide worker_init_fn to DataLoader to make sure the global seed is shared across workers. Also note that shuffle uses Python's random module rather than torch.random.
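A minimal sketch of what such a worker_init_fn could look like, assuming the shuffler draws from Python's global `random` module as stated above. `shared_seed_worker_init` and `make_worker_init_fn` are hypothetical names, and the two workers are only simulated here by calling the init function in a loop.

```python
import random
from functools import partial

def shared_seed_worker_init(worker_id, shared_seed):
    """Seed Python's global RNG identically in every worker (worker_id is ignored)."""
    random.seed(shared_seed)

def make_worker_init_fn(base_seed, epoch):
    # Changing the seed with the epoch avoids repeating the same shuffle every epoch.
    return partial(shared_seed_worker_init, shared_seed=base_seed + epoch)

# Simulate two workers of the same epoch drawing a permutation after init.
init_fn = make_worker_init_fn(base_seed=0, epoch=3)
perms = []
for worker_id in range(2):
    init_fn(worker_id)
    perms.append(random.sample(range(10), k=10))

print("workers agree on the permutation:", perms[0] == perms[1])
```

The function would be passed as `DataLoader(dp, ..., worker_init_fn=make_worker_init_fn(0, epoch))`, with the DataLoader recreated for each epoch. As noted earlier in the thread, any transform that draws from the same global RNG would then also behave identically in every worker.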

For

[2, 3, 4, 7, 0, 5, 6, 1, 8, 9]  # shuffled but with even number at even indices

This is the result if you do sharding before shuffle as you are shuffling each shard of dataset rather than the whole dataset.
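The even/odd pattern can be reproduced without torch. This sketch assumes round-robin sharding across 2 workers and a DataLoader that yields one item from each worker in turn; the seed is arbitrary.

```python
import random

n, num_workers = 10, 2
rng = random.Random(0)

# Shard first (round-robin), then shuffle inside each worker.
shards = []
for worker_id in range(num_workers):
    shard = list(range(n))[worker_id::num_workers]
    rng.shuffle(shard)
    shards.append(shard)

# Assumption: the DataLoader yields one item from each worker in turn.
output = [shards[i % num_workers][i // num_workers] for i in range(n)]
print(output)

# Every even position holds an even number and every odd position an odd one.
print(all(value % 2 == position % 2 for position, value in enumerate(output)))
```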

And, the ultimate solution should be something like the following (this needs DataLoader2 to land in TorchData):

dp = IterableWrapper(range(n))
dp = dp.shuffle()
dp = SharderDataPipe(dp)
dl = DataLoader2(dp, ...)

for epoch in range(2):
    dl.set_seed(0)
    print(list(dl)) 

A few features still need to be done:

franchesoni commented 2 years ago

Thank you for the explanation. If I understand well, in the future, a shuffle datapipe will carry a random number generator (RNG) that will be in sync when the datapipe is distributed across workers. This RNG will be initialized with a seed provided by DataLoader2. This would also solve the augmentation problem, as augmentations will by default depend on different seeds on different workers (this part is unchanged).

Do we have today any solution that solves both shuffling and augmentation?

A modification of your example

dp = IterableWrapper(range(n))
dp = dp.shuffle()
dp = SharderDataPipe(dp)
dp = dp.map(lambda x: random_augment(x))  # franchesoni added this line
dl = DataLoader2(dp, ...)

for epoch in range(2):
    dl.set_seed(0)
    print(list(dl)) 

should work well. But where (or when) can I access all these new cool functionalities?

ejguan commented 2 years ago

@franchesoni My current plan is to add a DataPipe to wrap an RNG for augmentation. So the example will become:

dp = IterableWrapper(range(n))
dp = dp.shuffle()
dp = SharderDataPipe(dp)
dp = dp.map(lambda x: random_augment(x))  # franchesoni added this line
dp = dp.random_op(rng=random.Random())  # the API is not determined yet.
dl = DataLoader2(dp, ...)

for epoch in range(2):
    dl.set_seed(0)
    print(list(dl)) 

LMK if this design works for you. For random_op, DataLoader2 will provide a different seed to each worker. This functionality only applies to DataLoader2. For BC with DataLoader, I might provide another API for shuffle to set the seed of the isolated RNG, and you can still use worker_init_fn to control the random state for augmentation. The example should be:

dp = IterableWrapper(range(n))
dp = dp.shuffle().set_seed(0)
dp = SharderDataPipe(dp)
dp = dp.map(lambda x: random_augment(x))  # franchesoni added this line
dp = dp.random_op(rng=random.Random())  # the API is not determined yet.
dl = DataLoader2(dp, ...)

for epoch in range(2):
    dl.set_seed(0)
    print(list(dl)) 

But where (or when) can I access all these new cool functionalities?

Adding isolated RNG and API would be my high-pri next week. DataLoader2 should be landed to TorchData around this week.

NicolasHug commented 2 years ago

dl.set_seed(0)

This might be ambiguous with all the other types of RNGs that are at play here. It's not clear from the API that this only affects the shuffling seed. Perhaps set_epoch() would work here, and be consistent with the DistributedSampler()?

ejguan commented 2 years ago

This might be ambiguous with all the other types of RNGs that are at play here. It's not clear from the API that this only affects the shuffling seed.

The seed for DataLoader2 is not designed to only provide a seed for shuffling ops. The seed is used by DataLoader2 to generate seeds for both shuffle and random_op. The difference is that the seeds vary per process for random_op, while shuffle shares the same seed across all processes.
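A pure-Python sketch of that idea: one base seed is expanded into a shared shuffle seed plus one seed per worker. This is not the DataLoader2 implementation; `derive_seeds` and `run_worker` are hypothetical helpers, and the "augmentation" is just a random number attached to each sample.

```python
import random

def derive_seeds(base_seed, num_workers):
    """Hypothetical helper: one shared shuffle seed plus one seed per worker."""
    root = random.Random(base_seed)
    shuffle_seed = root.getrandbits(64)
    worker_seeds = [root.getrandbits(64) for _ in range(num_workers)]
    return shuffle_seed, worker_seeds

def run_worker(worker_id, num_workers, shuffle_seed, worker_seed, n=8):
    data = list(range(n))
    random.Random(shuffle_seed).shuffle(data)  # identical permutation in every worker
    shard = data[worker_id::num_workers]       # disjoint shards across workers
    aug_rng = random.Random(worker_seed)       # random ops differ per worker
    return [(x, round(aug_rng.random(), 3)) for x in shard]

shuffle_seed, worker_seeds = derive_seeds(base_seed=0, num_workers=2)
for worker_id, worker_seed in enumerate(worker_seeds):
    print(worker_id, run_worker(worker_id, 2, shuffle_seed, worker_seed))
```

Because the shuffle seed is shared, the shards cover every sample exactly once, while the per-worker seeds keep the random ops from being identical across workers.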

ejguan commented 2 years ago

But, this requires users to add this random_op wrapper DataPipe to let DataLoader2 know that there is a random operation in the wrapped DataPipe.

NicolasHug commented 2 years ago

Is there a way for the DataLoader2 to be aware of a specific shuffling seed? The proposed solution above exposes various implementation details that IMHO are still fairly complex and easy for users to get wrong.

In the case of map-style datasets, all these details are properly handled by the sampler or by the DataLoader, and users don't really need to worry about the RNG at all (except maybe with DistributedSampler.set_epoch()). From a user perspective, it'd be fantastic if the datapipe API could be as simple as for map-style datasets.

ejguan commented 2 years ago

In the case of map-style datasets, all these details are properly handled by the sampler or by the DataLoader, and users don't really need to worry about the RNG at all

I am not sure about it. Users still need to specify seeds for all global RNGs using worker_init_fn right? And, even with Sampler, if users want to control shuffle state, users have to manually set seed to torch like:

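import random

import torch
from torch.utils.data import DataLoader, Dataset

def seed_worker(worker_id):
    random.seed(worker_id)  # seed Python's global RNG in each worker
    ...

ds = Dataset() # Dummy Map-style Dataset
for epoch in range(2):
    torch.manual_seed(epoch)  # controls the sampler's shuffle order for this epoch
    dl = DataLoader(ds, shuffle=True, worker_init_fn=seed_worker, num_workers=2)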

For DataLoader2, if users don't want to manually set the seed for shuffle and other random ops, we would let DataLoader2 derive a seed from the time and make sure all shuffle ops share the same seed.

dp = IterableWrapper(range(n))
dp = dp.shuffle()
dp = SharderDataPipe(dp)
dl = DataLoader2(dp, ...)

for epoch in range(2):
    # dl.set_seed(0)
    print(list(dl))  # dl will use time as the seed in the main process, and generate the seed for shuffle

WDYT?

NicolasHug commented 2 years ago

I am not sure about it. Users still need to specify seeds for all global RNGs using worker_init_fn right?

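I don't believe so. In the torchvision training references for example (which currently use map-style datasets), we don't need to worry about seeds at all. And yet:

  • we're sure that all workers properly shuffle the data, and
  • we're sure that all the transforms performed within the dataset's __getitem__() methods all use different seeds.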

Ideally the same simplicity would be available with datapipes.

And, even with Sampler, if users want to control shuffle state, users have to manually set seed to torch

Are you sure? I believe that the 2 points above are still verified even if we were to call torch.manual_seed() somewhere in the training script.

ejguan commented 2 years ago

  • we're sure that all workers properly shuffle the data, and

This is not the mechanism for Map-style Dataset + DataLoader. Shuffling is carried out in main process using Sampler rather than in worker process.

Are you sure? I believe that the 2 points above are still verified even if we were to call torch.manual_seed() somewhere in the training script.

You mean users don't need to call torch.manual_seed() right? See RandomSampler: https://github.com/pytorch/pytorch/blob/9bfa7e9791a13e786d39d938ccce0798d3f18ffd/torch/utils/data/sampler.py#L117-L124

If not specified, the seed for the generator object is created by tensor.random_(), which relies on torch.manual_seed(seed) to control the reproducibility in the main process.

If you don't do manual seed to torch, the order of data will vary for different runs of the training script.
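A short check of that behavior with the real `RandomSampler`, using a plain `range` as a stand-in data source (an assumption for brevity; any object with a length works here).

```python
import torch
from torch.utils.data import RandomSampler

data = range(10)  # stand-in data source

# The sampler draws its seed from torch's global RNG when it is iterated,
# so re-seeding the global RNG reproduces the same order.
torch.manual_seed(0)
first = list(RandomSampler(data))

torch.manual_seed(0)
second = list(RandomSampler(data))

print(first == second)
```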

  • we're sure that all the transforms performed within the dataset's __getitem__() methods all use different seeds.

I am not 100% sure about how TorchVision augmentation works. But, if all random augmentation relies on the global Torch RNG, to control reproducibility, you have to specify worker_init_fn to make the random augmentations deterministic in worker processes.

If users don't need reproducibility, they don't need to specify any of torch.manual_seed or worker_init_fn. It's the same for DataLoader2: users don't need to specify anything. But, in order to achieve reproducibility, there are some hurdles to clear in the code.

NicolasHug commented 2 years ago

I agree with everything you wrote @ejguan , but I think we're not entirely talking about the same thing.

My main concern here is to provide an API for datapipes that is just as simple as what we currently have for map-style datasets. Considering the non-reproducible use-case:

Now considering the reproducible use-case, e.g. when users call torch.manual_seed():

In both these use-cases with map-style datasets, things are kept extremely simple and hidden from the user. At no point do they need to worry about a specific seed for shuffling, or a specific seed for transforms. They can just call manual_seed() once if they want to for reproducibility, but that's it.

Perhaps I'm missing some important detail?

ejguan commented 2 years ago

Great summary! And, thanks for bringing up this discussion. It triggered some thinking about BC on my side. @NicolasHug

For non-reproducible use case

For reproducible use case

You can see the only difference from the user's perspective is when they want to have an isolated RNG per random op (IterDataPipe). And, you might ask why I came up with this design: IterDataPipes are modular components rather than a monolithic object like Dataset, and we want each component to have its own state. It's more deterministic and makes checkpointing easier. Considering we don't want to limit users to torch.random and the torch random generator, we need a way to register all the different RNGs users want to use that need to be serialized.
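A minimal sketch of what "each component owning its state" could mean for a shuffling step. `IsolatedShuffler` and its `state_dict` / `load_state_dict` methods are hypothetical and only illustrate the checkpointing argument; they are not the TorchData API.

```python
import random

class IsolatedShuffler:
    """Hypothetical sketch: a shuffling step that owns its RNG state."""

    def __init__(self, data, seed):
        self.data = list(data)
        self.rng = random.Random(seed)  # private RNG, separate from the global one

    def __iter__(self):
        order = self.data[:]
        self.rng.shuffle(order)
        return iter(order)

    def state_dict(self):
        return {"rng_state": self.rng.getstate()}

    def load_state_dict(self, state):
        self.rng.setstate(state["rng_state"])

shuffler = IsolatedShuffler(range(10), seed=0)
checkpoint = shuffler.state_dict()  # snapshot taken before the epoch
epoch_a = list(shuffler)

random.seed(999)                    # touching the global RNG has no effect
shuffler.load_state_dict(checkpoint)
epoch_b = list(shuffler)            # replays exactly the same order

print(epoch_a == epoch_b)
```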

LMK if this makes more sense.

NicolasHug commented 2 years ago

@ejguan and I just had a chat offline where we discussed some of the points above. Here's a summary of our discussion thus far. The points below are either re-hashing, or updating / correcting the ones above. @ejguan please feel free to edit / correct if this isn't accurate. And thanks again for your time and all your work on this!

ejguan commented 2 years ago

  • quick Q: currently, the Shuffler's RNG comes from Python's random builtin module. Do you think it would make sense to allow torch.manual_seed() to also control that RNG?

Yeah. It will be one TODO in the proposal. I will make sure the RNG attached to the shuffler can be seeded by torch.

franchesoni commented 2 years ago

LMK if this design works for you

Sorry for the delay. You figured things out already in a nice way. Given that shuffle is most of the time used to shuffle the full input data, it makes sense that it is handled specially. Sounds great!

If I understood well, the last proposal

dp = IterableWrapper(range(n))
dp = dp.shuffle().set_seed(0)
dp = SharderDataPipe(dp)
dp = dp.map(lambda x: random_augment(x))  # franchesoni added this line
dp = dp.random_op(rng=random.Random())  # the API is not determined yet.
dl = DataLoader2(dp, ...)

for epoch in range(2):
    dl.set_seed(0)
    print(list(dl)) 

changed and it is now

n = 10
dp = IterableWrapper(range(n))
dp = dp.shuffle()  # one per worker using shared seed
dp = SharderDataPipe(dp)  # data subsampling depending on worker num (function defined way above)
dp = dp.map(lambda x: random_augment(x))  # one per worker using worker seed
dl = DataLoader2(dp, ...)

for epoch in range(2):  # will print the same correctly shuffled range twice
    torch.manual_seed(0)
    print(list(dl))  

  1. is this right?
  2. could you implement an official .shard method so the users can import it instead of defining SharderDataPipe function?
  3. calling shuffle twice will use different shared seeds, right?
  4. although I can not think right now about any other function that could benefit from a shared seed, there is probably some usecase. In such a case it would be nice to still provide the user the option to use shared or worker seed for each operation if she wants to (e.g. I imagine a seed='shared'|'worker' param). Is it too much?

I came here because my code said

    assert (
        num_workers <= 1
    ), "this should be 1 until https://github.com/pytorch/data/issues/302 is solved (check if it is)"

happy to see it has advanced. Let me know when I can remove this assertion! :stuck_out_tongue_closed_eyes: