Open fmxFranky opened 4 years ago
Hi @fmxFranky ,
Did you happen to try parallelizing the environments (as in baselines SubprocVec) with the Acme codebase? If so, could you share how you did it?
Hi @jaejaywoo, my approach is to use the interfaces in dm_env to wrap each transition from SubprocVec into a dm_env.TimeStep instance, and to create N adders that add them to the replay server. However, this is also slow because the adds happen serially, so the cost grows with the number of environments. I'm sorry, but I have no idea how to do this efficiently.
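The wrapping step described above could look roughly like the sketch below. It is only an illustration: `StepType` and `TimeStep` here are stand-ins that mirror the four fields of `dm_env.TimeStep` so the snippet runs without extra dependencies, `split_batch` is a hypothetical helper name, and treating every `done` as a true termination (discount 0) is an assumption that would not hold for time-limit truncations.

```python
import enum
from typing import Any, List, NamedTuple, Sequence


class StepType(enum.IntEnum):
    """Stand-in for dm_env.StepType (assumption: not the real class)."""
    FIRST = 0
    MID = 1
    LAST = 2


class TimeStep(NamedTuple):
    """Stand-in with the same four fields as dm_env.TimeStep."""
    step_type: StepType
    reward: Any
    discount: Any
    observation: Any


def split_batch(observations: Sequence[Any],
                rewards: Sequence[float],
                dones: Sequence[bool]) -> List[TimeStep]:
    """Turn one batched vector-env step into one TimeStep per environment."""
    timesteps = []
    for obs, reward, done in zip(observations, rewards, dones):
        timesteps.append(TimeStep(
            step_type=StepType.LAST if done else StepType.MID,
            reward=reward,
            # Assumption: every `done` is a true termination, so discount is 0.
            discount=0.0 if done else 1.0,
            observation=obs,
        ))
    return timesteps


batch = split_batch(observations=[[0.1], [0.2]],
                    rewards=[1.0, 0.0],
                    dones=[False, True])
for i, ts in enumerate(batch):
    print(i, ts.step_type.name, ts.discount)
```

Each resulting timestep would then go to the adder that belongs to the same environment index. If the vectorized wrapper resets environments automatically, the observation returned on a `done` step may already belong to the next episode, which this sketch does not handle.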
Thanks for your reply @fmxFranky :)
Hi @fmxFranky, I would recommend maintaining one Adder per environment. Each adder is linked to a Reverb client, which maintains a gRPC stream that the Reverb server uses for keeping track of episodes.
Also, I just did a quick estimate of how long it takes to add data to Reverb using the tutorial colab, and it appears to take ~0.37 ms per call to Adder.add. This is using the gym mountaincar environment, which may have smaller state than your environments, but I don't expect this to change much unless you're using images. This means that it'd take approximately 23.68 ms (64 × 0.37 ms) to add the state from all 64 environments. Is this what you're seeing?
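To reproduce this kind of estimate on your own setup, a small timing helper along these lines may be enough. The names `mean_call_ms` and `serial_add_ms` are hypothetical, and the callable you time would be your own wrapper around one add call; only the 0.37 ms and 64 figures come from the comment above.

```python
import time


def mean_call_ms(fn, n_calls=1000):
    """Return the mean wall-clock time of fn() in milliseconds."""
    start = time.perf_counter()
    for _ in range(n_calls):
        fn()
    return (time.perf_counter() - start) * 1000.0 / n_calls


def serial_add_ms(per_call_ms, num_envs):
    """Estimated time to call add once per environment, back to back."""
    return per_call_ms * num_envs


# Figures quoted in the thread: ~0.37 ms per call, 64 environments.
print(round(serial_add_ms(0.37, 64), 2))
```

Comparing this estimate with the measured time per vectorized step shows whether adding to the replay buffer is really the bottleneck.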
You can use a thread pool to parallelize the calls to Adder.add without having to change the internal code at all. You'll need to be careful about propagating errors, but this will be similar to any multi-threaded code.
For example:
import concurrent.futures

pool = concurrent.futures.ThreadPoolExecutor(max_workers=...)
futures = []
for timestep, action, adder in zip(timesteps, actions, adders):
    # Pass the callable and its arguments separately so add runs in the pool.
    f = pool.submit(adder.add, action, timestep)
    futures.append(f)
# Do some error checking on futures
...
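A self-contained version of the same idea, including one way to propagate errors, might look like the sketch below. `RecordingAdder` is a hypothetical stand-in that only mimics an `add(action, next_timestep)` method, and `add_all` is an illustrative helper, not part of Acme.

```python
import concurrent.futures


class RecordingAdder:
    """Hypothetical stand-in for an adder; it only records what it is given."""

    def __init__(self):
        self.items = []

    def add(self, action, next_timestep):
        if next_timestep is None:
            raise ValueError("next_timestep must not be None")
        self.items.append((action, next_timestep))


def add_all(pool, adders, actions, timesteps):
    """Submit one add call per environment, then surface any failure."""
    futures = [
        pool.submit(adder.add, action, timestep)
        for adder, action, timestep in zip(adders, actions, timesteps)
    ]
    for future in futures:
        # result() blocks until the call finishes and re-raises its exception.
        future.result()


adders = [RecordingAdder() for _ in range(4)]
with concurrent.futures.ThreadPoolExecutor(max_workers=len(adders)) as pool:
    add_all(pool, adders,
            actions=[0, 1, 2, 3],
            timesteps=["t0", "t1", "t2", "t3"])
```

Calling `result()` on every future is what makes a failed add visible; without it, an exception raised inside a worker thread would stay hidden in the future object.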
Thanks a lot, @fastturtle , I will add a thread pool to make the calls parallelized~
Great, let us know how it goes :)
I find that using a thread pool doesn't give me any time savings:
with concurrent.futures.ThreadPoolExecutor(max_workers=len(self._adder)) as pool:
    futures = []
    for (i, next_timestep) in enumerate(next_timestep_list):
        f = pool.submit(self._adder[i].add, action[i], next_timestep)
        futures.append(f)
    concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
I ran the code above with env_num = 3, 30, and 300, and the time still grows in proportion to the number of environments.
ThreadPools are subject to the GIL, which means only one thread can execute Python code at a time (it's concurrent but not parallel), so you won't see a speedup beyond about 1x to 1.2x. Multiprocessing is the way to go; Acme uses Launchpad to run parallel actors (see the distributed agent examples).
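The GIL effect can be checked with a small experiment like the one below, which runs the same pure-Python, CPU-bound function serially and in a thread pool. This is only a sketch: `busy` and `timed` are hypothetical helpers, the workload size is arbitrary, and it says nothing about how much of Adder.add itself holds the GIL.

```python
import concurrent.futures
import time


def busy(n):
    """CPU-bound pure-Python loop; it holds the GIL while it runs."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def timed(fn):
    """Run fn() and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start


work = [200_000] * 8

serial, t_serial = timed(lambda: [busy(n) for n in work])
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
    threaded, t_threaded = timed(lambda: list(pool.map(busy, work)))

print(f"serial: {t_serial:.3f}s  threads: {t_threaded:.3f}s")
```

On a standard CPython build the two timings are usually close, since the threads take turns holding the GIL. Swapping in `concurrent.futures.ProcessPoolExecutor` runs the work in separate processes instead, at the cost of pickling arguments and results, and it needs the function defined at module level (plus an `if __name__ == "__main__":` guard on platforms that spawn new processes).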
I want to use Acme's algorithms to interact with vectorized environments (just like baselines.VectorEnv). However, maintaining N adders for N environments decreases efficiency when collecting samples and sending them to the Reverb server (with my VecEnv(num_envs=64), sending transitions to the replay buffer takes 60% of the total sampling time). So how can I modify the code of acme.adders.reverb.base.Adder and its subclasses so that a single adder can add transitions from several environments? Thank you very much~