pytorch / rl

A modular, primitive-first, python-first PyTorch library for Reinforcement Learning.
https://pytorch.org/rl
MIT License
2.2k stars 290 forks source link

Correct construction of TensorDictReplayBuffer in DDP #1397

Open patchmeifyoucan opened 1 year ago

patchmeifyoucan commented 1 year ago

Hello,

I have a question regarding the construction of replay buffers in distributed training (DDP). Across multiple workers, I would like to use a single, large prioritized replay buffer. With uniform sampling, one can easily get away with using worker-local buffers of smaller size. However, in the prioritized case, using a per-worker buffer would not yield the same distribution as a single buffer, since statistics are calculated based on a local subset of all information.

I hoped that if I simply use a shared data location, some lower-level synchronization would write to the correct indices, which turned out is not the case. After 10.000 steps across 4 workers, len(buffer) returns only 2500 (I think I understand why, see below).

Even though I would find a workaround using some form of inter-process locking, I would like to hear your opinion on this issue. In particular, I am using the code below which is a modification that used beta scheduling.

class ScheduledTensorDictPrioritizedReplayBuffer(TensorDictPrioritizedReplayBuffer):
    def __init__(
            self,
            alpha: float,
            beta_start: float,
            beta_end: float,
            beta_steps: int,
            storage: LazyMemmapStorage,
            batch_size: int,
            prefetch: int | None
    ):
        super().__init__(
            storage=storage,
            alpha=alpha, beta=beta_start,
            priority_key="priority",
            pin_memory=True,
            batch_size=batch_size,
            prefetch=prefetch
        )

        self.beta_start = beta_start
        self.beta_end = beta_end
        self.beta_steps = beta_steps
        self.batch_size = batch_size
        self.prefetch = prefetch

        beta_diff = abs(beta_end - beta_start)
        beta_diff *= +1 if beta_start < beta_end else -1
        self.comp_fn = min if beta_diff > 0 else max

        self._beta_current = beta_start
        self._beta_step_size = beta_diff / beta_steps

    def can_sample(self):
        return len(self) >= self.batch_size

    def step(self):
        self._beta_current = self.comp_fn(self._beta_current + self._beta_step_size, self.beta_end)
        self._sampler._beta = self._beta_current

Right now, the buffer is created as follows, where I actually would like the storage to be global.

