google-deepmind / dm_robotics

Libraries, tools and tasks created and used at DeepMind Robotics.
Apache License 2.0

Training specific agentflow subtaskoption #13

Closed (jangirrishabh closed this 1 year ago)

jangirrishabh commented 1 year ago

Is there an example showing how to train a specific option policy? For example, from the agentflow tutorial, how can we set up training for the ExamplePolicy? The problem is that the output of step() in the main_loop may have a different observation_spec and action_spec than the policy.

# Stubs for pulling observation and sending action to some external system.
observation_cb = ExampleObservationUpdater()
action_cb = ExampleActionSender()

# Create an environment that forwards the observation and action calls.
env = ProxyEnvironment(observation_cb, action_cb)

# Stub policy that runs the desired agent.
policy = ExamplePolicy(action_cb.action_spec(), "agent")

# Wrap policy into an agent that logs to the terminal.
task = ExampleSubTask(env.observation_spec(), action_cb.action_spec(), 10)
logger = print_logger.PrintLogger()
aggregator = subtask_logger.EpisodeReturnAggregator()
logging_observer = subtask_logger.SubTaskLogger(logger, aggregator)
agent = subtask.SubTaskOption(task, policy, [logging_observer])

reset_op = ExampleScriptedOption(action_cb.action_spec(), "reset", 3)
main_loop = loop_ops.Repeat(5, sequence.Sequence([reset_op, agent]))

# Run the episode.
timestep = env.reset()
while True:
  action = main_loop.step(timestep)
  timestep = env.step(action)

  # Terminate if the environment or main_loop requests it.
  if timestep.last() or (main_loop.pterm(timestep) > np.random.rand()):
    if not timestep.last():
      termination_timestep = timestep._replace(step_type=dm_env.StepType.LAST)
      main_loop.step(termination_timestep)
    break
araju commented 1 year ago

We unfortunately don't have open-sourced code for training, but I'll try to explain the pieces of the system and how you can handle differing action/observation specs.

There are 3 layers to this setup:

  1. The base environment, which is ProxyEnvironment in the example.
  2. The subtask, which specifies a particular reward, observation, and action spec. ExampleSubTask in the example.
  3. The policy/option which provides actions. subtask.SubTaskOption in the example.

If all 3 have the same action/observation spec, then this becomes a standard RL problem. For instance, you can push the actions and timesteps to a buffer and use your favorite off-policy RL algorithm to learn.
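For concreteness, a minimal sketch of that buffer idea, assuming all the specs match; `env` and `policy` here are placeholders for whatever environment and policy you build:

import collections

# FIFO buffer of (timestep, action, next_timestep) transitions.
replay_buffer = collections.deque(maxlen=100_000)

timestep = env.reset()
while not timestep.last():
  action = policy.step(timestep)
  next_timestep = env.step(action)
  # An off-policy learner can sample minibatches from this buffer.
  replay_buffer.append((timestep, action, next_timestep))
  timestep = next_timestep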

If, however, your base environment has a different action and observation spec from the policy you are training, you can use the subtask to convert between the two. The Subtask.agent_to_parent_action() and Subtask.parent_to_agent_timestep() methods allow for this ("parent" here means the base environment).

The ParameterizedSubTask class gives an easy way to build the correct subtask which does all the conversions you want. The action_space converts from the base environment action spec to your agent's action spec, and the timestep_preprocessor can modify the observation spec while also adding or changing the rewards/discounts you may need.
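As a minimal sketch (imports omitted, mirroring the construction that appears later in this thread; my_action_space, my_preprocessor, and my_policy are placeholders you would define yourself):

parent_spec = spec_utils.TimeStepSpec(
    base_env.observation_spec(),
    base_env.reward_spec(),
    base_env.discount_spec())

my_subtask = af.ParameterizedSubTask(
    parent_spec=parent_spec,
    action_space=my_action_space,           # maps agent actions to base env actions
    timestep_preprocessor=my_preprocessor)  # filters observations, sets rewards/discounts

# Wrap the policy so the composed agent can be stepped against the base environment.
my_agent = af.SubTaskOption(sub_task=my_subtask, agent=my_policy)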

Does this answer your question?

jangirrishabh commented 1 year ago

Thanks for the prompt response! I understand the design choices here, but is pushing the data to a buffer and training in an off-policy way the only option?

To be more clear: in order to train a policy, I believe we need to run the environment as a whole, so this is an environment-centric API. I am trying to think of a way to spawn multiple environments in parallel for a single agent, i.e. agent-centric training.

Currently the agent/network lives inside the policy/option class. The env.step() function returns outputs from all the options. If there were a way to isolate the data coming from the learned policy/option, maybe this could be done. How can we spawn multiple envs and gather data from all of them, but only the data for the learnable option? Does my question make more sense now? Thanks!

araju commented 1 year ago

Maybe I understand what you're asking. Let me try to draw out an example, and you can correct me where I'm missing things.

First,

but is pushing the data to a buffer and training in an off-policy way the only option?

Nope, that was just an example. Neither MoMa nor Agentflow requires any particular agent-training style. The framework is generic and should be able to support what you're hoping to do.

Take a look at the example environment built in the MoMa tutorial Colab. I'll use that as our example in this response since it is a bit more concrete than the one you pointed to above.

In it, we have a Sawyer arm with a parallel gripper and a small block to lift. You can see how the base environment (BaseTask) is built in the Colab.

That base environment has many sensors and a raw 7D joint velocity effector. Let's imagine the base environment provided all the following observations:

sawyer_joint_pos
sawyer_joint_vel
sawyer_joint_torque
gripper_pos
gripper_quat
gripper_finger_pos
gripper_finger_vel
block_pos
block_quat

And let's say, for your policy, you want (a) a smaller action space to make learning easier and (b) fewer observations to feed in. You can do both of these via the "subtask" abstraction: in your subtask, you can specify an ActionSpace to map your policy's actions to the base environment, and you can add a RetainObservations preprocessor to keep only the observations you want. Again, the MoMa tutorial Colab shows how to do both of these.

So, now, with these changes, you should have an environment which exposes the exact action space your policy wants and the exact data you want to feed into your policy. For instance, this can be the new list of observations:

sawyer_joint_pos
sawyer_joint_vel
gripper_pos
gripper_quat
gripper_finger_pos
block_pos
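For the observation-filtering half of that, a hedged sketch using the af.RetainObservations preprocessor that also appears later in this thread (the ActionSpace half is elided, since its construction depends on your effectors):

# Keep only the observations the policy should see; everything else is dropped.
keep_policy_obs = af.RetainObservations([
    'sawyer_joint_pos',
    'sawyer_joint_vel',
    'gripper_pos',
    'gripper_quat',
    'gripper_finger_pos',
    'block_pos',
])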

Now, your policy does not necessarily need to be wrapped in an Option. This is a handy abstraction when you are dealing with multiple subtasks, but if you are just training one policy for one task, you don't necessarily need it (even if you are running multiple environments concurrently).

However, for the purposes of this answer, let's assume your policy network which you are training is at least wrapped in some class which has a step() function:

class MyPolicy:

  def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
    # `timestep` is the output of the Sawyer environment.
    flattened_observations = ...  # do some processing to take the observations and merge them into a list of floats.
    normalized_action = ...  # feed the floats into your network and get some output.
    unnormalized_action = ...  # scale the network output back to the environment action spec.
    return unnormalized_action

The example above is bare bones and missing some things you may need for concurrency, but you get my drift.

Now, if you want to run your policy with different environments, you could run the following lines in several threads:

env = MySawyerEnv()  # this is a SubTaskEnvironment
agent = MyPolicy()
for _ in range(num_episodes):
  timestep = env.reset()
  while not timestep.last():
    action = agent.step(timestep)
    timestep = env.step(action)
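If it helps, here is a minimal sketch of that using only standard-library threading; it assumes MySawyerEnv can be constructed independently in each thread and that MyPolicy.step() has been made thread-safe as mentioned above:

import threading

def run_episodes(agent, num_episodes):
  env = MySawyerEnv()  # one environment instance per thread
  for _ in range(num_episodes):
    timestep = env.reset()
    while not timestep.last():
      action = agent.step(timestep)
      timestep = env.step(action)

shared_agent = MyPolicy()
threads = [threading.Thread(target=run_episodes, args=(shared_agent, 10))
           for _ in range(4)]
for t in threads:
  t.start()
for t in threads:
  t.join()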

tl;dr: the Subtask abstraction, with its ActionSpace and TimestepPreprocessor abstractions, should give you the tools to turn a base environment into what you want to expose to your policy.

jangirrishabh commented 1 year ago

Thanks for the explanation! This example will help me put my point across better. Suppose we divide this block-lift scenario into 4 different parts, 3 of which are straightforward hard-coded options/policies and 1 of which is a learned policy. For example, let's say:

Option 1 (hard-coded): take the gripper above the object.
Option 2 (learned policy): given the observations, find the best grasp location.
Option 3 (hard-coded): given the grasp location, execute the grasp.
Option 4 (hard-coded): take the grasped object to a target location.

Now, when we start the simulator it starts in the initial state, and when we execute env.step() it cycles through 1 -> 2 -> 3 -> 4 -> reset and repeats. Given this, the observations we see inside the loop will not belong only to Option 2, but to every phase of the episode.

So my question is: how can I train Option 2 in this case? The other options don't need training, but in order to train Option 2 I need to get the simulator to a particular state by transitioning through 1-2-3-4-reset. I know one answer is to just get rid of all this modularity and make 1, 3, and 4 part of the environment's init and reset operations, but I feel that defeats the whole point of your Agentflow API. There should be a smarter way to do this?

Thank you very much for your time and patience!

araju commented 1 year ago

Ah, ok. I see your situation now. Yes! Agentflow is designed for exactly this, while MoMa's SubTaskEnvironment (a stricter version of Agentflow which adheres to DeepMind's environment API) is not. So feel free to ignore pieces of my previous message, haha.

You rightly described each phase of the task as an "option". They would be SubTaskOptions, made of both a SubTask and the scripted or learned policy/option.

Again, like in my last message, let's say the base environment exposes an 8D action space (7D for the arm joints and 1D for the gripper), and that it exposes the following observations again:

sawyer_joint_pos
sawyer_joint_vel
sawyer_joint_torque
gripper_pos
gripper_quat
gripper_finger_pos
gripper_finger_vel
block_pos
block_quat

Here is how you could compose the various SubTaskOptions you need:

base_env = ...

parent_spec = spec_utils.TimeStepSpec(
    base_env.observation_spec(),
    base_env.reward_spec(),
    base_env.discount_spec())

# Create a cartesian ActionSpace which projects 6D Cartesian actions to the 8D base environment action space.
cartesian_action_space = ...

# Phase 1 - Move the arm above the object.
# Create a TimestepPreprocessor which terminates when the gripper is above the object.
# You can terminate subtasks/options by setting the "pterm" (probability of termination) to 1.0.
# See subtask_termination.py for examples on how to write a terminating timestep preprocessor.
stop_above_obj_preprocessor = ...
move_gripper_above_obj_task = af.ParameterizedSubTask(
    parent_spec=parent_spec,
    # Assume your scripted motion outputs 6D Cartesian actions.
    action_space=cartesian_action_space,
    timestep_preprocessor=stop_above_obj_preprocessor)
move_gripper_above_obj_scripted_option = ...
move_gripper_above_obj = af.SubTaskOption(
    sub_task=move_gripper_above_obj_task,
    agent=move_gripper_above_obj_scripted_option)

# Phase 2 - Choose grasp location.
# This is the learned phase. Let's assume 2 things:
#   1: your learned policy operates in a smaller action space than the base env.
#   2: the policy reads a subset of the base env observations.
# You can still handle both of these via a ParameterizedSubTask.
move_to_grasp_loc_action_space = ...
move_to_grasp_loc_task = af.ParameterizedSubTask(
    parent_spec=parent_spec,
    action_space=move_to_grasp_loc_action_space,
    timestep_preprocessor=af.RetainObservations([
        'sawyer_joint_pos',
        'block_pos',
        'block_quat',
    ]))
move_to_grasp_location_learned_policy = ...
move_to_grasp_location = af.SubTaskOption(
    sub_task=move_to_grasp_loc_task,
    agent=move_to_grasp_location_learned_policy)

# Phase 3 - Execute grasp
...
grasp = af.SubTaskOption(...)

# Phase 4 - Move object to new location
...
move_to_new_loc = af.SubTaskOption(...)

full_procedure = af.Sequence([
    move_gripper_above_obj,
    move_to_grasp_location,
    grasp,
    move_to_new_loc
])

# Run an episode
timestep = base_env.reset()
while not timestep.last():
  action = full_procedure.step(timestep)
  timestep = base_env.step(action)

  # Terminate if the environment or full_procedure requests it.
  if full_procedure.pterm(timestep) > np.random.rand():
    if not timestep.last():
      # Make sure the option sees a LAST timestep.
      termination_timestep = timestep._replace(step_type=dm_env.StepType.LAST)
      full_procedure.step(termination_timestep)
    break

You can configure each SubTask to have the exact action space, set of observations, and rewards/discounts that you want for that particular phase. Different phases do not need to share the same observations.

jangirrishabh commented 1 year ago

Alright, so I can define each step as a SubTaskOption. Currently the termination condition for my tasks is MAX_STEPS. Maybe I am missing something subtle here. What I do not understand is, within this loop:

# Run an episode
timestep = base_env.reset()
while not timestep.last():
  action = full_procedure.step(timestep)
  timestep = base_env.step(action)

  # Terminate if the environment or full_procedure requests it.
  if full_procedure.pterm(timestep) > np.random.rand():
    if not timestep.last():
      # Make sure the option sees a LAST timestep.
      termination_timestep = timestep._replace(step_type=dm_env.StepType.LAST)
      full_procedure.step(termination_timestep)
    break

when I call timestep = base_env.step(action) I observe timesteps from the whole episode cycling through 1-2-3-4, i.e. from all the SubTaskOptions. Given that I want to train only Option 2 (i.e. move_to_grasp_location_learned_policy), I would hope for a way to expose only the observations belonging to Option 2.

tl;dr: how can I train move_to_grasp_location_learned_policy, especially by spawning multiple envs? Thank you!

araju commented 1 year ago

The output of base_env.step() will not change, no matter which SubTaskOption is active at the moment. The thing that changes is what the af.Option sees within the SubTaskOption.

Let's say move_to_grasp_location_learned_policy is an af.Option. So it will look something like:

class MyPolicy(af.Option):

  def step(self, timestep) -> np.ndarray:
    flattened_observations = ...  # do some processing to take the observations and merge them into a list of floats.
    normalized_action = ...  # feed the floats into your network and get some output.
    unnormalized_action = ...  # scale the network output back to the environment action spec.
    return unnormalized_action

The timestep in MyPolicy.step() will only include the observations you want for training (assuming you set up the move_to_grasp_loc_task correctly). So, there is a difference. The base_env.step() will always return the full list of observations, but your policy will receive the subset you defined.

A bit of "how to train move_to_grasp_location_learned_policy" depends on, well, how you want to train it. At the end of the day, though, you will need state-action trajectories. The relevant state-action trajectories, in this case, are not in the main run-loop you copied above; they are within the move_to_grasp_location SubTaskOption.

Here are a couple ideas:

  1. You do your learning within MyPolicy, either offline or online or however you want to. You can even make the step() threadsafe so if you decide to use the same policy object across all the threads with your environments, it will work fine.
  2. You can use an af.SubTaskObserver and attach it to the subtask in order to observe the trajectories of Phase 2, then save those trajectories somewhere and train your policy on them (see the sketch after this list). Again, the intermediate storage for your trajectories should be threadsafe.
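A rough sketch of idea 2, under two assumptions: that af.SubTaskObserver exposes a per-step callback receiving the parent-side and agent-side timestep/action (check subtask.py, or mirror how subtask_logger.SubTaskLogger is implemented, for the exact method name and signature), and that observers are passed as the third argument to SubTaskOption as in the first snippet of this thread:

import queue

# Thread-safe storage shared by all environment threads.
trajectory_queue = queue.Queue()

class TrajectoryObserver(af.SubTaskObserver):
  """Stores the agent-side (timestep, action) pairs seen during Phase 2."""

  # NOTE: assumed callback shape; verify against the real SubTaskObserver interface.
  def step(self, parent_timestep, parent_action, agent_timestep, agent_action):
    # Only the agent-side view (filtered observations, subtask reward) is stored.
    trajectory_queue.put((agent_timestep, agent_action))

# Attach the observer when building the Phase 2 SubTaskOption:
move_to_grasp_location = af.SubTaskOption(
    move_to_grasp_loc_task,
    move_to_grasp_location_learned_policy,
    [TrajectoryObserver()])

A learner thread can then drain trajectory_queue and update the policy however you like.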

Like I said, the specifics on how to train the policy depend on your approach. But the relevant trajectories are all accessible in the SubTaskOption, not in the main base_env-full_procedure run-loop. Stick your learning stuff within the subtask.

Does that make sense?

jangirrishabh commented 1 year ago

Thank you @araju, that answers my question for a single learnable agent! For future readers of this thread: I used an af.SubTaskObserver to stack observations, passed that observer object to the main loop, and popped observations from it whenever new ones were added by the learnable option.

araju commented 1 year ago

Glad that worked! Closing the issue.