pytorch / data

A PyTorch repo for data loading and utilities to be shared by the PyTorch domain libraries.
BSD 3-Clause "New" or "Revised" License
1.12k stars 149 forks source link

Update the document to keep non-replicable DataPipe in the main process for MPRS #1079

Open sehoffmann opened 1 year ago

sehoffmann commented 1 year ago

🚀 The feature

Highlight the fact that the MPRS attaches non-replicable datapipe branches at the end of it in the documentation. Also mention the currently undocumented / obscure is_replicable() interface for IterDataPipe and MapDataPipe.

Old Issue

I would like to make the use of MultiProcessingReadingService as opaque as possible. In particular, it should not affect the shuffling behavior. For that, I need more control and be able to manually expand the internal pipeline of the MultiProcessingReadingService AFTER worker batches are received in a round-robin fashion such that I could e.g. add an additonional shuffle buffer. It would be good to have an extension point for this. Potentially, the Adapter protocol could be expanded for this.

Also, on a side note: There should possibly be an Adapter that divides the shuffle buffer size of shuffle operations, so that the overall memory consumption does not depend on the amount of IO workers. The existing Shuffle adapter could be expanded for this. While I am aware that the documentation recommends placing shuffling operations as early as possibly where they still might act on a logical level, there can be situations where this is simply not possible and shuffling has to be done at the end.

Motivation, pitch

I'm working with horovod which spawns one MPI process per GPU on each node. Logical sharding of the dataset takes place in this MPI setting and usage of the MultiProcessingReadingService (per process/gpu/shard) should be seen as an implementation detail. As such, I don't want it to affect the shuffling behavior within a shard, i.e. there should be no difference in randomness between training with 1 IO worker or training with 8 IO workers per GPU.

In my particular case, shuffling also needs to be performed right at the end of the pipeline.

sehoffmann commented 1 year ago

In particular, I would like to be able to do

end_datapipe = self._worker_consumer_datapipe.unbatch().shuffle(n_workers*batchsize).batch(batchsize)

immediately after https://github.com/pytorch/data/blob/main/torchdata/dataloader2/reading_service.py#L297

Since I'm not that familiar with the codebase you probably have a better approach, but something along the lines of a Adapter.finalize_main_pipeline() and a Adapter.finalize_worker_pipeline() function would be a good solution for this IMO.

This would also allow us to implement the main process and worker process prefetchs as Adapters which then can be reused with other ReadingServices as well and would be more in-line with the existing design IMO.

ejguan commented 1 year ago

There is a way to achieve that without relying on Adapter. When you define your pipeline, you can specify the DataPipe that you want to keep in the main process by attaching a method of is_replicable() and returning False. Then, MPRS would launch workers with the replicable eDataPipe graph prior to this non-replicable DataPipe and re-connect the "end_datapipe` to the non-replicable DataPipe

sehoffmann commented 1 year ago

@ejguan That's some great news! Is it possible to wrap a sub-graph into a DataPipe (this is some question I had anyways), so that I could reuse the already existing implementations? I.e. specifically I would need to wrap .unbatch().shuffle(n_workers*batchsize).batch(batchsize) into a new DataPipe (class) and then implement is_replicable() to return false.

sehoffmann commented 1 year ago

Also, on a side note: I think it would be good to highlight this option in the documentation. Disregard this comment if that is already the case and I just overread it.

ejguan commented 1 year ago

Is it possible to wrap a sub-graph into a DataPipe (this is some question I had anyways), so that I could reuse the already existing implementations?

Technical speaking, yes you can. But, in the traverse_dps function, we will treat them as separate DataPipes rather than a single one.

I.e. specifically I would need to wrap .unbatch().shuffle(n_workers*batchsize).batch(batchsize) into a new DataPipe (class) and then implement is_replicable() to return false.

I should clarify a little bit. The MultiProcessingReadingService will go through the DataPipe graph and find the first non-replicable DataPipe, and keep this DataPipe with subsequent DataPipes in the main process, by that I mean you only need to add is_replicable() function to unbatch.

ejguan commented 1 year ago

Also, on a side note: I think it would be good to highlight this option in the documentation. Disregard this comment if that is already the case and I just overread it.

Yeah. This is a new feature and we slightly touch this topic in our doc. We should definitely update our document.

sehoffmann commented 1 year ago

@ejguan Great, thanks a lot!

Some additional thoughts on this:

If I understood you correctly wrt. composability the canonical way would look something like this?

class ComposedPipe(dp.iter.IterDataPipe):
    def __init__(datapipe):
          self.datapipe =  datapipe.map(lambda x: x).unbatch() #etc
     def __iter__(self):
           for x in self.datapipe:
                  yield x + 42

And as such, traverse_dps picks this up as 3 pipes. Ideally, there should be a way in the future that allows us to hide this from the outside world. I.e. ComposedPipe should appear as a single pipe in the graph, since the sub-pipes are an implementation detail (code reuse). Adding pipes after our own implementation is also a bit contrived, since we would need a separate iterator object, pass that object to the "suffix" pipe, and finally in __iter__ return the iterator of the suffix pipe.

If you want, you can close this issue :)

ejguan commented 1 year ago
  • This might not hold in the future (multiple replicable branches would actually be a cool feature!), and thus a proper wrapping pipe would become necessary.

Yeah. That's something in my mind as well. But, I might do it in a separate ReadingService as it requires more complicated arguments to set number of workers per replicable branch.

  • I think something along the lines of a "SynchronizerPipe(*pipes)" should become part of the standard API; where *pipes denotes a part of the graph that should we non-replicable.

Could you please elaborate it? I do not fully understand. As a reference, we do have zip DataPipe to combine multiple DataPipes.

If I understood you correctly wrt. composability the canonical way would look something like this?

Yeah. Just want to callout that lambda function is not recommended in TorchData as it's non-picklable.

And as such, traverse_dps picks this up as 3 pipes. Ideally, there should be a way in the future that allows us to hide this from the outside world. I.e. ComposedPipe should appear as a single pipe in the graph, since the sub-pipes are an implementation detail (code reuse). Adding pipes after our own implementation is also a bit contrived, since we would need a separate iterator object, pass that object to the "suffix" pipe, and finally in __iter__ return the iterator of the suffix pipe.

I am not convinced for a few reasons:

Technically speaking, I would hope users don't need to do anything with traverse_dps by themselves. TorchData should provide all utility graph functions. So, I guess I would dive deeper on this if any user reports their use causes.

If you want, you can close this issue :)

Do you mind updating the summary of Issue to update the document to keep non-replicable DataPipe in the main process for MPRS? Then, we don't need to close this Issue.

Thank you!