Eclectic-Sheep / sheeprl

Distributed Reinforcement Learning accelerated by Lightning Fabric
https://eclecticsheep.ai
Apache License 2.0

The algorithm framework differs from the DreamerV1 paper #226

Closed LYK-love closed 2 months ago

LYK-love commented 4 months ago

Hello, I found the following code in sheeprl/algos/dreamer_v3.py:

# Train the agent
if update >= learning_starts and updates_before_training <= 0:
    local_data = rb.sample_tensors(
        cfg.algo.per_rank_batch_size,
        sequence_length=cfg.algo.per_rank_sequence_length,
        n_samples=(
            cfg.algo.per_rank_pretrain_steps if update == learning_starts else cfg.algo.per_rank_gradient_steps
        ),
        dtype=None,
        device=fabric.device,
        from_numpy=cfg.buffer.from_numpy,
    )
    with timer("Time/train_time", SumMetric, sync_on_compute=cfg.metric.sync_on_compute):

        # All update steps
        for i in range(next(iter(local_data.values())).shape[0]):
            if per_rank_gradient_steps % cfg.algo.critic.target_network_update_freq == 0:
                tau = 1 if per_rank_gradient_steps == 0 else cfg.algo.critic.tau
                for cp, tcp in zip(critic.module.parameters(), target_critic.parameters()):
                    tcp.data.copy_(tau * cp.data + (1 - tau) * tcp.data)
            batch = {k: v[i].float() for k, v in local_data.items()}
            train(
                fabric,
                world_model,
                actor,
                critic,
                target_critic,
                world_optimizer,
                actor_optimizer,
                critic_optimizer,
                batch,
                aggregator,
                cfg,
                is_continuous,
                actions_dim,
                moments,
            )
            per_rank_gradient_steps += 1
        train_step += world_size
    updates_before_training = cfg.algo.train_every // policy_steps_per_update
    if cfg.algo.actor.expl_decay:
        expl_decay_steps += 1
        actor.expl_amount = polynomial_decay(
            expl_decay_steps,
            initial=cfg.algo.actor.expl_amount,
            final=cfg.algo.actor.expl_min,
            max_decay_steps=max_step_expl_decay,
        )
    if aggregator and not aggregator.disabled:
        aggregator.update("Params/exploration_amount", actor.expl_amount)

Based on my understanding, the training procedure of DreamerV3 is the same as DreamerV1's, as shown in the DreamerV1 paper:

[Image: the training-loop pseudocode from the DreamerV1 paper]

Basically, we need to:

  1. Initialize the replay buffer with some data collected by interacting with the environment.
  2. Start a while not converged loop; in each iteration we:
    1. Start a for i in range(update_steps) loop; in each iteration we:
      1. Perform one step of dynamics learning.
      2. Perform one step of behavior learning.
    2. After the for loop, the agent has improved. We then use this improved agent to interact with the environment, collect one episode, and add it to the replay buffer.
  3. When the while loop ends, training is over.
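
In other words, my reading of the paper's procedure is roughly the following sketch (this is my own pseudocode, not sheeprl code; collect_random_episodes, collect_episode, dynamics_learning and behavior_learning are placeholder names):

# Rough pseudocode of my reading of the paper's training loop; all function
# names here are placeholders, not sheeprl functions.
replay_buffer = collect_random_episodes(env)      # step 1: seed the buffer

while not converged:                              # step 2
    for _ in range(update_steps):                 # step 2.1
        batch = replay_buffer.sample()
        dynamics_learning(batch)                  # 2.1.1: world-model update
        behavior_learning(batch)                  # 2.1.2: actor/critic update in imagination
    episode = collect_episode(env, agent)         # step 2.2: interact with the env
    replay_buffer.add(episode)
# step 3: when the while loop ends, training is over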

So, I think sheeprl's train() function corresponds to one step of dynamics learning plus one step of behavior learning. It should be called update_steps times in a for loop, and that for loop should itself be repeated until the agent converges.

However, in the code I quoted at the beginning, I don't see train() being called update_steps times inside a for loop, and I don't see the outermost while loop. I also don't see an episode being collected and added to the replay buffer after each for loop.

I think sheeprl's implementation is a little different from the paper. Can you explain it?

What's more, can you explain what train_step, per_rank_gradient_steps, updates_before_training, and num_updates are, and what the condition if update >= learning_starts and updates_before_training <= 0 means? I also can't understand the logic of this piece of code:

for update in range(start_step, num_updates + 1):
        policy_step += cfg.env.num_envs * world_size

        # Measure environment interaction time: this considers both the model forward
        # to get the action given the observation and the time taken into the environment
        with timer("Time/env_interaction_time", SumMetric, sync_on_compute=False):
            # Sample an action given the observation received by the environment
            if (
                update <= learning_starts
                and cfg.checkpoint.resume_from is None
                and "minedojo" not in cfg.env.wrapper._target_.lower()
            ):
                real_actions = actions = np.array(envs.action_space.sample())
                if not is_continuous:
                    actions = np.concatenate(
                        [
                            F.one_hot(torch.as_tensor(act), act_dim).numpy()
                            for act, act_dim in zip(actions.reshape(len(actions_dim), -1), actions_dim)
                        ],
                        axis=-1,
                    )
            else:
                with torch.no_grad():
                    preprocessed_obs = {}
                    for k, v in obs.items():
                        preprocessed_obs[k] = torch.as_tensor(v[np.newaxis], dtype=torch.float32, device=device)
                        if k in cfg.algo.cnn_keys.encoder:
                            preprocessed_obs[k] = preprocessed_obs[k] / 255.0 - 0.5
                    mask = {k: v for k, v in preprocessed_obs.items() if k.startswith("mask")}
                    if len(mask) == 0:
                        mask = None
                    real_actions = actions = player.get_exploration_action(preprocessed_obs, mask)
                    actions = torch.cat(actions, -1).cpu().numpy()
                    if is_continuous:
                        real_actions = torch.cat(real_actions, dim=-1).cpu().numpy()
                    else:
                        real_actions = (
                            torch.cat([real_act.argmax(dim=-1) for real_act in real_actions], dim=-1).cpu().numpy()
                        )

            step_data["actions"] = actions.reshape((1, cfg.env.num_envs, -1))
            rb.add(step_data, validate_args=cfg.buffer.validate_args)

            next_obs, rewards, dones, truncated, infos = envs.step(real_actions.reshape(envs.action_space.shape))
            dones = np.logical_or(dones, truncated).astype(np.uint8)

        step_data["is_first"] = np.zeros_like(step_data["dones"])
        if "restart_on_exception" in infos:
            for i, agent_roe in enumerate(infos["restart_on_exception"]):
                if agent_roe and not dones[i]:
                    last_inserted_idx = (rb.buffer[i]._pos - 1) % rb.buffer[i].buffer_size
                    rb.buffer[i]["dones"][last_inserted_idx] = np.ones_like(rb.buffer[i]["dones"][last_inserted_idx])
                    rb.buffer[i]["is_first"][last_inserted_idx] = np.zeros_like(
                        rb.buffer[i]["is_first"][last_inserted_idx]
                    )
                    step_data["is_first"][i] = np.ones_like(step_data["is_first"][i])

        if cfg.metric.log_level > 0 and "final_info" in infos:
            for i, agent_ep_info in enumerate(infos["final_info"]):
                if agent_ep_info is not None:
                    ep_rew = agent_ep_info["episode"]["r"]
                    ep_len = agent_ep_info["episode"]["l"]
                    if aggregator and not aggregator.disabled:
                        aggregator.update("Rewards/rew_avg", ep_rew)
                        aggregator.update("Game/ep_len_avg", ep_len)
                    fabric.print(f"Rank-0: policy_step={policy_step}, reward_env_{i}={ep_rew[-1]}")

        # Save the real next observation
        real_next_obs = copy.deepcopy(next_obs)
        if "final_observation" in infos:
            for idx, final_obs in enumerate(infos["final_observation"]):
                if final_obs is not None:
                    for k, v in final_obs.items():
                        real_next_obs[k][idx] = v

        for k in obs_keys:
            step_data[k] = next_obs[k][np.newaxis]

Thanks!

belerico commented 4 months ago

Hi @LYK-love, as you said, the algorithm you posted is the Dreamer-V1 one, which, by the way, is very similar to the code you shared here. The code you refer to is the Dreamer-V3 one, which is quite different from the V1 version, especially given the insights we have gained from looking at the authors' code. As pointed out in #218, we have run experiments and we are matching the Dreamer-V3 paper. In #223 we are considering adopting the more general and widely accepted replay_ratio instead of Hafner's train_ratio (which is related) to better match the Dreamer-V3 implementation.

For your last question: please refer first to #223, where we have removed per_rank_gradient_steps and updates_before_learning and replaced them with the replay_ratio. Also, have you had a look at the how-to, where we explain policy_steps and everything related to it?
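
To give a rough idea of how the two quantities relate (the numbers below are purely illustrative, and this is not necessarily how #223 implements it, just the general idea, assuming a single rank and a single environment):

# Illustrative only: relation between the train_every/per_rank_gradient_steps
# scheme and a replay_ratio, assuming one rank and one environment.
train_every = 16               # policy steps between two training sessions
per_rank_gradient_steps = 2    # gradient steps per training session

# gradient steps performed per policy step collected
replay_ratio = per_rank_gradient_steps / train_every   # -> 0.125

# Conversely, given a target replay_ratio, one can decide when to train:
ratio_budget = 0.0
for policy_step in range(100):
    # one env step would be collected here
    ratio_budget += replay_ratio
    while ratio_budget >= 1.0:   # enough budget for one gradient step
        ratio_budget -= 1.0      # train() would be called here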

michele-milesi commented 4 months ago

Hi @LYK-love, as @belerico said, we are moving from our way of computing the replay_ratio (with train_every and per_rank_gradient_steps parameters) to a more standard way.

In any case, I would like to answer your questions:

Perhaps what has led you astray is that in this implementation we do not have an outer loop with two inner loops (one for environment interaction and one for training); instead, we have a single loop for environment interaction, and inside it we check whether we need to carry out training. This choice was made to follow the original repository as closely as possible.

So the structure of our code is as follows (let me rename the variables for better understanding):

# env_interaction_steps_between_trainings counts the remaining policy steps to
# play; when it reaches zero, the agent is trained
initialize env_interaction_steps_between_trainings
for i in range(total_steps):
    env.step(action)

    env_interaction_steps_between_trainings -= 1
    if train_started and env_interaction_steps_between_trainings <= 0:
        for j in range(per_rank_gradient_steps):
            train()
        reset env_interaction_steps_between_trainings

Another thing I noticed is that num_updates and updates_before_training are imprecise names; we will fix them for clarity. Let me know if it is clearer now. Thanks

LYK-love commented 3 months ago

Thanks! I'll do more work on sheeprl and try to reproduce the results of the original paper.

LYK-love commented 3 months ago

@michele-milesi So in each iteration of for i in range(total_steps):, only one step of env interaction is performed, right? In the pseudocode, by contrast, T steps are performed in one iteration of the outermost while loop. I think T is the episode length.

michele-milesi commented 3 months ago

Yeah, at each iteration of the outer for-loop, one step of env interaction is performed. You can obtain the same behaviour as the pseudocode by properly setting the env_interaction_steps_between_trainings variable (our train_every). If it is set to 10, you perform 10 steps of env interaction between one training session and the next. Remember that Dreamer aims to be sample efficient (the fewer steps of env interaction you do between one training session and the next, the better).
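
For instance, a schematic way to emulate the paper's "collect T steps, then train for update_steps gradient steps" schedule with our flat loop would be the following (T, update_steps and total_steps are just illustrative numbers, not sheeprl config values):

# Schematic emulation of the paper's schedule with a single flat loop.
# T, update_steps and total_steps are illustrative, not sheeprl defaults.
T = 300              # env steps between two training sessions (our train_every)
update_steps = 100   # gradient steps per training session
total_steps = 10_000 # total env steps

steps_until_train = T
for step in range(total_steps):
    # env.step(action) would happen here: one interaction per iteration
    steps_until_train -= 1
    if steps_until_train <= 0:
        for _ in range(update_steps):
            pass     # train() would be called here
        steps_until_train = T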