rlworkgroup / garage

A toolkit for reproducible reinforcement learning research.

Asynchronous Replay Buffer #1487

Open avnishn opened 4 years ago

avnishn commented 4 years ago

Currently, the default sampling worker performs the following steps every time `obtain_samples` is called (see the sketch after the list):

1) reset environment
2) step environment and collect samples `n` times
3) post process samples
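Roughly, in simplified form (a sketch of the behavior described above, not the actual worker code; `postprocess` is a stand-in for garage's sample post-processing):

# Simplified sketch of the current behavior; `postprocess` is a placeholder,
# not garage's actual implementation.
def obtain_samples(env, policy, max_path_length):
  samples = []
  o = env.reset()                              # 1) reset environment
  for _ in range(max_path_length):             # 2) step and collect samples
    a = policy.get_action(o)
    next_o, r, done, info = env.step(a)
    samples.append((o, a, r, next_o, done, info))
    if done:
      break
    o = next_o
  return postprocess(samples)                  # 3) post process samples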

This is fine for single-task on-policy algorithms and for batched off-policy algorithms. However, it doesn't work well when a user wants to do single-step sampling, and not necessarily collect a whole trajectory of data of length max_path_length.

This is because, if a user collects n < max_path_length steps, then the states that would fall into the remaining m = max_path_length - n steps may never be observed: a call is made to env.reset() before they can be reached.

One way to solve this is to make a variable-step sampling worker that maintains its own state, so it knows how many steps have already been collected in its environment. Calls can then be made to variable_step_worker.obtain_samples(number_of_steps_to_collect=n), and the worker's environment is only reset once worker_num_steps_collected == max_path_length. This can be achieved with the Python generator design pattern.
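A minimal sketch of that idea, assuming a gym-style env and a policy with get_action (the class and attribute names below are hypothetical, not an existing garage API):

# Hypothetical sketch of a variable-step worker built on a generator.
# None of these names exist in garage; they only illustrate the idea.
class VariableStepWorker:

  def __init__(self, env, policy, max_path_length):
    self._env = env
    self._policy = policy
    self._max_path_length = max_path_length
    self._step_gen = self._step_generator()

  def _step_generator(self):
    # Infinite generator: the env is only reset when max_path_length steps
    # have been collected (or the episode terminates early).
    while True:
      o = self._env.reset()
      for _ in range(self._max_path_length):
        a = self._policy.get_action(o)
        next_o, r, done, info = self._env.step(a)
        yield (o, a, r, next_o, done, info)
        if done:
          break
        o = next_o

  def obtain_samples(self, number_of_steps_to_collect):
    # Resumes stepping from wherever the previous call stopped.
    return [next(self._step_gen)
            for _ in range(number_of_steps_to_collect)]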

For new users: go ahead and check out our test suite to understand the functionality of our samplers. Some easy-to-understand sampler tests can be found here.

ryanjulian commented 4 years ago

@avnishn AFAICT this is what VecWorker and its older cousin VecEnvExecutor are supposed to do.

ryanjulian commented 4 years ago

I should also add that I think the idea of "single step sampling" in off-policy algos is a bit of a lark. Assuming your policy optimizer can keep up with your sampler, there's absolutely no reason that "single-step" off-policy sampling actually needs to be synchronous single-step.

That is, if sampling and optimization happen completely in parallel, and your sampler is producing ~1000 steps/second, then as long as your Q-function optimizer is completing ~1000 gradient steps per second there is absolutely no mathematical difference between the parallel configuration and the "step then optimize" synchronous version. The precise ratio (grad steps per env step) is an important hyperparameter which can vary from 1 to 30 in typical use, and allowing it to be tuned is what we should focus on.
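For example, in a synchronous implementation that ratio is just a loop bound that could be exposed as a hyperparameter (a sketch with hypothetical helper names, not garage's API):

# Sketch: the steps:grads ratio as an explicit hyperparameter in a
# synchronous loop. collect_one_step/optimize_q_function are hypothetical.
def train_epoch(env_steps, grad_steps_per_env_step, replay_buffer, batch_size):
  for _ in range(env_steps):
    replay_buffer.add(collect_one_step())
    for _ in range(grad_steps_per_env_step):   # typically 1 to 30
      optimize_q_function(replay_buffer.sample(batch_size))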

If we take a step back, optimizing a code path which people shouldn't use (synchronous off-policy sampling) is a distraction from implementing the one they should use (decoupling the sampler from the runner, and having the off-policy sampler and optimizer only communicate through the replay buffer).

ryanjulian commented 4 years ago

To convince yourself that the parallel and synchronous versions are equivalent given that the step:grads ratio is maintained, ask yourself "If I collect a new sample s during off-policy sampling, what's the probability that s appears in the next batch of off-policy optimization? Is p(s in batch) different between the synchronous and asynchronous cases?"

Consider how this thought experiment goes for an on-policy algorithm as well.
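To make the thought experiment concrete, here is a toy calculation with made-up numbers, assuming batches are drawn uniformly from the replay buffer:

# Toy numbers: probability that a freshly collected sample s appears in the
# next optimization batch, given uniform sampling from the replay buffer.
buffer_size = 1_000_000
batch_size = 256

# Synchronous: step once, add s to the buffer, then draw a batch.
p_sync = batch_size / buffer_size
# Asynchronous: s was added by a sampler thread at some earlier point; the
# optimizer still draws a uniform batch from the same buffer.
p_async = batch_size / buffer_size

assert p_sync == p_async   # no difference between the two cases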

avnishn commented 4 years ago

Hmm, I think that using a steps:grads ratio definitely increases the likelihood that s is used in optimization of the Q function(s); however, there is one thing about this that concerns me:

Say we collect a set of samples during a training epoch and sample s is in that set. For every sample collected, I train my Q-function on a random batch of samples S', and the probability of s appearing in that batch is p. Our steps:grads ratio is s_g. Even if we optimize the Q-function s_g times, the likelihood that s appears in any of our Q-function optimization batches is relatively low.

With that said, I guess this is a general property of all off-policy RL algorithms.
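To put a rough number on that (made-up values; assumes each batch is an independent uniform draw from the buffer), the chance that s shows up in at least one of the s_g batches is 1 - (1 - p)**s_g:

# Toy calculation: chance that s appears at least once across s_g batches.
buffer_size = 1_000_000   # made-up values
batch_size = 256
s_g = 30                  # a steps:grads ratio at the high end of 1 to 30

p = batch_size / buffer_size            # per-batch probability of drawing s
p_at_least_once = 1 - (1 - p) ** s_g    # ~0.0077 for these values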

avnishn commented 4 years ago

Also, related to our original conversation: I think that you are correct in saying that this is the functionality of VecWorker, except that VecWorker's rollout function doesn't take as a parameter the number of steps that should be collected by the worker, which makes it impossible to implement the original functionality of the sampler used in the original SAC implementation.

However, if we were to do sampling and optimization in parallel in SAC, then our SAC implementation would get closer to the original one.

ryanjulian commented 4 years ago

@avnishn I think what I'm saying is that you can implement this by adding a single-stepping option to VecWorker. I think KR even has an issue open for that already #1245

Your observation about the likelihood of s appearing in the new batch is correct, but it's completely unchanged between parallel and synchronous versions of any off-policy algo.

The steps:grads ratio is present in both the synchronous and non-synchronous versions. There's no question of whether we are "using the steps:grads ratio" -- it's a quantity which exists in all cases. For "single-step" sampling it is 1:1. In a synchronous off-policy implementation, this is enforced by control flow (step once then optimize once), but it doesn't have to be. There's nothing special about the sample you collect during "step once" -- it goes into the same replay buffer and has the same likelihood of getting sampled during "optimize" once as if it had been added asynchronously.

That's my point -- this "s has a low likelihood of appearing in the batch" thing is already happening. Sampling s then sampling the replay buffer might make it feel like it makes s more likely to appear in the optimization, but it doesn't.

ryanjulian commented 4 years ago

My main point here is that focusing on the synchronous case is a waste of time when that effort could be spent adding asynchronous sampling to SAC and PEARL, which is more likely to create performance benefits.

If your sampling is asynchronous, your sampling worker(s) can collect samples in trajectories (without single-stepping) and all of the logic gets a lot simpler. Adding "single stepping" into the sampler API is actually a way of trying to force the sampler to enforce your chosen step:grads ratio.

Here's a different way you can enforce your steps:grads ratio:

# This example probably has several deadlocks, but it's just meant to illustrate the point
import random
import time
from threading import RLock


class AsyncReplayBuffer:

  def __init__(self, target_put_get_ratio=1.):
    self._puts_lock = RLock()
    self._gets_lock = RLock()
    self._samples = []
    self._puts = 0
    self._gets = 0
    self._target_put_get_ratio = target_put_get_ratio

  def set_target_put_get_ratio(self, ratio):
    self._target_put_get_ratio = ratio

  def put_sample(self, sample):
    while self.put_get_ratio > self._target_put_get_ratio:   # too many puts
      time.sleep(0.1)   # okay busy waits are stupid and really we should wait on a condition variable

    with self._puts_lock:
      self._samples.append(sample)
      self._puts += 1

  def get_samples(self, n):
    while (len(self._samples) < n
           or self.put_get_ratio < self._target_put_get_ratio):  # too many gets
      time.sleep(0.1)

    batch = random.sample(self._samples, n)
    with self._gets_lock:
      self._gets += len(batch)
      return batch

  @property
  def put_get_ratio(self):
    # max() avoids dividing by zero before the first get
    return self._puts / max(self._gets, 1)


class MyWorker:

  def __init__(self, env, policy, replay_buffer, max_path_length):
    self._env = env
    self._policy = policy
    self._replay_buffer = replay_buffer
    self._max_path_length = max_path_length

  def rollout(self):
    o = self._env.reset()
    done = False
    t = 0
    while not done and t < self._max_path_length:
      a = self._policy.get_action(o)
      next_o, r, done, info = self._env.step(a)
      self._replay_buffer.put_sample((o, a, r, next_o, done, info))  # might be blocked
      o = next_o
      t += 1


class MyAlgo:

  def __init__(self, policy, sampler, replay_buffer, batch_size):
    self._policy = policy
    self._sampler = sampler
    self._replay_buffer = replay_buffer
    self._batch_size = batch_size
    self._replay_buffer.set_target_put_get_ratio(1.)

  def train(self, runner):
    for _ in runner.step_epochs():
      samples = self._replay_buffer.get_samples(self._batch_size)  # might be blocked
      self._optimize_qf(samples)
      self._optimize_policy(samples)
      self._sampler.update_policy(self._policy)
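
A minimal sketch of how the pieces above might be wired together with Python threads (continuing the same hypothetical class names; a real implementation would need proper shutdown and error handling):

import threading

# Hypothetical wiring of the sketch above: a background thread collects
# rollouts while the main thread optimizes, and the two only communicate
# (and rate-limit each other) through the AsyncReplayBuffer.
def sample_forever(worker):
  while True:
    worker.rollout()

def run(worker, algo, runner):
  sampler_thread = threading.Thread(target=sample_forever, args=(worker,),
                                    daemon=True)
  sampler_thread.start()
  # train() blocks inside get_samples() whenever the buffer is starved, which
  # is how the target put:get ratio gets enforced.
  algo.train(runner)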