uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Petastorm sharding + Distributed PyTorch #508

megaserg opened 4 years ago

megaserg commented 4 years ago

Problem: I would like to train a PyTorch model on a Parquet dataset in a distributed (multi-GPU, multi-machine) setup, for a fixed number of epochs. For this, I need to shard the dataset, and I hoped providing Petastorm's cur_shard and shard_count would be sufficient. I create a Petastorm reader with num_epochs=1 for each epoch (or could create it once and reset()):

    import petastorm
    import petastorm.pytorch

    for epoch in range(epochs):
        # one pass over this rank's shard per epoch
        train_loader = petastorm.pytorch.DataLoader(petastorm.make_reader(
            ...,  # dataset URL and other reader options elided
            num_epochs=1,
            cur_shard=ctx.global_rank,
            shard_count=ctx.world_size,
        ))

        for i, (inputs, targets) in enumerate(train_loader):
            inputs = inputs.to(ctx.device)
            targets = targets.to(ctx.device)

            predictions = model(inputs)
            loss = loss_fn(predictions, targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

But when training a DistributedDataParallel model, PyTorch expects shards to have the same number of examples, so that all ranks see the same number of batches, take the same number of training steps, and participate in the same number of allreduces. E.g., torch.utils.data.distributed.DistributedSampler (used to implement sharding in stock PyTorch's DataLoader) wraps the dataset around to make it evenly divisible by the number of shards.
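
For illustration (not from the original post), the wraparound padding that DistributedSampler applies can be seen directly:

    # DistributedSampler pads by wrapping indices so every rank gets the same
    # number of samples (here ceil(10 / 4) = 3 per rank).
    from torch.utils.data.distributed import DistributedSampler

    dataset = list(range(10))
    for rank in range(4):
        sampler = DistributedSampler(dataset, num_replicas=4, rank=rank, shuffle=False)
        print(rank, list(sampler))  # e.g. rank 2 gets [2, 6, 0], rank 3 gets [3, 7, 1]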

If shards are not even-sized, some ranks have less work to do, finish their shard early, and start the next epoch while the rest of the ranks are still processing the previous epoch. If we're training for a certain number of epochs, the "fast ranks" eventually finish first and terminate, leaving the rest of the ranks hanging because allreduce is now impossible.

In Petastorm, even-sized shards are not guaranteed. The length of the dataset is unknown and we don't have random access to rows. Rowgroups are assigned to shards in a round-robin fashion without wraparound, so one rank can get more rowgroups than another. Moreover, rowgroups might not have the same number of rows, and applying row predicates can change the balance further.
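
As a quick illustration of the imbalance (added for clarity, assuming the plain modulo-N assignment described above):

    # Round-robin (modulo-N) assignment of 10 rowgroups to 4 shards:
    row_groups = list(range(10))
    shard_count = 4
    shards = [[rg for rg in row_groups if rg % shard_count == s]
              for s in range(shard_count)]
    print([len(s) for s in shards])  # [3, 3, 2, 2] -- two shards get an extra rowgroup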

Possible solutions: I thought about a few ways to make this work (referenced below as Options 1-3).

@selitvin is my assessment of the problem valid? Do you have some solutions in mind? Thank you!

selitvin commented 4 years ago

I absolutely agree with the problem definition and your analysis of possible solutions. For the scenarios our users have encountered, they typically chose Option 3, as it does not introduce unintended bias for some samples and is the easiest to configure.

Perhaps one more control variable relevant to Option 2 is the size of your rowgroups. Reducing it improves the granularity of the task unit and, for smaller datasets, may reduce the unintended bias that some portions of the samples may get during training.

Implementing Option 1 would be tricky, since it would introduce coordination between ranks that is not needed with the naive modulo-N sharding.

megaserg commented 4 years ago

Thank you! Good to know I'm on the right track.

I control the size of the rowgroups, but reducing it also takes away the benefits of Parquet as a format: more reads are required.

Option 4, which I found in PyTorch discussions on sampling IterableDataset: make each of the k ranks read the whole dataset of size n, but apply reservoir sampling to subsample n/k rows. It wastes some resources and doesn't guarantee that a particular example is included in every epoch. It also requires the dataset size to be known, which can be computed during the first epoch using Option 3, or with a separate scan.
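
A minimal sketch of the reservoir-sampling idea (illustrative only; reservoir_sample is a hypothetical helper, not a Petastorm or PyTorch API):

    import random

    def reservoir_sample(stream, k, seed=None):
        # Keep a uniform random sample of k items from a stream of unknown length.
        rng = random.Random(seed)
        reservoir = []
        for i, item in enumerate(stream):
            if i < k:
                reservoir.append(item)
            else:
                j = rng.randint(0, i)  # inclusive; item survives with probability k/(i+1)
                if j < k:
                    reservoir[j] = item
        return reservoir

    # e.g. each rank iterates its reader over the full dataset and keeps n // world_size rows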

selitvin commented 4 years ago

IterableDataset is a very interesting direction. A more PyTorch-native way of parallel loading/processing could probably substitute for Petastorm's custom worker pools, give PyTorch users a more native look and feel, and improve performance thanks to the better, shared-memory-based IPC mechanism already in place in PyTorch.

tottenjordan commented 3 years ago

@megaserg I am trying to do something similar with TPU Pods (multi-machine): PyTorch XLA reading an ImageNet Petastorm dataset from GCS buckets. Did you find that one of your options worked best? If so, how did you implement it? Option 3? 4? Something else?

My current solution reads JPEGs from GCS. I came to Petastorm so I can reduce training time, as my goal is to make training time for this configuration as performant as reading data from a local VM (but for use cases where storing training data locally is not ideal). More on that here if interested.

@selitvin appreciate any thought you may have too.

megaserg commented 3 years ago

So we went with option 3 for a while. Works fine, but epoch counting is confusing.

Then we went with Option 1, but we didn't use Petastorm. Given a Parquet dataset (i.e., a bunch of Parquet files), we build an "index" of Parquet rows: basically a list of triples (filename, rowgroup index within file, row index within rowgroup). The dataset thus becomes a map-style Dataset, and we can use a stock PyTorch DataLoader with its sharding, sampling, and shuffling logic. Each shard reads its own subset of these triples and fetches them via the Dataset's __getitem__(), which goes to GCS, reads that file, gets that rowgroup by index, yields that row by index, and throws away the rest of the rowgroup. This works decently fast, especially with a large number of the DataLoader's num_workers helper processes. The problem is that we have to throw away data we don't use, so we have to keep the rowgroup size at something like 1, which kind of kills the performance. But at least we can read Parquet, apply predicates, read only the needed columns, and enjoy at least some compression.
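
A rough sketch of that index-based, map-style dataset (the names build_row_index and RowIndexDataset are illustrative, not the original code; pyarrow is assumed for Parquet access):

    import pyarrow.parquet as pq
    from torch.utils.data import Dataset

    def build_row_index(paths):
        # One (path, rowgroup index, row index within rowgroup) triple per row.
        index = []
        for path in paths:
            pf = pq.ParquetFile(path)
            for rg in range(pf.num_row_groups):
                n_rows = pf.metadata.row_group(rg).num_rows
                index.extend((path, rg, r) for r in range(n_rows))
        return index

    class RowIndexDataset(Dataset):
        def __init__(self, index, columns=None):
            self.index = index
            self.columns = columns

        def __len__(self):
            return len(self.index)

        def __getitem__(self, i):
            path, rg, row = self.index[i]
            table = pq.ParquetFile(path).read_row_group(rg, columns=self.columns)
            return table.slice(row, 1).to_pydict()  # rest of the rowgroup is discarded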

So we thought about how we could have larger rowgroups (benefiting from the storage format) and not throw away data. Currently, we're using a custom IterableDataset that is aware it's being used in a multi-GPU and multi-dataloader-worker setting; we still use the stock PyTorch DataLoader. We still build/read the index as above, but then we divide the whole sorted index into equal-sized consecutive pieces, one for each shard, and divide each of those pieces into roughly equal-sized consecutive pieces, one for each dataloader worker. Each dataloader worker thus gets allocated its own subset of rows, which are likely all from the same rowgroup, or a few consecutive rowgroups within a file, or (rarely) crossing a file boundary.

Each dataloader worker then applies its own short-distance shuffle to its assigned subset: it only rearranges rows within a rowgroup, or rowgroups within a file, or files. Therefore, when a worker reads a rowgroup, it yields rows from it in a shuffled order and only then proceeds to the next rowgroup. There's still some chance of throwing data out, because the rowgroup size might not be aligned with the shard size or num_workers, but that's a question of tuning and experimenting. A sketch of this setup follows below.
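
A simplified sketch of that rank- and worker-aware IterableDataset (class and argument names are illustrative; the short-distance shuffle is omitted for brevity):

    import torch.distributed as dist
    from torch.utils.data import IterableDataset, get_worker_info

    class ShardedRowIterable(IterableDataset):
        def __init__(self, index, read_row):
            self.index = index        # sorted list of (path, rowgroup, row) triples
            self.read_row = read_row  # callable that fetches a single row

        def __iter__(self):
            rank = dist.get_rank() if dist.is_initialized() else 0
            world = dist.get_world_size() if dist.is_initialized() else 1

            # Equal-sized consecutive piece per rank (trailing remainder is dropped).
            per_rank = len(self.index) // world
            piece = self.index[rank * per_rank:(rank + 1) * per_rank]

            # Roughly equal consecutive piece per dataloader worker.
            info = get_worker_info()
            if info is not None:
                per_worker = (len(piece) + info.num_workers - 1) // info.num_workers
                piece = piece[info.id * per_worker:(info.id + 1) * per_worker]

            # A short-distance shuffle (within rowgroups/files) would go here.
            for triple in piece:
                yield self.read_row(triple)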

Hope this helps!

weidezhang commented 2 years ago

Just wondering: if I use Option 3 (each rank loads the whole Petastorm dataset), might reading the whole dataset have any performance bottlenecks? Is it better to set the number of row groups equal to 1 in that case?

selitvin commented 2 years ago

There should not be performance bottlenecks, since the Parquet dataset is read rowgroup-by-rowgroup. What do you mean by "set number of row groups equal to 1"? Is it about having a single row-group in the entire Parquet store, or having one row in each row-group? (Both are bad ideas: the first is likely to blow up RAM usage for a decent-sized dataset, and the latter would be slow due to the large number of roundtrips when reading data.)

weidezhang commented 2 years ago

Just to confirm how Option 3 is configured:

In the make_reader API, simply set cur_shard=None, shard_count=None, shard_seed=None, and num_epochs to the real epoch number instead of None. Is that fine?

selitvin commented 2 years ago

Yes (note that cur_shard=None, shard_count=None are the defaults, so you don't need to specify this explicitly)
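
A minimal sketch of that Option 3 configuration (dataset_url, epochs, and batch_size are placeholders; cur_shard/shard_count are shown explicitly even though None is the default):

    from petastorm import make_reader
    from petastorm.pytorch import DataLoader

    # Option 3: no sharding -- every rank reads the full dataset.
    reader = make_reader(dataset_url, num_epochs=epochs,
                         cur_shard=None, shard_count=None)
    with DataLoader(reader, batch_size=batch_size) as train_loader:
        for batch in train_loader:
            ...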

zxgx commented 2 years ago

I've tested solution 3 and discovered that, in order to load an identical global batch in each rank, I must also set worker_count=1 when invoking make_batch_reader; otherwise the batch in each rank would still be random. Another potential solution may be setting reader_pool_type="dummy"; however, when launching multiple processes, the whole procedure gets stuck. Anyway, I suppose decreasing the number of workers might incur some performance loss. Is this the right way to accomplish solution 3, or how could I mitigate this problem?

BTW, I notice that there is an in-progress PR working on shuffle: https://github.com/uber/petastorm/pull/767. Could you please tell me if you have any plan to fix it? @selitvin

selitvin commented 2 years ago

You are right. The order is not deterministic when multiple workers race over reading row groups and we do not have a reordering queue set up (not too hard to implement). Curious, why would you want to get the same order of rows on multiple ranks? I would assume that you do want to shuffle as much as possible anyway?

#767 will land very soon (within days).

zxgx commented 2 years ago

I'm trying to build a DLRM model containing both model parallel layers & data parallel layers. In the model parallel part, I must make sure each rank consumes the same global batch. After that, an all-to-all collective distributes the global output activations to data parallel layers in each rank. I don't know whether this explanation is clear enough, but I do need the same order in the global batch in each rank.

In fact, this is a compromise choice. Potentially, I could also generate a different mini-batch in each rank and use an all-to-all collective to recover the global batch before feeding it to the model parallel layers. However, due to the communication problems in our hardware, the former is better for my purpose.

selitvin commented 2 years ago

I see. It should not be too hard to add a reordering buffer to the reader. I think the PyTorch DataLoader has similar functionality. I am not sure when I'll have time to do this. If you want to give it a shot and propose a PR, I'll be happy to work with you to get it in.
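
For reference, a generic sketch of what such a reordering buffer could look like (this is not Petastorm code; it assumes each produced item carries a monotonically increasing sequence number):

    import heapq

    class ReorderingBuffer:
        """Releases items in sequence order even if they arrive out of order."""
        def __init__(self):
            self._heap = []
            self._next_seq = 0

        def push(self, seq, item):
            heapq.heappush(self._heap, (seq, item))

        def pop_ready(self):
            # Yield items whose turn has come; hold back everything else.
            while self._heap and self._heap[0][0] == self._next_seq:
                _, item = heapq.heappop(self._heap)
                self._next_seq += 1
                yield item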

zxgx commented 2 years ago

I'm too busy to work on the dataloader myself right now :dizzy_face: that's why I resorted to Petastorm. I may have a look at the source code in the future, but I would recommend fixing this load-balancing issue for DDP first, as it is a very common demand for large-scale datasets and distributed training.