RoboBase: robot learning baselines, covering:
Top Features of RoboBase:
System installs:
sudo apt-get install ffmpeg # Usually pre-installed on most systems
pip install .
pip install ".[dmc]"
sudo apt-get install python3.10-dev # if using python3.10
./extra_install_scripts/install_coppeliasim.sh # If you don't have CoppeliaSim already installed
pip install ".[rlbench]"
pip install ".[bigym]"
:white_check_mark: = High confidence that it is implemented correctly and thoroughly evaluated.
:warning: = Lower confidence that it is implemented correctly and/or thoroughly evaluated.
Method | Paper | 1-line Summary | Differences to paper? | Stable |
---|---|---|---|---|
drqv2 | Mastering Visual Continuous Control: Improved Data-Augmented Reinforcement Learning | Uses augmentation (4-pixel shifting) and layer-norm bottleneck to aid learning from pixels. | None. | :white_check_mark: |
alix | Stabilizing Off-Policy Deep Reinforcement Learning from Pixels | Rather than augmentation (as in DrQv2), uses an Adaptive Local SIgnal MiXing (LIX) layer that explicitly enforces smooth feature-map gradients. | None. | :white_check_mark: |
sac_lix | Soft Actor-Critic: Off-Policy Maximum Entropy Deep Reinforcement Learning with a Stochastic Actor | Maximum entropy RL algorithm that has adaptive exploration. | Uses ALIX as the base algorithm. | :white_check_mark: |
drm | DrM: Mastering Visual Reinforcement Learning through Dormant Ratio Minimization | Uses dormant ratio as a metric to measure inactivity in the RL agent's network to allow effective exploration. | None. | :warning: |
dreamerv3 | Mastering Diverse Domains through World Models | Learns world models with CNN/MLP encoder and decoder. | None. | :white_check_mark: |
mwm | Masked World Models for Visual Control | World model (similar to DreamerV2) that uses Masked Autoencoders (MAE) for visual feature learning. | None. | :white_check_mark: |
iql_drqv2 | Offline Reinforcement Learning with Implicit Q-Learning | Does not evaluate "unseen" actions to limit Q-value overestimation. | Uses DrQv2 as the base algorithm. | :white_check_mark: |
CQN | Coarse-to-fine Q-Network | Value-based agent (without a separate actor) for continuous control that zooms into discrete action space multiple times. | None. | :white_check_mark: |
Method | Paper | 1-line Summary | Differences to paper? | Stable |
---|---|---|---|---|
diffusion | Diffusion Policy: Visuomotor Policy Learning via Action Diffusion | Brings diffusion to robotics. | None. | :warning: |
act | Learning Fine-Grained Bimanual Manipulation with Low-Cost Hardware | Transformer and action-sequence prediction. | None. | :white_check_mark: |
Feature (argument name) | Description | Methods supported |
---|---|---|
Action sequence (action_sequence) | Same as action chunking in ACT; allows the model to predict a sequence of actions per inference step | All methods |
Frame stacking (frame_stack) | Stacking current frame with previous ones to provide recent input history to the model | All methods |
Action standardization (use_standardization) | Based on demonstration data, perform z-score normalization on actions. Note that the default option clips actions beyond $3\sigma$ | All methods |
Action min/max normalization (use_min_max_normalization) | Based on demonstration data, perform min/max normalization on actions. | All methods |
Distributional critic (method.distributional_critic) | A distributional version of the critic model based on A Distributional Perspective on Reinforcement Learning, known to improve learning stability | All RL methods |
Critic ensembling (method.num_critics) | Using multiple critics to mitigate value overestimation | All RL methods |
Intrinsic exploration algorithms (intrinsic_reward_module) | Advanced exploration through the use of intrinsic rewards | All RL methods |
Below is an example of launching a method using all of the above features:
python3 train.py method=sac_lix env=dmc/cartpole_swingup action_sequence=3 \
frame_stack=3 use_standardization=true method.num_critics=4 intrinsic_reward_module=rnd \
method.distributional_critic=true method.critic_model.output_shape=\[251, 1\]
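To illustrate what action standardization does conceptually, here is a minimal sketch based on the table above (not RoboBase's implementation; the function name and exact clipping behaviour are assumptions):

```python
import numpy as np

def standardize_actions(actions, demo_actions, clip_sigma=3.0):
    # Z-score normalize actions using statistics computed from demonstration data,
    # clipping anything beyond 3 standard deviations.
    mu = demo_actions.mean(axis=0)
    sigma = demo_actions.std(axis=0) + 1e-8  # avoid division by zero
    return np.clip((actions - mu) / sigma, -clip_sigma, clip_sigma)
```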
All implemented methods should extend `Method`:
```python
class Method:

    def __init__(
        self,
        observation_space: spaces.Dict,
        action_space: spaces.Box,
        device: torch.device,
        num_train_envs: int,
        replay_alpha: float,
        replay_beta: float,
    ):
        ...

    @property
    def random_explore_action(self) -> torch.Tensor:
        # Produces a random action for exploration
        ...

    @abstractmethod
    def act(
        self, observations: dict[str, torch.Tensor], step: int, eval_mode: bool
    ) -> BatchedActionSequence:
        # Called when an action is needed in the environment. Outputs tensor: (B, T, A)
        ...

    @abstractmethod
    def update(
        self,
        replay_iter: Iterator[dict[str, torch.Tensor]],
        step: int,
        replay_buffer: ReplayBuffer = None,
    ) -> Metrics:
        # Called when gradient updates should be performed
        ...

    @abstractmethod
    def reset(self, step: int, agents_to_reset: list[int]):
        # Called on each environment.
        ...
```
Within the `update` method, we can access batch data from the replay buffer via:

```python
batch = next(replay_iter)
```

`batch` will be a dictionary mapping strings to `torch.Tensor`. All observation data will have the following shape: `(B, T, ...)`, where `B` is the batch size, and `T` is the observation history (aka frame stack).
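For example, an `update` implementation might start like this (a minimal sketch; the keys `"rgb"` and `"action"` are hypothetical and not guaranteed RoboBase batch keys):

```python
def update(self, replay_iter, step, replay_buffer=None):
    batch = next(replay_iter)     # dict[str, torch.Tensor]
    obs = batch["rgb"]            # hypothetical observation key, shape (B, T, ...)
    actions = batch["action"]     # hypothetical action key
    # ... compute losses and perform gradient updates here ...
    return {"example_loss": 0.0}  # Metrics: values to log
```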
Networks should be passed into the `Method` class so that they can be parameterised through Hydra. Most of the methods in RoboBase assume 3 networks (`RoboBaseModule`) to be passed in:
If you are frame stacking on channel, i.e. `frame_stack_on_channel=true`, then:

```
(B, V, T, C, W, H)
⌄
(B, V, T * C, W, H)
⌄
|EncoderModule|
⌄
(B, V, Z)
⌄
|FusionModule|
⌄
(B, Z')
⌄
|FullyConnectedModule|
⌄
(B, T', A)
```
If you are using an RNN to roll in the frame stack, i.e. `frame_stack_on_channel=false`, then:

```
(B, V, T, C, W, H)
⌄
(B * T, V, C, W, H)
⌄
|EncoderModule|
⌄
(B * T, V, Z)
⌄
|FusionModule|
⌄
(B * T, Z')
⌄
(B, T, Z')
⌄
|FullyConnectedModule|
⌄
(B, T', A)
```
where `V` is the number of cameras/views, and `T'` is the length of the action output sequence. Note that `FullyConnectedModule` can have either a 1-dim `(Z,)` input or a 2-dim `(T, Z)` input.
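The reshapes implied by these diagrams can be sketched with plain tensor ops (an illustrative sketch only, not RoboBase code; the variable names and example sizes are assumptions):

```python
import torch

B, V, T, C, W, H = 8, 2, 3, 3, 84, 84
obs = torch.zeros(B, V, T, C, W, H)

# frame_stack_on_channel=true: fold the time dimension into the channel dimension
obs_channels = obs.reshape(B, V, T * C, W, H)  # (B, V, T*C, W, H)

# frame_stack_on_channel=false: move time next to batch and fold it in, so the
# encoder sees each timestep separately and an RNN can roll over T afterwards
obs_time = obs.transpose(1, 2).reshape(B * T, V, C, W, H)  # (B*T, V, C, W, H)
```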
To stop training, execute `ctrl-c` in the terminal. This will cleanly terminate the training process.
There are 4 common ways to use RoboBase:
Options 2, 3, and 4 require importing RoboBase into your project, while option 1 can be installed and used directly from the terminal with no new code. See below for details on each of these options.
From the root of the project, you can launch experiments from any of the supported environments. Here are some examples:
Launch the `sac_lix` method on the `cartpole_swingup` task, with `episode_length` 1000.
python3 train.py method=sac_lix env=dmc/cartpole_swingup env.episode_length=1000
Let's launch this as a pixel-based experiment, using a prioritised replay buffer, and with some tensorboard logging:
python3 train.py method=sac_lix env=dmc/cartpole_swingup env.episode_length=1000 \
pixels=true replay.prioritization=true tb.use=true \
tb.log_dir=/tmp/robobase_tb_logs tb.name="my_experiment"
You can now track that experiment in tensorboard by running:
tensorboard --logdir=/tmp/robobase_tb_logs --port 6006
and then in your browser, navigate to: http://localhost:6006/
For a full list of launch configs, see here.
Launch the `drqv2` method on the `reach_target` task, with `episode_length` 100, and 10 demos with pixels.
python3 train.py method=drqv2 env=rlbench/reach_target env.episode_length=100 demos=10 pixels=true
Let's reduce the number of channels in the CNN of our vision encoder, and the number of nodes in our critic MLP:
python3 train.py method=drqv2 env=rlbench/reach_target env.episode_length=100 demos=10 \
pixels=true method.encoder_model.channels=16 method.critic_model.mlp_nodes=\[128,128\]
You can create your own handy config files in `robobase.cfgs.launch` and use them to launch your experiments.
Here are some examples:
python3 train.py launch=drqv2 env=rlbench/reach_target env.episode_length=100
python3 train.py launch=drqv2_pixel_dmc env=dmc/cartpole_balance
python3 train.py launch=mwm env=dmc/walker_walk
python3 train.py launch=mwm_rlbench env=rlbench/open_drawer
python3 train.py method=act pixels=true env=rlbench/reach_target
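As a rough sketch of what a custom launch config could look like (a hypothetical `my_launch.yaml` placed in `robobase.cfgs.launch`; the exact defaults-list syntax and available keys depend on RoboBase's config tree and are assumptions here):

```yaml
# @package _global_
defaults:
  - override /method: drqv2

pixels: true
demos: 10
```

You could then launch it with something like `python3 train.py launch=my_launch env=rlbench/reach_target`.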
In a new project/repo, you will need to create a minimum of 3 files:
- `myenv.yaml`
- a `Factory` to build it, e.g. `myenv.py`
- `train.py`
myenv.yaml
```yaml
# @package _global_
env:
  env_name: my_env_name
  physics_dt: 0.004  # The time passed per simulation step
  # Other ways to configure your environment
```
myenv.py
```python
import gymnasium as gym
from gymnasium.wrappers import TimeLimit
from omegaconf import DictConfig

from robobase.envs.env import EnvFactory
from robobase.envs.wrappers import (
    OnehotTime,
    FrameStack,
    RescaleFromTanh,
    AppendDemoInfo,
    ConcatDim,
)


class MyEnv(gym.Env):
    # Implement your environment here (reset, step, observation/action spaces).
    pass


class MyEnvFactory(EnvFactory):
    def _wrap_env(self, env, cfg):
        env = RescaleFromTanh(env)
        if cfg.use_onehot_time_and_no_bootstrap:
            env = OnehotTime(env, cfg.env.episode_length)
        env = ConcatDim(env, 1, 0, "low_dim_state")
        env = TimeLimit(env, cfg.env.episode_length)
        env = FrameStack(env, cfg.frame_stack)
        env = AppendDemoInfo(env)
        return env

    def make_train_env(self, cfg: DictConfig) -> gym.vector.VectorEnv:
        return gym.vector.AsyncVectorEnv(
            [
                lambda: self._wrap_env(MyEnv(), cfg)
                for _ in range(cfg.num_train_envs)
            ]
        )

    def make_eval_env(self, cfg: DictConfig) -> gym.Env:
        return self._wrap_env(MyEnv(), cfg)
```
train.py
```python
import hydra

from robobase.workspace import Workspace
from myenv import MyEnvFactory


@hydra.main(
    config_path="cfgs", config_name="my_cfg", version_base=None
)
def main(cfg):
    workspace = Workspace(cfg, env_factory=MyEnvFactory())
    workspace.train()


if __name__ == "__main__":
    main()
```
In a new project/repo, you will need to create a minimum of 2 files:
- `mymethod.yaml`
- `mymethod.py`
method/mymethod.yaml
```yaml
# @package _global_
method:
  _target_: mymethod.MyMethod
  my_special_parameter: 1
  # Other ways to configure your method
```
mymethod.py
```python
import torch
from typing import Iterator

from robobase.method.core import Method, BatchedActionSequence, Metrics
from robobase.replay_buffer.replay_buffer import ReplayBuffer


class MyMethod(Method):
    def __init__(self, my_special_parameter: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_special_parameter = my_special_parameter

    def reset(self, step: int, agents_to_reset: list[int]):
        pass

    def update(self, replay_iter: Iterator[dict[str, torch.Tensor]], step: int,
               replay_buffer: ReplayBuffer = None) -> Metrics:
        pass

    def act(self, observations: dict[str, torch.Tensor], step: int,
            eval_mode: bool) -> BatchedActionSequence:
        pass
```
You can then launch that algorithm on an environment, e.g.
python3 train.py --config-dir=. method=mymethod env=dmc/cartpole_swingup env.episode_length=1000
where `--config-dir` adds a config directory to the Hydra config search path.
A combination of the two configurations described above.
In your method, only log when `self.logging` is `True`; this will be slightly more efficient, especially if you log a lot.
```python
from robobase.method.core import OffPolicyMethod


class MyMethod(OffPolicyMethod):
    def update(self, *args):
        metrics = {}
        if self.logging:
            metrics["loss"] = 0
        return metrics
```