ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[rllib] Custom model for multi-agent environment: access to all states #7341

Closed janblumenkamp closed 1 year ago

janblumenkamp commented 4 years ago

What is your question?

My goal is to learn a single policy that is deployed to multiple agents (i.e. all agents learn the same policy, but are able to communicate with each other through a shared neural network). RLlib's multi-agent interface works with the dict indicating an action for each individual agent.

It is not entirely clear to me how my custom model is supposed to obtain the current state after the last time step for all agents at once. It appears to me that RLlib calls the forward function of my TorchModelV2 subclass for each agent individually and passes that agent's state into the state argument of the forward function.

tl;dr, if this is my custom model:

class AdaptedVisionNetwork(TorchModelV2, nn.Module):
    """Generic vision network."""

    def __init__(self, obs_space, action_space, num_outputs, model_config, name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs, model_config, name)
        nn.Module.__init__(self)
        # ... NN model definition

    @override(TorchModelV2)
    def forward(self, input_dict, state, seq_lens):
        features = self.predict(input_dict["obs"].float())
        logits = self._logits(features)
        self._cur_value = self._value_branch(features).squeeze(1)
        return logits, state

    @override(TorchModelV2)
    def value_function(self):
        assert self._cur_value is not None, "must call forward() first"
        return self._cur_value

Then how do I manage to predict the logits for all of my n agents at once while having access to the current state of all my agents? Am I supposed to use variable-sharing? Is #4748 describing this exact problem? If so, is there any progress?

ericl commented 4 years ago

@janblumenkamp you can try these two options:

(1) Grouping the agents: https://ray.readthedocs.io/en/latest/rllib-env.html#grouping-agents Then you can write a single policy/model that computes all the agents logits at once, and can implement the desired information sharing structure in the model.

(2) Reshape the batch in your custom model from (batch, data) to (batch, agent_id, data) for processing (i.e., do the grouping in the model only). You will need some way of figuring out the agent id for each batch entry.
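A minimal sketch of option (2) in NumPy, under loudly stated assumptions: each row carries an agent id and a step id (both hypothetical fields that the env would have to put into the observation), and every step id occurs exactly once per agent. RLlib does not promise such a layout, so treat this as an illustration of the reshape only.

```python
import numpy as np


def group_by_step(data, agent_ids, step_ids, n_agents):
    """Regroup flat rows (batch, data) into (steps, n_agents, data).

    Assumes (hypothetically) that every step id occurs exactly once per agent.
    """
    # lexsort uses the last key as primary key: sort by step id, then agent id.
    order = np.lexsort((agent_ids, step_ids))
    if len(order) % n_agents != 0:
        raise ValueError("batch does not contain complete agent groups")
    grouped = data[order].reshape(-1, n_agents, data.shape[-1])
    return grouped, order


# Six scrambled rows: two env steps, three agents, two features per row.
data = np.array([[10, 11], [20, 21], [0, 1], [30, 31], [50, 51], [40, 41]])
agent_ids = np.array([1, 2, 0, 0, 2, 1])
step_ids = np.array([0, 0, 0, 1, 1, 1])

grouped, order = group_by_step(data, agent_ids, step_ids, n_agents=3)
```

Here `grouped[s, a]` is the row of agent `a` at step `s`; `order` can be used to scatter per-agent outputs back into the original batch order.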

janblumenkamp commented 4 years ago

Hi Eric, in the meantime, I found this example. The states of the other agents are basically considered as part of the observation space, which seems to be the simplest solution to me and seems similar to your second proposed solution. Would that make sense or do you see any downsides/problems in doing that (in particular in terms of performance)?

ericl commented 4 years ago

Yes, that would work great! I guess you will end up evaluating the 'shared' layer once per agent but the overhead there should be minimal.

janblumenkamp commented 4 years ago

Thank you!

janblumenkamp commented 4 years ago

Hi Eric, I have a follow-up question: In this blog post, you write

decomposing the actions and observations of a single monolithic agent into multiple simpler agents not only reduces the dimensionality of agent inputs and outputs, but also effectively increases the amount of training data generated per step of the environment

but in the agent grouping documentation, it says

RLlib treats agent groups like a single agent with a Tuple action and observation space.

Do I still have the advantages of multiple single agents (i.e. more distributed experience) if I use agent grouping, or is the grouped super-agent literally treated as one big agent with a huge observation and action space? I assume the former is the case?

ericl commented 4 years ago

or is the grouped super-agent literally treated as one big agent with a huge observation and action space

It's the latter, it really is one big super-agent. You could potentially still do an architectural decomposition within the super agent model though (i.e., to emulate certain multi-agent architectures).

janblumenkamp commented 4 years ago

(I hope it is okay to follow up again in this issue since the question is again closely related.) How would one go ahead if one wanted to incorporate RNNs into the shared neural network? So right now I have one policy and multiple agents, the state contains the state of all agents and an ID for the corresponding agents (exactly as in my example posted here). In my model I process the state of all the other agents and know from the transmitted ID which state is the one for this agent (since forward will be called for each agent, the first part is therefore, as you pointed out before, redundant for all agents). If I now add RNNs (in my example directly after running self.cnn_model) I need a shared state among all agents, right? Otherwise each agent would have a different internal state for all the other agents? Or is this not a problem?

janblumenkamp commented 4 years ago

Please ignore my previous question, I had a misconception of how the batching works.

Yes, that would work great! I guess you will end up evaluating the 'shared' layer once per agent but the overhead there should be minimal.

So far I followed this strategy, but now that I add LSTMs it seems that the overhead adds up significantly, especially for larger amounts of agents.

(2) Reshape the batch in your custom model from (batch, data) to (batch, agent_id, data) for processing (i.e., do the grouping in the model only). You will need some way of figuring out the agent id for each batch entry.

Probably I will have to do something like this then. I will not only need to figure out each agent's id, but also which elements in the batch are coming from the same time step, right? Is it guaranteed that consecutive elements in the batch are coming from the same environment and same time step? Even if I don't allow any early termination of agents, I sometimes have cases where I receive a batch that contains fewer entries than I have agents (here printed the batch size from inside my model):

(pid=86346) 1                                                      
(pid=86346) 1  
(pid=86346) 1                     
(pid=86346) 1                                  
(pid=86347) 20 
(pid=86347) 1                                 
(pid=86347) 1                        
(pid=86347) 1                                                                                                              
(pid=86347) 1    
(pid=86347) 1                                                                                                                       
(pid=86347) 1                         
(pid=86347) 1                                                                                                                                  
(pid=86347) 1                                                         
(pid=86347) 1                                                                                                                                                 
(pid=86347) 1                                  
(pid=86347) 1                                                                                                             
(pid=86347) 1              
(pid=86347) 1    
(pid=86347) 1  
(pid=86347) 1  
(pid=86347) 1  
(pid=86347) 1                    
(pid=86347) 1                     
(pid=86347) 1                                                                                                                
(pid=86347) 1      
(pid=86327) 700  
(pid=86327) 700                                                                            
(pid=86327) 600                                                                            
(pid=86327) 600                                                                            

(Here I have 5 agents in my environment, the batches initially sometimes contain 20 elements and then only 1, but there are exactly 20 of those size 1 batches and the agent ID I am sending in the observation indicates that those 20 batches are one size 20 batch disassembled into multiple smaller batches - why is that happening? Should I just discard those batches?) Is it guaranteed that consecutive entries in the batch are from the same time step or could there be a possibility that this is not the case?

ericl commented 4 years ago

Hmm what exactly is being printed out here? The batches should get merged together to the train batch size eventually, but you might see smaller fragments during processing.

Also, note that batch.count always measures number of environment steps, which could be much lower than the sum of agent steps.

ericl commented 4 years ago

Oh, the "size 1" batches are probably from inference if you're printing from your model; that's normal.

janblumenkamp commented 4 years ago

Oh I see, that makes sense, then it's coming from the evaluation I am running during training! I guess reshaping the batch in the model won't work then. Then I have to group the agents? For PyTorch that will only be possible after #8101 is finalized. It looks like that will happen very soon, but just out of curiosity, what other options do I have? Just to recap, basically I want n actions for n agents from a single shared observation space (and thus perform n policy updates for each observation). I probably could implement a standard gym environment (so not use the MultiAgentEnv) that provides the shared observation space and takes a MultiDiscreteAction. My model would take this observation and provide the actions. I can then take this MultiDiscreteAction in my own RolloutWorker similar to this example to perform n policy updates? Or is there an easier way?

ericl commented 4 years ago

There is one workaround, which is to have the policies execute forward() through the "shared" communication layer separately. Basically the shared unit will be evaluated N times for N policies instead of once, so this is more computationally expensive, but should give the same results.

You'll have to make sure the observation space contains the observations of all other agents though.

Edit: example, suppose I have agents A, B, obs_a, obs_b, pi_a, pi_b, and shared_layer.

Independent agents

obs_a -> pi_a(obs_a) -> action for A
obs_b -> pi_b(obs_b) -> action for B

With shared layer

(obs_a, obs_b) -> pi_a(obs_a, shared_layer(obs_b)) -> action for A
(obs_b, obs_a) -> pi_b(obs_b, shared_layer(obs_a)) -> action for B

This is basically a version of a grouped agent that trades efficiency for simplicity.
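A minimal NumPy sketch of the "with shared layer" data flow, assuming a single set of weights reused by both agents (as in the shared-policy setup of the original question). The dimensions, weights, and the `policy` and `shared_layer` functions are hypothetical stand-ins, not RLlib APIs.

```python
import numpy as np

rng = np.random.default_rng(0)
OBS_DIM, FEAT_DIM, N_ACTIONS = 4, 3, 2

# Hypothetical weights; the same set is reused by every agent.
W_shared = rng.normal(size=(OBS_DIM, FEAT_DIM))
W_policy = rng.normal(size=(OBS_DIM + FEAT_DIM, N_ACTIONS))


def shared_layer(other_obs):
    """Encode the other agent's observation into a feature vector."""
    return np.tanh(other_obs @ W_shared)


def policy(own_obs, other_obs):
    """Logits for one agent from its own obs plus the other agent's features."""
    features = np.concatenate([own_obs, shared_layer(other_obs)], axis=-1)
    return features @ W_policy


obs_a = rng.normal(size=OBS_DIM)
obs_b = rng.normal(size=OBS_DIM)

# The env hands each agent the pair (own obs, other agent's obs).
logits_a = policy(obs_a, obs_b)
logits_b = policy(obs_b, obs_a)
```

Note that `shared_layer` runs once per agent here, which is the efficiency cost mentioned above.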

janblumenkamp commented 4 years ago

Not sure if I understand, can this be implemented entirely in the model definition or does this require changes in RLlib? If it can be implemented in the model, it surely would require a normal Gym env instead of the MultiAgentEnv? If I have only a single policy and multiple agents, the shared unit would only be evaluated once with your proposed solution?

janblumenkamp commented 4 years ago

Now I am trying this: I have a MultiAgentEnv that contains all agent's observations in each observation as well as an id for each agent. My model now receives the batched observations for all agents. The idea was to filter out only the observations from the agent of id 0 from the whole batch in the model (so basically ignore all agent's observations except one agent), then compute the actions for all agents from that single observation in forward and return that batch. This would only work as long as the batch size is a multiple of the number of agents and as long as the batch contains the same number of samples from each agent. Unfortunately, the latter is not necessarily true - it seems like it can happen that the samples in a batch can be completely scrambled (but not always). Why is that the case?

ericl commented 4 years ago

Not sure if I understand, can this be implemented entirely in the model definition or does this require changes in RLlib?

The example above requires both (1) altering the env to provide the required obs, and (2) altering the model to implement the desired communication.

My model now receives the batched observations for all agents. The idea was to filter out only the observations from the agent of id 0 from the whole batch in the model (so basically ignore all agent's observations except one agent),

I'm not sure I understand this. Why not instead give the full observation to each agent as in the above example? Then, each agent's policy can filter out the desired data without needing to mess with the batch dimension (no guarantees on how that dimension is organized).

janblumenkamp commented 4 years ago

I'm sorry, maybe I am completely misunderstanding how this works... I highly appreciate your patience with me! Let me reiterate what I want to do:

In my use case, I have a single policy for n agents (Level 1: Multiple agents, shared policy 'homogeneous agent') and I want them to share information at every time step. The idea is that each agent receives the output features of a CNN of the observation of all other agents at the same time step and processes this feature vector to an action in the shared layer. If I use the MultiAgentEnv, each individual agent's observation would be fed to the model as one batch entry, right? This means that at every time step, each agent has to compute the CNN output vector of every other agent: At t=0, agent 0 evaluates the CNN on the observations of itself and the n-1 other agents, agent 1 then evaluates the same CNN on the same observations, etc. This means I have to include the observations of all other agents in every observation, as you said. This is fine if I have like five agents, but the training will slow down almost linearly with the number of agents due to the redundant evaluations, or am I missing something?

So far, this was not a big problem, the training generally works. Now I am adding an LSTM between the CNN and the shared layer (to share the output features of the LSTM among agents). Now, not only the CNN, but also the LSTM is evaluated n times at every time step. Most importantly, in order to evaluate the LSTM for all other agents, each agent has to maintain a redundant copy of the recurrent state for the other agents, right? This is my main concern. Besides wasting memory and computation resources, wouldn't this mess with the learning if the states that are maintained by each agent for all other agents diverge, since it's the same policy for all agents? I tried to visualize this architecture: [image: masters-project-arch-0]

As I see it, this problem can only be solved if there is something in between the MultiAgentEnv and the standard Gym Env: I want a single observation each time step (while maintaining the LSTM state for n agents) but output n actions from this single observation while updating the policy in the same way as if I had five subsequent observations (i.e., I don't want a single super agent since I want to be able to change the number of agents during training). Also here a small visualization of how I thought this might work. Does this make sense? [image: masters-project-arch-1]

no guarantees on how that dimension is organized

Fair, makes sense!

ericl commented 4 years ago

(deleted an older post that was wrong)

This is fine if I have like five agents, but the training will slow down almost linearly with the number of agents due to the redundant evaluations, or am I missing something?

Yep, makes sense that it doesn't scale to a very large number of agents, since it's O(n^2) work as noted. It also makes sense that the super-agent approach doesn't work if the number of agents is varying dynamically over time (unless you use padding hacks, etc).

Also here a small visualization of how I thought this might work. Does this make sense?

Yep. So I think one way of doing this is treating this just as an optimization problem. So logically, we want to implement the first figure you have.

However, in many cases we can under the hood change our computation graph to actually execute the second figure. This should be possible since forward / backward passes are batched and if we are willing to peek across the batch dimension to see what computation can be shared.

I'm thinking you can try something like this, suppose we have a batch of data that looks like this (two env steps):

[
    [obs_a1, [obs_a1, obs_b1, obs_c1]],
    [obs_b1, [obs_a1, obs_b1, obs_c1]],
    [obs_c1, [obs_a1, obs_b1, obs_c1]],
    [obs_a2, [obs_a2, obs_b2, obs_c2]],
    [obs_b2, [obs_a2, obs_b2, obs_c2]],
    [obs_c2, [obs_a2, obs_b2, obs_c2]],
]

Naively you'd compute the output as

[
    policy(obs_a1, shared_layer([obs_a1, obs_b1, obs_c1])),
    policy(obs_b1, shared_layer([obs_a1, obs_b1, obs_c1])),
    policy(obs_c1, shared_layer([obs_a1, obs_b1, obs_c1])),
    policy(obs_a2, shared_layer([obs_a2, obs_b2, obs_c2])),
    policy(obs_b2, shared_layer([obs_a2, obs_b2, obs_c2])),
    policy(obs_c2, shared_layer([obs_a2, obs_b2, obs_c2])),
]

But you can instead compute it like this:

stage1_out = [
    shared_layer([obs_a1, obs_b1, obs_c1]),
    shared_layer([obs_a2, obs_b2, obs_c2]),
]

stage2_out = [
    policy(obs_a1, stage1_out[0]),
    policy(obs_b1, stage1_out[0]),
    policy(obs_c1, stage1_out[0]),
    policy(obs_a2, stage1_out[1]),
    policy(obs_b2, stage1_out[1]),
    policy(obs_c2, stage1_out[1]),
]

Does this seem workable? Ideally RLlib would make this easy (cc @sven1977 ) but I think it's doable manually for now.

There's also the question of duplication of observations, but that could be solvable with better data representations (also, compression algos are really good at de-duplicating data).
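A minimal pure-Python sketch of the two-stage idea, assuming rows from the same env step carry an identical joint observation that can serve as a cache key. `shared_layer` and `policy` are hypothetical stand-ins for the real networks; the call counter only exists to show the saved work.

```python
calls = {"shared": 0}


def shared_layer(joint_obs):
    """Stand-in for the expensive shared computation (hypothetical)."""
    calls["shared"] += 1
    return sum(joint_obs)


def policy(own_obs, shared_out):
    """Stand-in for the per-agent head (hypothetical)."""
    return own_obs * 10 + shared_out


# Each row is (own obs, joint obs of that env step): two env steps, three agents.
step1, step2 = (1, 2, 3), (4, 5, 6)
batch = [(o, step1) for o in step1] + [(o, step2) for o in step2]

# Naive: the shared layer runs once per row.
naive = [policy(own, shared_layer(joint)) for own, joint in batch]
naive_calls = calls["shared"]

# De-duplicated: the shared layer runs once per distinct joint observation.
calls["shared"] = 0
cache = {}
dedup = []
for own, joint in batch:
    if joint not in cache:  # identical joint obs is taken to mean the same env step
        cache[joint] = shared_layer(joint)
    dedup.append(policy(own, cache[joint]))
dedup_calls = calls["shared"]
```

Both variants produce the same outputs; the de-duplicated one evaluates the shared layer twice instead of six times and does not depend on the row order.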

janblumenkamp commented 4 years ago

Thank you very much Eric, this sounds great! I will give it a try! But would the problem really be easier if I would go for a fixed number of agents? Would the superagent approach really be equivalent, i.e. would the policy benefit from the experience of multiple agents and internally output only a single action so that I would have the flexibility to deploy it to any number of agents? To me, it seemed like this wouldn't work because I would have to provide a single value function estimate for each action tuple, so I assumed that a single policy is trained that outputs n actions at once (which would make the training much harder, wouldn't it?).

ericl commented 4 years ago

Yeah I think the super agent approach is almost equivalent... You can implement the same data flow within a single policy if that works for the env.

You are right that there would be a single VF estimate for the baseline though, which would be one difference. That might reduce training efficiency.

janblumenkamp commented 4 years ago

Hmmm, but in order to implement your proposed solution I would have to assume a certain spatial arrangement in the batch and as you said there are no guarantees that the data is aligned like that, right? I think what you proposed is essentially what I suggested here. At the very least I would have to know which rows in the batch are from the same time step of the same environment instance?

ericl commented 4 years ago

You don't need to assume a spatial arrangement, you can inspect the data and de-duplicate based on whether you see the same identical obs.

(you could include extra data like, "timestep id" in the obs, to make this easier).

Might be a bit convoluted in TF (i.e., gather_nd), easier in torch.

janblumenkamp commented 4 years ago

I see. I now tried the super-agent approach again and could not really replicate the performance of my current naive implementation. Probably both the single VF estimate and, more importantly, summing the reward signal for all agents contribute to that (PPO has no way of knowing which individual action resulted in a reward of an individual agent, right? And this would probably be even worse for a larger number of agents).

I also tried implementing your proposed solution. I added a random identifier to the observation for each time step so I can identify batch rows of the same time steps in the model. Then I can pick the unique values from those identifiers and compute the shared layer only for them. Afterwards, I can transform this back to the original batch size. This works if I have no recurrent layer. If I add that, I also have to keep track of the time dimension with seq_lens which is doable, but it gets very messy...
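A minimal NumPy sketch of the identifier-based de-duplication for the non-recurrent case, assuming each row carries a per-step identifier and that rows with the same identifier hold the same joint observation. `shared_layer` is a hypothetical stand-in, and the recurrent case with seq_lens is not covered.

```python
import numpy as np


def shared_layer(joint_obs):
    """Stand-in for the shared network, applied row-wise (hypothetical)."""
    return joint_obs.sum(axis=-1, keepdims=True)


# Batch of 5 rows from 2 env steps, in arbitrary order. Rows of the same step
# carry the same random identifier and the same joint observation.
step_ids = np.array([7, 3, 7, 3, 7])
joint_obs = np.array([[1.0, 2.0], [5.0, 6.0], [1.0, 2.0], [5.0, 6.0], [1.0, 2.0]])

# first_idx: one representative row per unique id; inverse: maps rows to unique ids.
uniq, first_idx, inverse = np.unique(
    step_ids, return_index=True, return_inverse=True)

shared_unique = shared_layer(joint_obs[first_idx])  # computed once per env step
shared_full = shared_unique[inverse]                # expanded back to batch size
```

The same pattern should carry over to torch tensors, where indexing with the inverse mapping keeps gradients flowing to the de-duplicated computation.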

This solution feels a bit hacky, to be honest... If observations from the same time step are spread across multiple batches it will not work (and I believe independent states would then again be maintained for the same agents). Is there absolutely no way to implement my second architecture more directly? I'd imagine a model that takes a super-observation and provides n actions and n value function estimates which can then be used to perform n policy updates. Can't this be done by implementing my own RolloutWorker or is that for a different use case?

ericl commented 4 years ago

Hmm, with a custom loss function (see custom algorithms docs) and custom model you could get a super agent to work the same. The only question is how to record the rewards; those could be returned in info (accessible to torch policies) or put in obs_tp1 (a hack).

E.g., qmix_policy does this via infos:

    def learn_on_batch(self, samples):
        ...
        group_rewards = self._get_group_rewards(samples[SampleBatch.INFOS])

janblumenkamp commented 4 years ago

That makes a lot of sense, that is exactly what I needed! I implemented it and it yields slightly better results than the naive implementation, while being faster and requiring less memory!

I took the PPO torch policy as a base and implemented my own postprocess_ppo_gae function and PPOLoss class as well as a modified action distribution based on TorchMultiCategorical. Here are all the steps, summarized:

  1. Adapt the environment: The observation space contains the state of all agents. The action space is MultiDiscrete. The reward for each agent is returned in the info dict.
  2. The model takes the super-state and predicts the action and value estimate similar to how it would be done for a single agent. The predicted logits and values are concatenated into two tensors of size (batch, n_agents, n_actions) and (batch, n_agents), respectively. The value tensor is returned in that shape from the model's value_function, and the logit tensor is reshaped to (batch, n_agents*n_actions) to match the MultiDiscrete action space.
  3. postprocess_ppo_gae: Basically identical to the implementation in the PPO policy, except that we have to extract the rewards for each agent from info, compute the advantages with compute_advantages for each agent, and then concatenate the fields in the original sample batch so that we have a single SampleBatch that stores rewards, vf predictions, advantages etc. for all agents of the same time step, which we can then access from the loss in the next step.

    import numpy as np
    from ray.rllib.evaluation.postprocessing import Postprocessing, compute_advantages
    from ray.rllib.policy.sample_batch import SampleBatch

    def postprocess_ppo_gae(policy,
                            sample_batch,
                            other_agent_batches=None,
                            episode=None):
        """Adds the policy logits, VF preds, and advantages to the trajectory."""
        samplebatch_infos = SampleBatch.concat_samples([SampleBatch(s) for s in sample_batch[SampleBatch.INFOS]])
        samplebatch_infos_rewards = SampleBatch.concat_samples([SampleBatch({str(k): v for k, v in s.items()}) for s in samplebatch_infos['rewards']])

        sample_batch_agent = sample_batch.copy()
        # One sample batch per agent; the keys are the agent indices as strings.
        batches = []
        for i in samplebatch_infos_rewards.keys():
            sample_batch_agent[SampleBatch.REWARDS] = samplebatch_infos_rewards[i]
            sample_batch_agent[SampleBatch.ACTIONS] = sample_batch[SampleBatch.ACTIONS][..., int(i)][..., np.newaxis]
            sample_batch_agent[SampleBatch.VF_PREDS] = sample_batch[SampleBatch.VF_PREDS][..., int(i)]

            completed = sample_batch_agent["dones"][-1]
            if completed:
                last_r = 0.0
            else:
                next_state = []
                # Use j here so that the agent key i is not overwritten.
                for j in range(policy.num_state_tensors()):
                    next_state.append([sample_batch_agent["state_out_{}".format(j)][-1]])
                last_r = policy._value(sample_batch_agent[SampleBatch.NEXT_OBS][-1],
                                       sample_batch_agent[SampleBatch.ACTIONS][-1],
                                       sample_batch_agent[SampleBatch.REWARDS][-1],
                                       *next_state)[int(i)]
            batches.append(compute_advantages(
                sample_batch_agent,
                last_r,
                policy.config["gamma"],
                policy.config["lambda"],
                use_gae=policy.config["use_gae"]))

        # Now take the original sample batch and overwrite the following
        # fields with the per-agent results stacked along a trailing agent axis.
        for k in [SampleBatch.REWARDS, SampleBatch.VF_PREDS, Postprocessing.ADVANTAGES, Postprocessing.VALUE_TARGETS]:
            sample_batch[k] = np.stack([b[k] for b in batches], axis=-1)

        return sample_batch

  4. Custom action distribution: Inherit from TorchMultiCategorical and override logp so that it is not summed over all agents:

    import torch
    from ray.rllib.models.torch.torch_action_dist import TorchMultiCategorical
    from ray.rllib.utils.annotations import override

    class ExtendedTorchMultiCategorical(TorchMultiCategorical):
        @override(TorchMultiCategorical)
        def __init__(self, inputs, model):
            # Adapt this list to your MultiDiscrete action space.
            super().__init__(inputs, model, [5, 5, 5, 5, 5])

        @override(TorchMultiCategorical)
        def logp(self, actions):
            # If a tensor is provided, unstack it into a list.
            if isinstance(actions, torch.Tensor):
                actions = torch.unbind(actions, dim=1)
            # Stack instead of summing to keep one log-prob per agent.
            return torch.stack(
                [cat.log_prob(act) for cat, act in zip(self.cats, actions)], dim=-1)

    Then add it as specified here or better in setup_config with before_init.

  5. PPOLoss/ppo_surrogate_loss: We have to adapt it so that the batches, which now have an additional dimension for the agents, are flattened; the policy is thus updated with the cumulative individual experience of each agent. This is only required because the model is accessed here again. If this was not the case, we would not have to modify the loss and could just provide a larger SampleBatch containing the sequential experience of all agents in postprocess_ppo_gae.

    class PPOLoss:
        def __init__(self,
                     dist_class,
                     model,
                     value_targets,
                     advantages,
                     actions,
                     prev_logits,
                     prev_actions_logp,
                     vf_preds,
                     curr_action_dist,
                     value_fn,
                     cur_kl_coeff,
                     valid_mask,
                     entropy_coeff=0,
                     clip_param=0.1,
                     vf_clip_param=0.1,
                     vf_loss_coeff=1.0,
                     use_gae=True):
            if valid_mask is not None:
                def reduce_mean_valid(t):
                    return torch.mean(t * valid_mask)
            else:
                def reduce_mean_valid(t):
                    return torch.mean(t)

            prev_dist = dist_class(prev_logits, model)
            # Make loss functions.
            cur_logp = curr_action_dist.logp(actions)
            logp_ratio = torch.exp(
                cur_logp - prev_actions_logp).flatten()
            action_kl = prev_dist.multi_kl(curr_action_dist).flatten()
            self.mean_kl = reduce_mean_valid(action_kl)
            curr_entropy = curr_action_dist.multi_entropy().flatten()
            self.mean_entropy = reduce_mean_valid(curr_entropy)

            surrogate_loss = torch.min(
                advantages.flatten() * logp_ratio,
                advantages.flatten() * torch.clamp(logp_ratio, 1 - clip_param,
                                                   1 + clip_param))
            self.mean_policy_loss = reduce_mean_valid(-surrogate_loss)

            if use_gae:
                vf_loss1 = torch.pow(value_fn.flatten() - value_targets.flatten(), 2.0)
                vf_clipped = vf_preds.flatten() + torch.clamp(value_fn.flatten() - vf_preds.flatten(),
                                                              -vf_clip_param, vf_clip_param)
                vf_loss2 = torch.pow(vf_clipped - value_targets.flatten(), 2.0)
                vf_loss = torch.max(vf_loss1, vf_loss2)
                self.mean_vf_loss = reduce_mean_valid(vf_loss)
                loss = reduce_mean_valid(
                    -surrogate_loss + cur_kl_coeff * action_kl +
                    vf_loss_coeff * vf_loss - entropy_coeff * curr_entropy)
            else:
                self.mean_vf_loss = 0.0
                loss = reduce_mean_valid(-surrogate_loss +
                                         cur_kl_coeff * action_kl -
                                         entropy_coeff * curr_entropy)
            self.loss = loss

    Also adapt ppo_surrogate_loss so that this loss is used. This loss could be integrated into the model, but it seems like the custom loss is not (yet) fetched in PyTorch policies (#8193).

  6. kl_and_loss_stats: adapt vf_explained_var since explained_variance obviously can't handle multidimensional value functions
  7. Instantiate a custom trainer from that adapted policy that can be fed to tune.
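The shape bookkeeping of steps 2, 4 and 5 can be sketched in plain NumPy. This is only an illustration under assumed sizes with random data; it leaves out clipping, KL and entropy terms and does not use any RLlib classes.

```python
import numpy as np

batch, n_agents, n_actions = 4, 5, 5  # sizes are assumptions for illustration
rng = np.random.default_rng(0)

# Step 2: the model produces per-agent logits and flattens them for MultiDiscrete.
logits = rng.normal(size=(batch, n_agents, n_actions))
flat_logits = logits.reshape(batch, n_agents * n_actions)

# Step 4: recover per-agent distributions and keep one log-prob per agent
# (a log-softmax over the action axis, then a gather at the taken actions).
per_agent = flat_logits.reshape(batch, n_agents, n_actions)
log_probs = per_agent - np.log(np.exp(per_agent).sum(axis=-1, keepdims=True))
actions = rng.integers(0, n_actions, size=(batch, n_agents))
logp = np.take_along_axis(log_probs, actions[..., None], axis=-1).squeeze(-1)

# Step 5: flatten so every (time step, agent) pair is one training sample.
advantages = rng.normal(size=(batch, n_agents))
old_logp = logp.copy()  # pretend the old policy equals the current one
ratio = np.exp(logp - old_logp).reshape(-1)
surrogate = advantages.reshape(-1) * ratio
```

Because `logp` stays at shape (batch, n_agents) instead of being summed over agents, each agent's advantage is paired with its own probability ratio.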

Just for reference. Please let me know if you have any tips or if this can be done with fewer modifications. I am looking forward to when something like this is possible in the future more easily :)
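For readers who want to see step 3 without RLlib, here is a minimal NumPy sketch of per-agent GAE followed by stacking along a trailing agent axis. The `gae` helper, the reward values and the info layout (a "rewards" dict keyed by agent index) are assumptions for illustration; the actual implementation above calls compute_advantages instead.

```python
import numpy as np


def gae(rewards, values, last_value, gamma, lam):
    """Generalized advantage estimation for one agent's trajectory."""
    values_ext = np.append(values, last_value)
    deltas = rewards + gamma * values_ext[1:] - values_ext[:-1]
    adv = np.zeros_like(deltas)
    running = 0.0
    for t in reversed(range(len(deltas))):
        running = deltas[t] + gamma * lam * running
        adv[t] = running
    return adv


# Per-step info dicts: one reward per agent index (values are made up).
infos = [
    {"rewards": {0: 1.0, 1: 0.0}},
    {"rewards": {0: 0.0, 1: 2.0}},
    {"rewards": {0: 1.0, 1: 1.0}},
]
vf_preds = np.array([[0.5, 0.2], [0.4, 0.6], [0.3, 0.1]])  # (T, n_agents)

agent_ids = sorted(infos[0]["rewards"])
rewards = np.array([[info["rewards"][a] for a in agent_ids] for info in infos])

# One GAE pass per agent (episode assumed finished, so last value is 0),
# then stack the results along a trailing agent axis.
advantages = np.stack(
    [gae(rewards[:, col], vf_preds[:, col], 0.0, gamma=0.99, lam=0.95)
     for col in range(len(agent_ids))],
    axis=-1,
)
value_targets = advantages + vf_preds
```

The resulting `advantages` and `value_targets` have shape (T, n_agents), which is the layout the flattened loss in step 5 expects.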

ramondalmau commented 4 years ago

hey! that was an excellent solution that helped me a lot. Just wanted to thank you for the effort. I think "learning to communicate" is a hot topic in the RL community, and a lot of new algorithms have been recently proposed. Just to mention a couple of examples: CommNET, DGN, MAAC,... In most of these implementations agents share some information before jointly taking an action (most of the time the information sharing is carried out via attention mechanisms), and a centralised critic takes the observations and actions of all agents. You proposed a workaround for PPO. What about other implementations such as SAC? How difficult would it be to modify RLlib such that it accepts this kind of communication between agents before taking the actions?

janblumenkamp commented 4 years ago

Great to hear that this was helpful! I agree, there is a lot going on in this area and I believe the Ray team is working on making this easier in the future. One thing that was already done is providing a callback to share information between agents. Note that this is not a differentiable communication channel. I don't know what the current schedule is for integrating such a differentiable communication channel into RLlib.

ericl commented 4 years ago

Yeah, there is a plan to allow the callback to support differentiable communications as well, though it is still unclear the best way to approach that.

ramondalmau commented 4 years ago

GOOD to know! I look forward to it. In the meantime, I am using @janblumenkamp's implementation and it works fine. I also tried a different approach with a similar objective in which the model has three heads:

  • action (discrete space)
  • value
  • message (a vector of real values)

However, I had to modify the source code (sampler.py) to get the message back into the environment. In my humble opinion, the easiest way would be to include the "message" in the extra action fetches, but this information is not returned to the environment by RLlib by default. Another solution is to treat the message as a second action and use a Tuple action space. But I do not want the policy gradient to depend on the message... Which is the best solution in your opinion?

ericl commented 4 years ago

Maybe add a stop gradient for the output action part that is the message?

janblumenkamp commented 4 years ago

I would like to extend my current solution so that I can have different (groups of) agents with different policies that communicate with each other through a non-differentiable communication channel:

(image: multi-env-shared-observation)

In this image, my previously described approach (the super-agent) would be one of these two policies, and there is a second group of agents training with the same algorithm but a different set of hyperparameters. The challenge is that the two policies depend on each other, so it is not possible to first evaluate policy A and then policy B. Instead, the pre-processing network would have to be evaluated first for both policies, then the information is shared, and then the post-processing network is evaluated with the shared information.

As I understand it, I can't use the current observation sharing callback because it is fully non-differentiable. I would probably have to modify my workaround (and, similar to @ramondalmau, the sampler) so that it supports two different sub-networks. But what would be the best way to actually share the information? Global variables should work since I only need the information from the same time step, right? That seems like a huge hack though.

janblumenkamp commented 4 years ago

Okay, I could solve this with the observation function, but it requires redundant evaluation of the preprocessing NN. It would be nice if batching were available for the observation function.

ericl commented 4 years ago

Got it. Maybe a simple improvement we could make is to allow batching in the observation function, and then in the policy, expose which agent IDs correspond to each batch item. This at least allows differentiable communication within agents of the same policy.
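
To illustrate the idea of exposing which agent IDs correspond to each batch item, here is a minimal sketch in plain Python. It is not RLlib code: the function name and the three parallel arrays (`episode_ids`, `timesteps`, `agent_ids`) are hypothetical stand-ins for whatever per-item metadata the policy would receive. The sketch only shows how such metadata lets a model find the agents that acted at the same time step.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Sequence, Tuple


def group_coexecuting_agents(
    episode_ids: Sequence[Hashable],
    timesteps: Sequence[int],
    agent_ids: Sequence[Hashable],
) -> Dict[Tuple[Hashable, int], List[Tuple[Hashable, int]]]:
    """Map each (episode, timestep) to the (agent_id, batch_index) pairs sharing it."""
    if not (len(episode_ids) == len(timesteps) == len(agent_ids)):
        raise ValueError("all three arrays must have one entry per batch item")
    groups = defaultdict(list)
    for batch_index, key in enumerate(zip(episode_ids, timesteps)):
        groups[key].append((agent_ids[batch_index], batch_index))
    return dict(groups)


# Hypothetical batch of five items drawn from two episodes.
groups = group_coexecuting_agents(
    episode_ids=[7, 7, 7, 9, 9],
    timesteps=[0, 0, 1, 0, 0],
    agent_ids=["a0", "a1", "a0", "a0", "a1"],
)
for key, members in groups.items():
    print(key, members)
```

With the batch indices grouped this way, a model could gather the features of all agents in one group and exchange messages among them before computing the outputs.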


ramondalmau commented 4 years ago

Wow! I definitely support it! It will enable a lot of applications for "graph" reinforcement learning!

ericl commented 4 years ago

Just want to provide an update on the design of this. Currently the thought is to implement something like this:

In combination with (non-differentiable) observation functions, the two changes above make it possible to support arbitrary differentiable communications between agents controlled by the same policy, and non-differentiable communications across agents controlled by different policies. You'll also still be able to share layers between policies as before (https://github.com/ray-project/ray/blob/master/rllib/examples/models/shared_weights_model.py).

ramondalmau commented 4 years ago

Great! I really like the proposed solution @ericl, and in my humble opinion it is the way to go. This will allow using many of the already existing algorithms in RLlib with minor changes. Looking forward to using this :)

Just a minor question: let us imagine a scenario in which a team of agents wants to achieve a cooperative goal, thus maximising a shared reward function (e.g., the sum or mean of individual rewards). All of them may share the same policy (homogeneous agents), but they communicate through a differentiable protocol before jointly taking the action. Yet, NOT ALL agents are "active" during the entire episode. That is, some agents can start contributing to the common goal in the middle of the episode, and some others can stop contributing before the end. Nevertheless, those agents entering late / exiting early still want to maximise the long-term shared reward. I am wondering if this new implementation will help to deal with such a problem :O

ericl commented 4 years ago

@ramondalmau here's how I would tackle that one:

shared reward function

Here, the env can produce a global reward and give it to each agent.

All of them may share the same policy (homogeneous agents), but they communicate through a differentiable protocol before jointly taking the action.

This part should be doable once we add the agent id array and allow lockstep replay.

Nevertheless, those agents entering late / exiting early still want to achieve the most long term shared reward.

IIUC this is the key difficulty, that you want to give the final shared reward to exited agents. One way of handling this is to not send done=True for exited agents until the env itself finishes. Then, you can have the env send done to all agents, discounting it appropriately depending on the time delay between agent exit and env exit.
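
As a rough illustration of the discounting step, the env could compute each exited agent's share of the final reward along these lines. This is a sketch under stated assumptions, not RLlib code: `discounted_final_rewards`, `exit_steps` and `env_end_step` are hypothetical names, and the discount is taken to be `gamma` raised to the number of steps between the agent's exit and the end of the env.

```python
from typing import Dict, Hashable


def discounted_final_rewards(
    final_reward: float,
    exit_steps: Dict[Hashable, int],
    env_end_step: int,
    gamma: float = 0.99,
) -> Dict[Hashable, float]:
    """Scale the shared final reward by gamma ** (env end step - agent exit step)."""
    rewards = {}
    for agent_id, exit_step in exit_steps.items():
        delay = env_end_step - exit_step
        if delay < 0:
            raise ValueError(f"agent {agent_id!r} exits after the env ends")
        rewards[agent_id] = (gamma ** delay) * final_reward
    return rewards


# Hypothetical episode: one agent exits two steps before the env finishes.
rewards = discounted_final_rewards(
    final_reward=10.0,
    exit_steps={"early": 8, "late": 10},
    env_end_step=10,
    gamma=0.5,
)
print(rewards)
```

The env would then hand each agent its entry from `rewards` together with done=True in the final step.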

ramondalmau commented 4 years ago

Hey @ericl sorry for the late reply :)

I am approaching the problem a little bit differently:

  • Each agent a receives a team reward r at every time step t (the sum of the individual rewards), even if the agent has already exited. The reason is that what an agent did before exiting may have an effect on the team return, which is what I want to maximise! If I only considered the rewards that the agent received while it was active, I would not really achieve cooperative behavior.
  • When an agent exits the environment, I no longer consider its actions in the 'step' of the environment (I completely ignore it because it has already finished), and I fill its observation with 0s (ensuring, of course, that an active agent can never observe all 0s).
  • After generating a batch of experiences from the environment, the return (or advantage in my case) of each agent (which is identical for all of them because they share the reward) is computed taking into account all the rewards, from time 0 to the end of the batch, even if the agent exited earlier.
  • However, in the loss function I mask the samples for which an agent was not active (which I know because the observation was all 0s), meaning that these samples do not contribute to the policy update.

I only see one problem: imagine a hypothetical case with a batch size of 200 and 10 agents. Each batch therefore actually contains 200 time steps * 10 agents = 2000 samples. However, some of these samples may be masked and not considered when updating the policy. This means that the size of the batch is not actually fixed at 2000; it depends on how many agents were active in those 200 steps :)

I do not know if that was clear... but my conclusion is that having agents that exit early, together with a batch size that refers to time steps rather than actual valid samples, may lead to dangerous dynamic batch sizes.

ericl commented 4 years ago

I would recommend an optimization to not need the masking strategy by calculating the discounted final reward in the env. Then, you only need to have one "final" observation for each agent, instead of a long sequence of noops. That should avoid the problem you mentioned but still apply proper temporal discounting.
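
For reference, the masking strategy that @ramondalmau describes, and the varying number of valid samples it produces, can be sketched in plain Python as follows. This is only an illustration: `masked_mean_loss` is a hypothetical helper, the per-sample losses are made-up numbers, and a real implementation would operate on tensors inside the policy's loss function.

```python
from typing import Sequence, Tuple


def masked_mean_loss(
    per_sample_loss: Sequence[float],
    active: Sequence[bool],
) -> Tuple[float, int]:
    """Average the loss over active samples only; return (loss, valid sample count)."""
    if len(per_sample_loss) != len(active):
        raise ValueError("need exactly one mask entry per sample")
    valid = [loss for loss, is_active in zip(per_sample_loss, active) if is_active]
    if not valid:
        # Nothing to learn from: every sample in the batch was masked out.
        return 0.0, 0
    return sum(valid) / len(valid), len(valid)


# 3 time steps * 2 agents = 6 samples; the second agent exits after step 0.
losses = [1.0, 3.0, 2.0, 9.0, 4.0, 9.0]
active = [True, True, True, False, True, False]
loss, n_valid = masked_mean_loss(losses, active)
print(loss, n_valid)
```

Dividing by the number of valid samples rather than the nominal batch size keeps the loss scale comparable between batches, but the number of samples that actually contribute still changes from batch to batch, which is the concern raised above.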

ramondalmau commented 4 years ago

Yes. This is definitely a good proposal. Many thanks @ericl. Unfortunately, I have not been able to apply a similar strategy so far because I was using the with_agent_groups wrapper. Therefore, each sample in the batch was composed of the observations of all agents, similar to the solution of https://github.com/ray-project/ray/issues/7341#issuecomment-621285967

I had a look at your recent replay implementation, and it will help me to avoid the with_agent_groups wrapper and solve the above-mentioned issue by computing the discounted reward in the environment.

janblumenkamp commented 4 years ago

Thanks for all the efforts so far Eric, very helpful! Those changes look really good! I was wondering what exactly stands in the way of having a differentiable communication channel between different policies now. Is it only the observation function or is there something else?

ericl commented 4 years ago

Hmm, I think the main barrier would be that we would have to re-execute the observation function at training time. Currently, it's run only during rollout and we save the observed output as numpy arrays. This would require some tricky plumbing changes in RLlib. I'm also not sure how batching would work.

janblumenkamp commented 4 years ago

What if we let the information sharing be entirely the model's (and therefore the user's) job? In addition to the agent grouping, we would have an independent model grouping configuration. Different (groups of) agents may have different policies but share the same model. It's then up to the user to have different policy networks for different agents in the model if that is desired. For each model grouping, there is a separate optimizer. Depending on the model grouping, the policy losses are combined, and the models are optimized accordingly. Essentially this would mean that a policy is separated from the model, so I am not sure if that is really the best solution, but it would allow vast flexibility in multi-agent settings and make differentiable communication channels very easy to implement directly (and only) inside the model. Or am I missing something?

ramondalmau commented 3 years ago

Dear MARL friends

I hope you are doing well. Just a small question: is it possible to use lockstep mode with multi-agent PPO? Kind regards,

Ramon

ThomasLecat commented 3 years ago

Hi all,

I have been working on MARL with differentiable communication between agents and I just stumbled on this ticket. We came up with exactly the same solution that consists in implementing a "super agent" (that we call "central agent": CentralPPO, CentralDQN), with all the tricks described so well by @janblumenkamp in his message:

That makes a lot of sense, that is exactly what I needed! I implemented it and it yields slightly better results as the naive implementation, but faster and with less required memory!

...

Just for reference. Please let me know if you have any tips or if this can be done with fewer modifications. I am looking forward to when something like this is possible in the future more easily :)

It works, with a few difficulties at evaluation time like #10228.

I've seen that the replay modes (independent/lockstep) are now integrated into Ray 0.8.7 but was wondering where we stand regarding the second point raised by @ericl:

The replay mode option by itself isn't enough. In a custom model, you still need to be able to identify all the co-executing agents at a timestep. This can be done if we also pass in ("episode_id", "agent_id", and "timestep") arrays to model.forward() that identify the origin of each batch element. @sven1977 is exploring automatically making these available in the model input_dict.

Generally, how far are we from enabling differentiable communication with built-in agents? @janblumenkamp, are you still using the "super agent" approach?

Many thanks!

janblumenkamp commented 3 years ago

Hi Thomas, last time I talked to Sven he told me that according to the current schedule, the trajectory view API for multi-agent use cases will be tackled after the Ray summit in October. We stuck to the super-agent approach and it worked quite well for us, but it involves a few not so nice hacks and workarounds, so I am definitely still looking forward to the multi-agent trajectory API. If you are interested, this is our paper which is the result of this ticket.

ThomasLecat commented 3 years ago

Hi Jan, thanks for the quick reply and congrats on the paper, I will definitely read it and see how it relates to the other approaches like Graph Convolutional RL, STMARL, MARL for networked system control, Intention propagation, etc. that all revolve around the same idea of differentiable communication channels.

Thanks for the info, we are still using a super-agent as well but I would love to get rid of the hacks and workarounds, as the problem we are trying to solve is very complex (large scale, long horizon, highly cooperative, continuous, dynamic neighbourhoods, agents appearing and disappearing during the episode, potentially formulated with heterogeneous agents, ...) and the hacks to accommodate for all of this keep piling up.

Good luck with your research!

OnTheRicky commented 3 years ago

What is currently the best way to go about sharing states or any other information between agents with a dynamic number of agents?

I define a maximum number of active agents in the environment, but during an episode agents can finish and some time later a new agent will start. So the total number of agents continues to increase, but there will never be more active agents than allowed.

OnTheRicky commented 3 years ago

I've built a custom centralised critic RNN model that receives two inputs, one to predict next action that only contains agent specific observations and one to compute value that contains all observations from all agents.

Still using rllib version 0.8.4

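# Imports assumed for RLlib 0.8.4; module paths and return values differ in later releases.
import numpy as np
from gym import spaces
from ray.rllib.models.tf.recurrent_tf_modelv2 import RecurrentTFModelV2
from ray.rllib.policy.rnn_sequencing import add_time_dimension
from ray.rllib.utils import try_import_tf
from ray.rllib.utils.annotations import override

tf = try_import_tf()


class CentralisedCriticModel(RecurrentTFModelV2):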
    """Multi-agent model that implements a centralised value function."""

    def __init__(self,
                 obs_space,
                 action_space,
                 num_outputs,
                 model_config,
                 name,
                 hidden_size=256,
                 cell_size=64):
        super(CentralisedCriticModel, self).__init__(obs_space,
                                                     action_space,
                                                     num_outputs,
                                                     model_config,
                                                     name)
        self.cell_size = cell_size

        # Define input layers
        action_input_layer = tf.keras.layers.Input(
            shape=(None,18), name="action_inputs")
        value_input_layer = tf.keras.layers.Input(
            shape=(None,obs_space.shape[0]), name="value_inputs")
        state_in_h = tf.keras.layers.Input(shape=(cell_size, ), name="h")
        state_in_c = tf.keras.layers.Input(shape=(cell_size, ), name="c")
        seq_in = tf.keras.layers.Input(shape=(), name="seq_in", dtype=tf.int32)

        # Preprocess observation with a hidden layer and send to LSTM cell
        dense_action = tf.keras.layers.Dense(
            hidden_size, activation=tf.nn.relu, name="dense_action")(action_input_layer)
        dense_value = tf.keras.layers.Dense(
            hidden_size, activation=tf.nn.relu, name="dense_value")(value_input_layer)

        action_lstm_out, state_h, state_c = tf.keras.layers.LSTM(
            cell_size, return_sequences=True, return_state=True, name="action_lstm")(
                inputs=dense_action,
                mask=tf.sequence_mask(seq_in),
                initial_state=[state_in_h, state_in_c])

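        # NOTE: state_h and state_c are reassigned here, so the model outputs
        # below carry the value LSTM's state, not the action LSTM's.
        value_lstm_out, state_h, state_c = tf.keras.layers.LSTM(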
            cell_size, return_sequences=True, return_state=True, name="value_lstm")(
                inputs=dense_value,
                mask=tf.sequence_mask(seq_in),
                initial_state=[state_in_h, state_in_c])

        # Postprocess LSTM output with another hidden layer and compute values
        action_logits = tf.keras.layers.Dense(
            self.num_outputs,
            activation=tf.keras.activations.linear,
            name="action_logits")(action_lstm_out)

        values = tf.keras.layers.Dense(
            1, activation=None, name="values")(value_lstm_out)

        # Create the RNN model
        self.rnn_model = tf.keras.Model(
            inputs=[action_input_layer, value_input_layer, seq_in, state_in_h, state_in_c],
            outputs=[action_logits, values, state_h, state_c])
        self.register_variables(self.rnn_model.variables)
        self.rnn_model.summary()

I also modified forward() and forward_rnn()

    def forward(self, input_dict, state, seq_lens):
        """Adds time dimension to batch before sending inputs to forward_rnn().
        You should implement forward_rnn() in your subclass."""
        assert seq_lens is not None

        # Policy input: the agent's own observation components only.
        action_padded_inputs = tf.concat(
            [input_dict["obs"]["capacity"], input_dict["obs"]["sensors"]], axis=1)
        # Value input: the full flattened observation, including other_sensors.
        value_padded_inputs = input_dict["obs_flat"]
        output, new_state = self.forward_rnn(
            add_time_dimension(action_padded_inputs, seq_lens=seq_lens),
            add_time_dimension(value_padded_inputs, seq_lens=seq_lens),
            state,
            seq_lens)
        return tf.reshape(output, [-1, self.num_outputs]), new_state

    @override(RecurrentTFModelV2)
    def forward_rnn(self, action_input_dict, value_input_dict, state, seq_lens):
        model_out, self._value_out, h, c = self.rnn_model([action_input_dict,
                                                           value_input_dict,
                                                           seq_lens] + state)
        return model_out, [h, c]

My complete observation space looks as follows:

self.observation_space = spaces.Dict({
    "sensors": spaces.Box(low=-0.1, high=1.1, shape=(len(self.sensors),), dtype=np.float64),
    "capacity": spaces.Box(low=-(n_agents * failure_t) + capacity, high=capacity, shape=(1,), dtype=np.float64),
    "other_sensors": spaces.Box(low=-0.1, high=1.1, shape=(3, len(self.sensors)), dtype=np.float64),
})

In my environment at the end of the step() I then fill up other_sensors for each agent, using the observations of all other agents, or fill with zeros if there are missing agents.

It's training extremely slowly and I can't tell if it's because I've essentially created two LSTM models?

I'm also not sure if I should be populating other_sensors in my observations at the end of step() or if I should be using the observation_fn API?
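
For comparison, here is a minimal sketch of what the observation-function variant could look like. It assumes the callback is invoked with a dict of per-agent observations plus extra keyword arguments, and it uses plain Python lists instead of NumPy arrays to stay self-contained. `fill_other_sensors` and `MAX_OTHER_AGENTS` are hypothetical names, and the observation function may not be available in the 0.8.4 release mentioned above.

```python
from typing import Any, Dict, Hashable

# Assumed to match the leading dimension (3) of "other_sensors" in the space above.
MAX_OTHER_AGENTS = 3


def fill_other_sensors(
    agent_obs: Dict[Hashable, Dict[str, Any]], **kw
) -> Dict[Hashable, Dict[str, Any]]:
    """Give each agent the other agents' sensor readings, zero-padded to a fixed size."""
    new_obs = {}
    for agent_id, obs in agent_obs.items():
        others = [
            list(other["sensors"])
            for other_id, other in agent_obs.items()
            if other_id != agent_id
        ]
        # Illustrative choice: drop any agents beyond the fixed number of slots.
        others = others[:MAX_OTHER_AGENTS]
        n_sensors = len(obs["sensors"])
        # Fill the remaining slots with zeros for missing agents.
        while len(others) < MAX_OTHER_AGENTS:
            others.append([0.0] * n_sensors)
        new_obs[agent_id] = {**obs, "other_sensors": others}
    return new_obs


# Hypothetical step with only two active agents.
step_obs = {
    "a": {"sensors": [0.1, 0.2], "capacity": [1.0]},
    "b": {"sensors": [0.3, 0.4], "capacity": [0.5]},
}
print(fill_other_sensors(step_obs)["a"]["other_sensors"])
```

Either way the padding logic is the same; the difference is only whether it runs inside step() or in a separate callback applied to the env's output.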

janblumenkamp commented 3 years ago

This issue can hopefully be closed once #10884 is done :)

wullli commented 3 years ago

Do you guys have a rough idea when #10884 will be finished? I am currently using @janblumenkamp's awesome workarounds. However, I don't want to build things twice if I can help it. Thanks!