google-deepmind / reverb

Reverb is an efficient and easy-to-use data storage and transport system designed for machine learning research
Apache License 2.0

Any details for building the sharding reverb services? #3

Closed fmxFranky closed 4 years ago

fmxFranky commented 4 years ago

Hi all! Thanks for open-sourcing this~ I want to use Reverb to build a distributed prioritized replay buffer for APEX or R2D2, but I don't know how to achieve that, especially which functions to use and how to use them with a gRPC-compatible load balancer. I also cannot find the documentation mentioned in the "Sharding" section of the README. Are there any examples or Python files that show how to use Reverb in a sharded setup? Thanks a lot~

acassirer commented 4 years ago

Hey,

Great to hear that you want to try scaling up with Reverb! As the documentation indicates, you don't really need to do anything special if you are able to set up a load balancer in front of your Reverb servers. Simply provide the load balancer's address when creating the clients (or datasets).

As a simple example, assume you have a server binary:

# server.py
"""Starts a Reverb server and blocks forever.

Port must be provided as the first and only argument.
"""

import sys
import reverb

def main(argv):
  port = int(argv[1])
  server = reverb.Server(tables=[...], port=port)
  server.wait()  # Blocks forever.

if __name__ == '__main__':
  main(sys.argv)

In a single-machine setup you would connect to this as follows:

# client.py
"""Connects to a server running on the same machine.

The same port must be provided to the server and the client.
"""

import sys
import reverb

def main(argv):
  port = int(argv[1])
  client = reverb.Client(f'localhost:{port}')

if __name__ == '__main__':
  main(sys.argv)

In a sharded setup it is exactly the same, except that you run multiple instances of server.py behind a load balancer and then connect using the load balancer's address. Simply put, replace localhost:<port> with <grpc_compatible_address_to_lb>.

If configured as described then Reverb will juggle the connections internally and write/read across the shards. You do however want to take a careful look at the dataset arguments in https://github.com/deepmind/reverb/blob/master/reverb/tf_client.py and configure max_samples_per_stream and num_workers_per_iterator to match your setup.
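
For illustration, here is a rough sketch of creating a dataset against the load balancer address. The table name, dtypes and shapes are placeholders that must match your own tables, and the argument names should be checked against the dataset code in your Reverb version:

import reverb
import tensorflow as tf

dataset = reverb.ReplayDataset(
    server_address='<grpc_compatible_address_to_lb>',
    table='priority_table',                  # placeholder table name
    dtypes=(tf.float32, tf.int64),           # must match the inserted data
    shapes=(tf.TensorShape([84, 84, 4]), tf.TensorShape([])),
    max_in_flight_samples_per_worker=10,
    num_workers_per_iterator=3,    # e.g. roughly one worker per shard behind the LB
    max_samples_per_stream=450)    # rotate streams so sampling spreads across shards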

fmxFranky commented 4 years ago

Hi @acassirer Thank you for your reply. According to my understanding, the load balancer and Reverb are independent and the two parts do not affect each other. What I still wonder is: if I want to implement the sharded replay buffer for APEX/R2D2 on a machine, what exactly do I need to do? Based on your reply, should I do things in the following order (in the same Python program)?

  1. Set up the load-balancing module on the machine and have it use port_0
  2. Define N reverb.Servers, from server_1 to server_N, corresponding to N ports, from port_1 to port_N
  3. Define client_i on the remote actor_i and connect it to port_0

If I don't have a load balancer, should I choose a port from port_1 to port_N for each actor? If that is the case, how should I set up the gRPC balancer? Is there any software to do this? Thank you very much, looking forward to your reply.

acassirer commented 4 years ago

Hey,

If you are on a single machine there might be very limited value in sharding the replay server; we have been able to serve >10k actors using a single server, so performance is most likely not going to be an issue. The main reason to shard a replay buffer would be that there isn't enough memory on a single machine to handle your use case.

As for your step by step description, it looks accurate to me.

If you don't have a load balancer (and even if you do), just think of each shard as being completely independent from the others. You can therefore either set up the clients so that they talk to a single server and configure the system as a whole so it becomes "balanced", OR simply adopt a many-to-many approach and handle the switching in your code. Reverb uses round robin for writing (which is very easy to reimplement in code) and basically "pick whoever responds" for sampling.

I don't really have any experience in setting up load balancers using open-source software so I can't provide much help in this regard. Kubernetes does however seem like an obvious first thing to try.

Here are some pointers on how you could implement the round robin and how it really ends up being exactly the same as a single shard.


def round_robin(clients):
  """Generator that provides a client using round robin over `clients`."""
  while True:
    yield from clients

def run_and_record_episode(environment, actor, writer):
  """Uses `actor` to interact with `environment` for an episode and records data to `writer`."""
  first = environment.reset()
  ...

def run_single_shard(environment, actor, client, num_episodes):
  for _ in range(num_episodes):
    with client.writer(...) as writer:
      run_and_record_episode(environment, actor, writer)

def run_multi_shard(environment, actor, clients, num_episodes):
  clients_lb = round_robin(clients)
  for _ in range(num_episodes):
    with next(clients_lb).writer(...) as writer:
      run_and_record_episode(environment, actor, writer)

To sample across multiple shards without a gRPC load balancer, I'd recommend taking a look at https://www.tensorflow.org/api_docs/python/tf/data/Dataset#interleave (create a dataset for each client and interleave them).
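
A rough sketch of that idea, here using tf.data.experimental.choose_from_datasets for a round-robin interleave over per-shard datasets (the addresses, table name, dtypes and shapes are placeholders; sample_from_datasets would give a randomized alternative):

import reverb
import tensorflow as tf

def make_shard_dataset(address):
  # Placeholder table/spec; must match what the actors write.
  return reverb.ReplayDataset(
      server_address=address,
      table='priority_table',
      dtypes=(tf.float32,),
      shapes=(tf.TensorShape([84, 84, 4]),),
      max_in_flight_samples_per_worker=10)

shard_addresses = ['localhost:8000', 'localhost:8001']  # one per shard
datasets = [make_shard_dataset(a) for a in shard_addresses]

# Cycle over the shard datasets in round robin.
choice = tf.data.Dataset.range(len(datasets)).repeat()
dataset = tf.data.experimental.choose_from_datasets(datasets, choice)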

fmxFranky commented 4 years ago

I want to implement R2D2/APEX on a DGX-1 machine, so performance will probably not be an issue, because in both algorithms the number of actors is far less than the 10k you mentioned. Furthermore, sharding would only really help in a real project with memory usage too large for a single machine to handle. The reason I asked is that I previously tried to implement APEX with other RPC tools and a third-party replay buffer package (named cpprb, do you know it?). However, I found that when 256 actors and a learner shared the same replay buffer, adding new transitions with priorities and updating the priorities of existing transitions at the same time, the learner became much slower than the numbers mentioned in the paper, because all the ops blocked each other. So I have been looking for a solution to that issue.

I first came across the "sharding" technique in the ray project: RLlib uses a number of sharded buffers to make the sampling and optimization steps asynchronous. When I saw the same technique in Reverb, I became curious whether the two repos use the same trick for async sampling in APEX/R2D2 settings. It also seems like Reverb has async sampling built into the server, right? I would appreciate it if you could tell me more about this! I will try a single Reverb server first, thank you very much~

fastturtle commented 4 years ago

Hi @fmxFranky, we use a single-sharded Reverb setup quite regularly for training R2D2/APEX and you should be able to scale to use your DGX-1 with one or two shards (out of curiosity, are you using P100s or V100s?). The main reason we use multiple shards is when we cannot fit the replay onto a single machine.

We'd be interested in hearing your findings, so please keep us posted!

fmxFranky commented 4 years ago

Ok, I will share my attempts on the DGX-1 (40 CPU cores, 500G memory and 8 x Tesla V100). As you can see, I am also using acme as the starting code, and I am planning to build distributed actors myself first, because the team has no timetable for releasing Launchpad for some reason. In my view, following the motivation of "decoupling elements of the RL training pipeline", it seems possible to modify the actor's behaviour in EnvironmentLoop to achieve this goal (but I am not sure about the right way to build the interaction between actors, Reverb and the learner). My idea is to define an adder in each actor running in different processes/threads and call adder.add() asynchronously, after building the reverb.Server and the learner in the main process; a rough sketch of what I mean follows below. I will give it a try. If I can do this and run R2D2 successfully, I will share my findings. I would truly appreciate it if you could tell me some details, tricks or technology about how DeepMind's unreleased Launchpad generates distributed actors. Thanks again~ I am glad to keep in touch with you~
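
A minimal sketch of the per-actor adder idea, assuming acme's Reverb adders (the address, n_step and discount are just placeholders):

import reverb
from acme.adders import reverb as adders

# In each actor process: connect to the server created in the main process
# and build an adder that writes transitions to it.
adder = adders.NStepTransitionAdder(
    client=reverb.Client('localhost:8000'),  # placeholder address
    n_step=5,
    discount=0.99)

# Inside the actor's interaction loop:
#   adder.add_first(first_timestep)
#   adder.add(action, next_timestep=timestep)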

fastturtle commented 4 years ago

You've got the right idea, you can just pass a RecurrentActor into each EnvironmentLoop. You'll need to figure out how to update variables from the Learner -> Actor(s), which can be done with the variable_client. You can run some tests without using a variable_client, but it's also possible to write one that uses Reverb to pass variables around.
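
As a rough sketch (the constructor arguments here are illustrative and should be checked against your copy of acme; `learner`, `policy_network`, `adder` and `environment` are assumed to already exist), the wiring looks something like:

import acme
from acme.agents.tf import actors
from acme.tf import variable_utils

# Pull fresh weights from the learner (a VariableSource) every 100 actor steps.
variable_client = variable_utils.VariableClient(
    client=learner,
    variables={'policy': policy_network.variables},
    update_period=100)

actor = actors.RecurrentActor(
    policy_network,                  # recurrent policy (e.g. an R2D2 network)
    adder=adder,                     # writes trajectories to Reverb
    variable_client=variable_client)

loop = acme.EnvironmentLoop(environment, actor)
loop.run(num_episodes=100)  # each actor process runs its own loop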

I'd recommend you use separate processes for each of the actors; threads will be fine for a couple of actors, but performance will degrade quickly.

Also we (the Acme devs) are working out how we want to open-source a version of our distributed agents. It isn't feasible to open-source the full stack we use, but we are exploring other options :)

fmxFranky commented 4 years ago

I think the version of your distributed agents might be a decoupled module in acme, and I hope you can release it as soon as possible :) I will try what you suggested above, and in my opinion it will be enough for research (efficiently scalable RL) if I can implement my idea successfully. As for "exploring other options", is there any difference between the way we discussed above and the way your team wants to explore?

fmxFranky commented 4 years ago

Hi, @fastturtle! I ran into some problems when trying to use Reverb to share variables between actors and learners. I want to share my findings with you, and I would appreciate any advice! I first used the variable_client and variable_source to transfer the weights of the learner's network. I modified the DQN agent in this way, using only one actor. The code runs normally, but I found that this approach slows down the training process; I guess variable_client.update() consumes some resources. So I want to use the approach you mentioned before: using Reverb as a variable server. I wrote the following code to test whether I can add weights to the server:

from acme.tf import utils as tf2_utils
from acme.tf import networks
from acme import wrappers

import acme
import dm_env
import functools
import gym
import reverb
import tensorflow as tf

def make_environment(evaluation: bool = False) -> dm_env.Environment:
  env = gym.make('PongNoFrameskip-v4', full_action_space=True)

  max_episode_len = 108_000 if evaluation else 50_000

  return wrappers.wrap_all(env, [
    wrappers.GymAtariAdapter,
    functools.partial(
      wrappers.AtariWrapper,
      to_float=True,
      max_episode_len=max_episode_len,
      zero_discount_on_life_loss=True,
    ),
    wrappers.SinglePrecisionWrapper,
  ])

gpus = tf.config.list_physical_devices(device_type='GPU')
for gpu in gpus:
  tf.config.experimental.set_memory_growth(device=gpu, enable=True)
tf.config.set_visible_devices(devices=gpus[0], device_type='GPU')

varaible_server = reverb.Server(tables=[
  reverb.Table(
    name='varaible_table',
    sampler=reverb.selectors.Lifo(),
    remover=reverb.selectors.Fifo(),
    max_size=3,
    rate_limiter=reverb.rate_limiters.MinSize(1)),
],
  port=None
)

address = f'localhost:{varaible_server.port}'
# varaible_client = reverb.TFClient(address)

env = make_environment()
env_spec = acme.make_environment_spec(env)
learner_model = networks.DQNAtariNetwork(env_spec.actions.num_values)
tf2_utils.create_variables(learner_model, [env_spec.observations])
np_learner_variables = [
  tf2_utils.to_numpy(v) for v in learner_model.variables
]

varaible_adder = reverb.Client(address)
varaible_adder.insert(np_learner_variables, priorities={'varaible_table': 1.0})

However, it seems that the model's variables are too large to insert into the server. It raises the following error:

Traceback (most recent call last):
  File "/home/mxfeng/acme/acme/parallel_tf_agents/dist_variable_utils.py", line 59, in <module>
    varaible_adder.insert(np_learner_variables, priorities={'varaible_table': 1.0})
  File "/usr/local/lib/python3.6/dist-packages/reverb/client.py", line 238, in insert
    table=table, num_timesteps=1, priority=priority)
  File "/usr/local/lib/python3.6/dist-packages/reverb/client.py", line 172, in create_item
    self._writer.CreateItem(table, num_timesteps, priority)
IndexError: Sent message larger than max (32071728 vs. 30000000)

How can I add the whole model's weights to the server? Thank you very much!

fastturtle commented 4 years ago

Hi @fmxFranky, the maximum message size is set by kMaxMessageSize (currently set to 30 MB) and used to create the gRPC channel. You can increase this constant, build reverb from source, and install the new .whl file to test out the modification.

I'll discuss with @acassirer to see if there is a better solution that will work in the longer-term.

Note: Increasing the max message size may have other consequences, but I'm unsure of any at the moment.

fmxFranky commented 4 years ago

Hi, @fastturtle, I will try it~ Yesterday I tried to use other RPC tools (i.e. ray or PARL) to define remote actors and run the data-collection loop controlled by my main process, but it seems that this still decreased the speed, maybe because of the third-party RPC itself. I also found that invoking other tools breaks the source code of acme (I need to rewrite the code for Actor, Learner etc.). So next I will try to run many actors via multiple processes or programs, together with the modified Reverb package.

When I decided to run the actors and the learner in different processes or programs at the same time, a new problem occurred: I don't know how to get the current size of the replay buffer. I read the acme code and didn't find any method to get the number of items in the replay_table, but I need this property to let the decoupled learner wait until there are enough items before it starts updating. In acme, a negative min_observations is used to do this. How should I deal with it when I run the learner and actors in different processes or programs? Thanks~

fmxFranky commented 4 years ago

Hey, @fastturtle, I cannot build Reverb from the Dockerfile, and it seems that the Dockerfile has not been updated in a month. Can you update it to fit the new version of TF?

fastturtle commented 4 years ago

Hey @fmxFranky, I'd recommend using multiple processes/programs, where each actor runs in its own process. The reverb.Client.server_info() method returns a proto that contains a map of TableInfo protos, which have information on each table's current size. However, items can (and will) be added/removed by the time you act upon that information. Instead I would use a rate limiter to control when the learner starts updating. In particular, you can use reverb.rate_limiters.MinSize if you just want to make sure your Learner waits until N items are in the table.
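
A minimal sketch of that rate-limiter approach (the table name, selectors and sizes are placeholders, and the current_size field assumes the TableInfo layout in recent Reverb versions):

import reverb

server = reverb.Server(tables=[
  reverb.Table(
      name='replay_table',                                   # placeholder
      sampler=reverb.selectors.Prioritized(priority_exponent=0.8),
      remover=reverb.selectors.Fifo(),
      max_size=1_000_000,
      # Blocks sampling (and hence the learner) until 10k items are present.
      rate_limiter=reverb.rate_limiters.MinSize(10_000)),
])

# Only if you want to inspect the size directly (racy, as noted above):
client = reverb.Client(f'localhost:{server.port}')
info = client.server_info()
print(info['replay_table'].current_size)  # assumes a current_size field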

What error are you seeing when using the dockerfile?

fmxFranky commented 4 years ago

Hi, @fastturtle, I fixed the build error by using a 4-CPU Vultr machine :)

I used a somewhat less elegant way to parallelize acme. I finally decided to use ray to run all the actors on different CPUs of the cluster, and to build a remote Reverb client outside the learner to share variables asynchronously. The reason for doing this is that I found that the large number of parameters in the neural network makes the transmission slow even when using Reverb (adding a new DQNAtariNetwork takes 0.13~0.18s, while adding a new version of R2D2AtariNetwork takes 0.35~0.4s! If this runs alongside the learner's update process, the learner's update speed becomes very slow). I overrode two classes to achieve the goal:

  1. Override FeedForwardActor with the ray.remote decorator to get a RemoteFeedForwardActor
  2. Define a RemoteVariableClient to share parameters asynchronously

When running APEX, R2D2 or IMPALA, I only need to add several lines to run a distributed version of the acme algorithms:

  1. Define remote_actor = RemoteActor.remote(...)
  2. Define the acme agent as shown in the acme examples
  3. Run the actors asynchronously (a rough sketch of this pattern follows below)
  4. Run agent._learner.step() in a loop instead of running EnvironmentLoop.run()
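
Roughly, the Ray wrapper looks something like the following simplified sketch (the names here are placeholders, not my full implementation):

import ray

@ray.remote
class RemoteFeedForwardActor:
  """Runs an acme-style actor's environment loop in its own Ray worker."""

  def __init__(self, make_environment, make_actor):
    # `make_environment` and `make_actor` are placeholder factory functions.
    self._environment = make_environment()
    self._actor = make_actor()

  def run(self, num_episodes):
    for _ in range(num_episodes):
      timestep = self._environment.reset()
      self._actor.observe_first(timestep)
      while not timestep.last():
        action = self._actor.select_action(timestep.observation)
        timestep = self._environment.step(action)
        self._actor.observe(action, next_timestep=timestep)

ray.init()
num_actors = 4  # placeholder
remote_actors = [
    RemoteFeedForwardActor.remote(make_environment, make_actor)
    for _ in range(num_actors)]
# Kick off the actor loops asynchronously, then run learner.step() in a loop.
handles = [actor.run.remote(num_episodes=1000) for actor in remote_actors]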

From my attempts, I think the process of sharing parameters through a Reverb server may be a bottleneck, because there is necessarily a delay in data transmission. So I am curious how this is dealt with in Launchpad. Or can you give me some advice to improve my method? Thanks a lot!

The limitation of ray is that I can only use CPU actors. When I use GPU actors, I cannot set the memory with tf.config.experimental.set_memory_growth(device=gpu, enable=True) in the remote actor...

Another finding is that maybe my machine is not good enough (2080Ti). When I run the original single-process R2D2 algorithm in acme (without any changes, only modifying the batch_size, burn-in length and trace_length), the Learner's update speed only reaches 2.5 steps per second (only running the Learner's step() method in a loop), and the actor only reaches 70 environment steps per second on CPU (170 when using a GPU), while the numbers mentioned in the paper are 5 learner steps per second and 260 environment steps per second. Maybe only by running it on TPU can the speed match the numbers mentioned in the paper...