buffer = ScheduledTensorDictPrioritizedReplayBuffer(
        storage=LazyMemmapStorage(max_size=1_000_000 // n_workers, scratch_dir=f"data/{uuid.uuid4()}"),
        alpha=0.6, beta_start=0.4, beta_end=1.0, beta_steps=1_000_000 // n_workers,
        batch_size=128 // n_workers, prefetch=128
    )

Is there something like a DistributedRoundRobinWriter? Since this needs at least one synchronization point, what would be the "best" one? From the implementation I can see that it would be sufficient to keep the cursor in sync, right? Any suggestions how to accomplish that without any (noticable) loss of performance while not changing any semantics?

Thank you very much.

vmoens commented 1 year ago

Hey Thanks for raising this. So what I understand is that each worker is stepping on each other's foot right? We have an RPC-version of the replay buffer here, have you had the chance to look at it? Happy to help if that does not suit your use case or if it is not clear!

patchmeifyoucan commented 1 year ago

Hello,

thanks for the quick reply.

Yes, the issue is simply that the workers don't know about each other and overwrite their data according to their local cursors.

"Distributed" was a bit ambiguous. Currently, I am on a single SLURM node with multiple GPUs. In principle I could use RPC calls as in the distributed example but I not really need remote calls. I also would expect impacts on performance and also stability (is this even correct?) but I have not worked with the RPC framework yet. So in principle it would be nice if I could avoid changing my usage of the replay buffer.

But reading some of the docs of the RPC framework it might be a self-caused issue due to my synchronous implementation of interaction and learning across workers. I could avoid this issue by making rollouts (writes via async RPC) and parameter updates (reads) asynchronous, which is more standard in large-scale RL I guess.

Is there any more general pattern you would suggest to avoid this issue? I would also be open to reconsider my setup if I can avoid such issues in the future.

vmoens commented 1 year ago

"Distributed" was a bit ambiguous. Currently, I am on a single SLURM node with multiple GPUs. In principle I could use RPC calls as in the distributed example but I not really need remote calls. I also would expect impacts on performance and also stability (is this even correct?) but I have not worked with the RPC framework yet. So in principle it would be nice if I could avoid changing my usage of the replay buffer.

Got it, I see your concern. The reason we use RPC is that the priority tooling (eg., sum tree etc) are coded in c++ on a single worker. But I understand that this may change your workflow a bit as the init is slightly different. For regular replay buffers, sharing data between workers could be done easily (we should just find a way of sharing the cursor of the storage). But for prioritized we would need a way of querying the sum tree and similar on the buffer worker. Because we want things to be modular and reusable across scenarios, I'm not sure we want to work on a multiprocessed replay buffer that does not support prioritized RBs.

If I understand your use case: you would like to have a RB where the storage is split between workers. You would also like that the priority and cursor are shared, since the main node is presumably the one doing the sampling. Those things are a bit conflicting since the cursor for a worker will be beyond the number of elements the buffer contains, which poses some questions on what this cursor means for that worker.

one way to go about this without recurring to RPC would simply be to put the data in a mp.Queue (or use a "buffer" ie a tensor in shared memory to pass data from one proc to another) and do all the replay buffer heavy lifting on a dedicated worker. Would that solve your problem? It's a bit less elegant that what you were suggesting but I don't really see how to do it otherwise... Happy to help implement a simple example if that helps! Also don't hesitate to correct me if I misunderstood something

patchmeifyoucan commented 1 year ago

Initially, I did not think of PER in distributed settings in the right way. My use case is exactly the standard one. When I checked the TensorDict RB implementation, I did not see that it had any MP/distributed support and hence a quick (and wrong) idea was to use worker-local buffers. But that of course changes the sampling distribution arbitrarily.

So the RB buffer being split was just an artifact of me not knowing how to properly use it. I think I will try out the RPC version if that's the way to go. Since I'm using Fabric for multi-device (and multi-node, in the future) support, I would rather use a method that works well across multiple-nodes. I think I will update my workflow to have async actors and learners, seems like I can avoid this kinds of problems easier this way and I planned to make acting and learning async anyways in the future.

I will read through the docs and also check the distributed example more carefully. I'll report back whether or not I resolved it myself.

Thank you very much for now.

vmoens commented 1 year ago

Awesome, let's keep this open. Happy to solve any issue you might have with the rpc version!

patchmeifyoucan commented 1 year ago

I think I now understood most of the technical concepts around the RPC framework and I will definitely use it. I would like to hear your opinion on some architectural design decisions, though.

I thought about my usage of Fabric and I think I will continue using it due to some if its nice properties, mostly because it creates the appropriate processes/ranks and offers nice functionality I might use in the future. Due to the cluster configuration I am working on, I would try to distribute all my components onto CPUs and GPUs depending on how much they need to compute. Not being sure whether I should do so using Fabric, I opened an issue here. But for now we can assume that I have a set of ranks created.

Currently, I don't really know what style of training is best for me, so I would like to keep things flexible. My main concern resides around whether I should implement a parameter server accumulating and applying gradients. Some other rank hold the RB. Then I have actors and learners on all other ranks.

The training then does one of the following modes

One thing to note is that in my case, the policy is the expensive part (it's a small CNN right now but it might become larger in the future). So I have to use the GPUs for both acting and learning. The reward is basically for free.

Do you think using a parameter server is a good idea in this case? What about latency? Should I even worry? Or does it make more sense keep DDP-style training since it directly synchronizes across devices? Does DDP also use RPC under the hood?

patchmeifyoucan commented 1 year ago

Nevermind, having read the DDP docs more carefully, it seems to be the way to go also in the multi-node case.

Ruthrash commented 5 months ago

Got it, I see your concern. The reason we use RPC is that the priority tooling (eg., sum tree etc) are coded in c++ on a single worker. But I understand that this may change your workflow a bit as the init is slightly different. For regular replay buffers, sharing data between workers could be done easily (we should just find a way of sharing the cursor of the storage). But for prioritized we would need a way of querying the sum tree and similar on the buffer worker. Because we want things to be modular and reusable across scenarios, I'm not sure we want to work on a multiprocessed replay buffer that does not support prioritized RBs.

I am trying to build and sample a regular replay buffer whose storage is shared across more than one computer. I got the RPC version of replaybuffer working, but this uses one node as the replay buffer node currently. If I were to extend the replaybuffer storage to more than one node, what would be the best way to implement this using torchRL. Please elaborate a bit more/share your thought @vmoens. Thanks so much in advance.