tensorflow / agents

TF-Agents: A reliable, scalable and easy to use TensorFlow library for Contextual Bandits and Reinforcement Learning.
Apache License 2.0

Example observation_and_action_constraint_splitter #255

Open marconardelli opened 4 years ago

marconardelli commented 4 years ago

Hello, I tried to understand how to use an action constraint on my environment. I have an environment with 100 lines, and the agent can choose any line, so I have 100 actions. When a line has been chosen, I change the value in the 'processed' column of the environment from 0 to 1. How can I set the observation_and_action_constraint_splitter so that the only valid actions are the ones connected to a value of 0 in the processed column?

bartmaciszewski commented 4 years ago

Hi, assuming you have an action space such as gym.spaces.Discrete(100) and the processed column is part of your observation, then you just need to define an observation_and_action_constraint_splitter function that cycles through the lines in the observation and builds a tensor with a 1 for each allowed action and a 0 for each disallowed one. Something like this:

def observation_and_action_constraint_splitter(observation):
    # Mark every line whose 'processed' flag is still 0 as a valid action.
    action_mask = [0] * len(observation)
    for line_number, line in enumerate(observation):
        if line.processed == 0:
            action_mask[line_number] = 1  # valid action
    return observation, tf.convert_to_tensor(action_mask, dtype=tf.int32)

Just pass a reference to this function to your agent at creation time and it should work.
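
For example, a minimal sketch of wiring the splitter into a DQN agent (the environment, network and optimizer names here are placeholders):

agent = dqn_agent.DqnAgent(
    train_env.time_step_spec(),
    train_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    observation_and_action_constraint_splitter=observation_and_action_constraint_splitter)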

If you want to share your code can take a look. Hope this helps!

B.

marconardelli commented 4 years ago

Hi bmac, thank you very much, I'm gonna try this today and let you know if it works. appreciated :)

bartmaciszewski commented 4 years ago

Hi Marco, I am working on a similar agent but I am using gym.spaces.Discrete(num_actions) and relying on the TFPyEnvironment wrapper to configure my action_spec.

I checked my resulting action_spec and it translates to a scalar (not a rank 1 tensor): BoundedTensorSpec(shape=(), dtype=tf.int64, name='action', minimum=array(0, dtype=int64), maximum=array(196, dtype=int64))

I have some other issues but the mask and selection of the right action seems to work:

Should you be using TensorSpecs? Maybe try the following?
self._action_spec = array_spec.BoundedArraySpec(shape=(), dtype=np.int32, minimum=0, maximum=self.ord_rows - 1)

Also what Agent are you using?

marconardelli commented 4 years ago

I'm using a DQN. Running eagerly, this function seems to work on its own:

tf.config.experimental_run_functions_eagerly(True)

# constraint for action
@tf.function
def observation_and_action_constraint_splitter(observation):
    mask = np.zeros(tf.shape(observation)[1]).astype(dtype=np.int32)
    line_number = 0
    for line in observation[0]:
        if line[3] == 0.0:
            mask[line_number] = 1  # valid action
        line_number += 1
    return observation, tf.convert_to_tensor(mask, dtype=np.int32)

the problem is that when I run the code I get the following error:

InvalidArgumentError: 'then' and 'else' must have the same size. but received: [1,1] vs. [1] [Op:Select]

I'm trying to understand what the problem could be; I think it is probably connected to the return values of the constraint function.

bartmaciszewski commented 4 years ago

Do you know where in TF code this error is coming from?

I encountered a similar error with a DQN agent and a scalar action space and using an observation action splitter function. I managed to trace it back to the random policy part used during DQN training (seems it was returning a scalar vs a tensor from the greedy part) and hacked around it.

Not sure if I’m doing something wrong or it’s a bug.

See my issue post: https://github.com/tensorflow/agents/issues/253#issue-524770681

marconardelli commented 4 years ago

I'm getting the error in the epsilon_greedy_policy file, line 102:

action = tf.compat.v1.where(cond, greedy_action.action, random_action.action)

same line...

marconardelli commented 4 years ago

I made the same modification to the random_tf_policy and it works fine now :)

bartmaciszewski commented 4 years ago

Nice!

sguada commented 4 years ago

I would recommend keeping all the mask logic in the Python part of the Environment, instead of trying to use tf.function.

So in the PyEnv step(action) function, generate the (observation, mask).

If you really have to do it in the observation_and_action_constraint_splitter, then you need to use only TF ops (tensor operations):

def observation_and_action_constraint_splitter(observation):
  mask = tf.equal(observation[0][3], 0.0)
  return observation, mask
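
For a batched observation of shape [batch, num_lines, num_features] with the 'processed' flag in column 3 (as in the earlier loop), a fully tensor-based variant might look like this (just a sketch; the column index and shapes are assumptions):

def observation_and_action_constraint_splitter(observation):
    # Lines whose 'processed' flag (column 3) is still 0 are the valid actions.
    mask = tf.cast(tf.equal(observation[..., 3], 0.0), tf.int32)  # shape [batch, num_lines]
    return observation, mask
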
marconardelli commented 4 years ago

Hi Sergio, thank you for your reply. But if I generate it in the PyEnv, how can I pass it to the network? Is there any example of this?

sguada commented 4 years ago

You can create a dict or a namedtuple for the observations; those would be passed to the Network. If you only want to pass the true observation to the Network but want to use the mask in the Policy, then you can use the observation_and_action_constraint_splitter to separate them.
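
A minimal sketch of that layout, using the illustrative keys 'state' and 'mask':

# Observation returned by the PyEnvironment: the true observation plus the mask.
observation = {'state': state, 'mask': mask}

def observation_and_action_constraint_splitter(observation):
    # The network only sees the true observation; the policy uses the mask.
    return observation['state'], observation['mask']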

bartmaciszewski commented 4 years ago

Hi Sergio, how would you specify the observation spec to pass to the QNetwork at creation time so that it is compatible with the true observation split out by the observation_and_action_constraint_splitter?

q_net = q_network.QNetwork(
    train_env.observation_spec(),
    train_env.action_spec(),
    fc_layer_params=fc_layer_params)

Also, is the reason for pushing the mask-building logic into the environment rather than into graph execution performance, or is it really just to keep the environment abstracted from the agent?

Thanks!

bmac

marconardelli commented 4 years ago

Hi bmac, I think the solution for this is to create a custom Network that can receive the dict as a parameter. I'm working on this too; I'll give you an update when I find the solution.

bartmaciszewski commented 4 years ago

Hi Marco, I was even thinking it may be possible to just pass the entire observation together with the action mask to the network. In the worst case the network would just ignore the mask, and it may even end up using the information to drive the output probabilities.

In the splitter function you could just pick up the action mask from the observation but still pass the entire observation + mask to the network, something like the sketch below. Just a thought... haven't tried it yet!
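
A minimal sketch of that idea, assuming the mask is stored under a 'mask' key of the observation dict:

def observation_and_action_constraint_splitter(observation):
    # Hand the full observation (mask included) to the network,
    # and also extract the mask for the policy's action selection.
    return observation, observation['mask']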

marconardelli commented 4 years ago

Hi bmac, I will try to describe all the steps you have to take to get things working.

First of all you have to change your environment:

The observation spec must be changed so that it includes the definition of the mask:

def get_observation_spec(self):
    """Returns an ArraySpec or a nested dict, list or tuple of specs."""
    state_spec = array_spec.ArraySpec(shape=(self._tot_rows, 4), dtype=np.float32, name='state')
    mask_spec = array_spec.ArraySpec(shape=(self.ord_rows, ), dtype=np.float32, name='mask')
    return {'state': state_spec, 'mask': mask_spec}

Then, inside your environment, create a function that returns the action mask; for me it was:

def get_masked_legal_actions(self, observation):
    """Return a boolean mask indicating valid actions."""
    mask = np.zeros(np.shape(observation)[0]).astype(dtype=np.float32)
    for line_number, line in enumerate(observation):
        if line[3] == 0.0:
            mask[line_number] = 1  # valid action
    return mask
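
The loop can also be replaced by a single vectorized NumPy expression (assuming, as above, that the 'processed' flag lives in column 3 of the observation):

def get_masked_legal_actions(self, observation):
    # Lines whose column 3 is 0.0 are the valid (unprocessed) actions.
    return (observation[:, 3] == 0.0).astype(np.float32)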

You then have to change all the functions that create the initial observation or return an updated observation, so that they return a dict matching your observation spec.

For me this meant changing two functions:

get_starting_observation()

get_updated_observation()

Just change their return values: call the get_masked_legal_actions function and build the dict. For me it was:

obs = requests.astype(np.float32)
mask = self.get_masked_legal_actions(obs)
obs = {'state': obs, 'mask': mask}
return obs

Now your environment is good to go. The next thing you have to do is to create a MaskedQNetwork that can handle the dict instead of the plain observation:

class MaskedQNetwork(network.Network):
  def __init__(self,
               input_tensor_spec,
               action_spec,
               mask_q_value=-100000,
               fc_layer_params=(75, 40),
               activation_fn=tf.keras.activations.relu,
               name='MaskedQNetwork'):

      super(MaskedQNetwork, self).__init__(input_tensor_spec, action_spec, name=name)

      self._q_net = q_network.QNetwork(input_tensor_spec['state'], action_spec, fc_layer_params=fc_layer_params,
                                       activation_fn=activation_fn)

      self._mask_q_value = mask_q_value

  def call(self, observations, step_type, network_state=()):
    state = observations['state']
    mask = observations['mask']

    q_values, _ = self._q_net(state, step_type)

    # Replace the Q-values of masked-out (invalid) actions with a large
    # negative constant so the greedy policy never selects them.
    small_constant = tf.constant(self._mask_q_value, dtype=q_values.dtype, shape=q_values.shape)
    zeros = tf.zeros(shape=mask.shape, dtype=mask.dtype)
    masked_q_values = tf.where(tf.math.equal(zeros, mask),
                               small_constant, q_values)

    return masked_q_values, network_state
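
For reference, a rough sketch of how this network could then be plugged into the agent (the environment and optimizer names are placeholders; since the masking happens inside the network and the policies, no observation_and_action_constraint_splitter is passed here):

q_net = MaskedQNetwork(train_env.observation_spec(), train_env.action_spec())

agent = dqn_agent.DqnAgent(
    train_env.time_step_spec(),
    train_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer)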

The next step is to change your policies so that they only sample valid actions. For the initial collect steps you have to create a MaskedRandomPolicy; change its _action function like this:

  def _action(self, time_step, policy_state, seed):
    if time_step.observation['mask'] is not None:

      mask = time_step.observation['mask']

      zero_logits = tf.cast(tf.zeros_like(mask), tf.float32)
      masked_categorical = masked.MaskedCategorical(zero_logits, mask)
      action_ = tf.cast(masked_categorical.sample() + self.action_spec.minimum,
                        self.action_spec.dtype)

      # If the action spec says each action should be shaped (1,), add another
      # dimension so the final shape is (B, 1) rather than (B,).
      if self.action_spec.shape.rank == 1:
        action_ = tf.expand_dims(action_, axis=-1)
    else:
      outer_dims = nest_utils.get_outer_shape(time_step, self._time_step_spec)

      action_ = tensor_spec.sample_spec_nest(
          self._action_spec, seed=seed, outer_dims=outer_dims)

    if time_step is not None:
      with tf.control_dependencies(tf.nest.flatten(time_step)):
        action_ = tf.nest.map_structure(tf.identity, action_)

    policy_info = tensor_spec.sample_spec_nest(self._info_spec)

    if self.emit_log_probability:
      if time_step.observation['mask'] is not None:
        log_probability = masked_categorical.log_prob(
            action_ - self.action_spec.minimum)
      else:
        action_probability = tf.nest.map_structure(_uniform_probability,
                                                   self._action_spec)
        log_probability = tf.nest.map_structure(tf.math.log, action_probability)
      policy_info = policy_step.set_log_probability(
          policy_info, log_probability)

    step = policy_step.PolicyStep(action_, policy_state, policy_info)
    return step

You have to do the same thing for the EpsilonGreedyPolicy (or any other policy you are using)

I don't know if I did it correctly or whether there was a simpler way, but this seems to work. I still have to check that it is able to reach the correct solution though; I'll give you an update. In the meantime, if @sguada has any comment on whether this is fine or I'm doing something wrong, that would be great, thanks :)

mKay00 commented 4 years ago

+1 I have the same issue: I don't know how observation_and_action_constraint_splitter works and can't get it working, and if I keep the mask logic in the Python part I don't know how to pass it to the network. I am fairly new to RL and tf-agents, so I don't really understand the workarounds above. Do I have to clone the repo, change the code in the class, rebuild it and load the rebuilt tf-agents module? That seems fairly complicated, and since I already have problems with the shapes and dimensions of everything, it seems like an impossible task to me. :(

I am currently trying RL as a possible solution to a hard routing problem for my master's thesis, and I would really appreciate an easy-to-use way to mask valid and invalid actions.

bartmaciszewski commented 4 years ago

Hi, assuming you have your action space and mask splitter function, I think you can either try what Marco described, or just do as I described here and pass the entire observation plus the mask to the network (e.g. pretend that the mask is part of the observation). After all, it's just some more information that the agent could potentially use. Disclaimer: I haven't tried this yet, but it should work in theory and should be simpler.

mKay00 commented 4 years ago

Phew, I finally got it to work. First I tried it with a tuple of (observation, mask), as recommended here, which resulted in lots of different errors related to the tuple being a sequence: I needed a preprocessing_combiner for my QNetwork, and I tried tf.keras.layers.Add, as recommended by the error message, but then the model build failed with an error about the observation tuple being a sequence while the other input was not. I tried for hours to solve this, but I couldn't get the build to work.

So, afterwards I tried your idea, which, after some tinkering, finally worked. Although, I had to use your workaround from #253 as well.

So, as a summary: it finally works, the mask is now included in my observation, and I use the observation_and_action_constraint_splitter with this function:

def observation_and_action_constraint_splitter(observation):
    mask = tf.reshape(observation[0][-1:], (tf.math.multiply(Y,X),))
    return observation, mask

I am really happy that it finally worked, and I hope that in the future there will be better documentation on how to achieve an action mask. Thank you for the tip and the workaround.

The only problem left is that, even with your workaround, I can't use a ParallelPyEnvironment with more than one worker. If I do, I get an error in the same line as the workaround:

tensorflow.python.framework.errors_impl.InvalidArgumentError: 'then' and 'else' must have the same size. but received: [8,1] vs. [1,1] [Op:Select]

Bahador-Bakhshi commented 3 years ago

Inspired by these explanations, the following implementation works fine for me.

1) Define the _observation_spec as a dictionary that contains the actual observation and also the mask of the valid actions, something like this:

self._observation_spec = {
    'observations': tf_agents.specs.BoundedArraySpec(
        name="observation",
        shape=(…),
        dtype=…),
    'valid_actions': array_spec.ArraySpec(
        name="valid_actions",
        shape=(THE_NUMBER_OF_ACTIONS, ),
        dtype=np.bool_)
}

2) Everywhere in the environment (i.e., _reset, _start, _step) that an observation is returned to the agent, compute the valid-actions mask according to the observation and return the corresponding dictionary, something like this:

obs['observations'] = get_the_env_observations(…)
obs['valid_actions'] = get_valid_actions_masks(…)  # True for valid and False for invalid actions
tf_agents.trajectories.time_step.transition(obs, reward, …)

3) Create a function to extract the actual observations and the mask, something like this:

def observation_action_splitter(obs):
    return obs['observations'], obs['valid_actions']

4) Pass the function to the agent init, something like this

agent = dqn_agent.DqnAgent(
    …
    q_network=…,
    optimizer=…,
    observation_and_action_constraint_splitter=observation_action_splitter
)

5) As far as I checked, the splitter function is also automatically passed to the policies of the agent. However, if another policy is used besides the agent's policies (e.g., a random policy to initially fill the replay buffer), the splitter function should also be passed to that policy's init, something like this:

random_policy = random_tf_policy.RandomTFPolicy(
    …
    observation_and_action_constraint_splitter=observation_action_splitter
)

6) Check the validity of the action taken in _step(), something like this:

def _step(self, action):
    masks = get_valid_actions_masks(…)
    if not masks[action]:
        print("Error: invalid action was taken")
        sys.exit(-1)

masterkey2000 commented 2 years ago

Hello, I followed the instructions from @Bahador-Bakhshi but I get an error when running

agent = dqn_agent.DqnAgent(
    train_env.time_step_spec(),
    train_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    observation_and_action_constraint_splitter=environment.observation_action_splitter,
    td_errors_loss_fn=common.element_wise_squared_loss,
    #train_step_counter=train_step_counter)
    train_step_counter=global_step)

The Error is:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
C:\Users\MASTER~1\AppData\Local\Temp/ipykernel_6148/3855110115.py in <module>
      7 
      8 #create agent
----> 9 agent = dqn_agent.DqnAgent(
     10     train_env.time_step_spec(),
     11     train_env.action_spec(),

~\Anaconda3\envs\dqn\lib\site-packages\gin\config.py in gin_wrapper(*args, **kwargs)
   1603       scope_info = " in scope '{}'".format(scope_str) if scope_str else ''
   1604       err_str = err_str.format(name, fn_or_cls, scope_info)
-> 1605       utils.augment_exception_message_and_reraise(e, err_str)
   1606 
   1607   return gin_wrapper

~\Anaconda3\envs\dqn\lib\site-packages\gin\utils.py in augment_exception_message_and_reraise(exception, message)
     39   proxy = ExceptionProxy()
     40   ExceptionProxy.__qualname__ = type(exception).__qualname__
---> 41   raise proxy.with_traceback(exception.__traceback__) from None
     42 
     43 

~\Anaconda3\envs\dqn\lib\site-packages\gin\config.py in gin_wrapper(*args, **kwargs)
   1580 
   1581     try:
-> 1582       return fn(*new_args, **new_kwargs)
   1583     except Exception as e:  # pylint: disable=broad-except
   1584       err_str = ''

~\Anaconda3\envs\dqn\lib\site-packages\tf_agents\agents\dqn\dqn_agent.py in __init__(self, time_step_spec, action_spec, q_network, optimizer, observation_and_action_constraint_splitter, epsilon_greedy, n_step_update, boltzmann_temperature, emit_log_probability, target_q_network, target_update_tau, target_update_period, td_errors_loss_fn, gamma, reward_scale_factor, gradient_clipping, debug_summaries, summarize_grads_and_vars, train_step_counter, name)
    234       net_observation_spec, _ = observation_and_action_constraint_splitter(
    235           net_observation_spec)
--> 236     q_network.create_variables(net_observation_spec)
    237     if target_q_network:
    238       target_q_network.create_variables(net_observation_spec)

~\Anaconda3\envs\dqn\lib\site-packages\tf_agents\networks\network.py in create_variables(self, input_tensor_spec, **kwargs)
    216           "Network did not define one.")
    217 
--> 218     random_input = tensor_spec.sample_spec_nest(
    219         input_tensor_spec, outer_dims=(1,))
    220     initial_state = self.get_initial_state(batch_size=1)

~\Anaconda3\envs\dqn\lib\site-packages\tf_agents\specs\tensor_spec.py in sample_spec_nest(structure, seed, outer_dims, minimum, maximum)
    398       raise TypeError("Spec type not supported: '{}'".format(spec))
    399 
--> 400   return tf.nest.map_structure(sample_fn, structure)
    401 
    402 

~\AppData\Roaming\Python\Python38\site-packages\tensorflow\python\util\nest.py in map_structure(func, *structure, **kwargs)
    867 
    868   return pack_sequence_as(
--> 869       structure[0], [func(*x) for x in entries],
    870       expand_composites=expand_composites)
    871 

~\AppData\Roaming\Python\Python38\site-packages\tensorflow\python\util\nest.py in <listcomp>(.0)
    867 
    868   return pack_sequence_as(
--> 869       structure[0], [func(*x) for x in entries],
    870       expand_composites=expand_composites)
    871 

~\Anaconda3\envs\dqn\lib\site-packages\tf_agents\specs\tensor_spec.py in sample_fn(spec)
    376                 sample_spec, outer_dims=outer_dims, seed=seed_stream()))
    377       else:
--> 378         bounded_spec = BoundedTensorSpec.from_spec(spec)
    379 
    380         spec_max = bounded_spec.maximum

~\AppData\Roaming\Python\Python38\site-packages\tensorflow\python\framework\tensor_spec.py in from_spec(cls, spec)
    320     """
    321     dtype = dtypes.as_dtype(spec.dtype)
--> 322     minimum = getattr(spec, "minimum", dtype.min)
    323     maximum = getattr(spec, "maximum", dtype.max)
    324     return BoundedTensorSpec(spec.shape, dtype, minimum, maximum, spec.name)

~\AppData\Roaming\Python\Python38\site-packages\tensorflow\python\framework\dtypes.py in min(self)
     99     if (self.is_quantized or
    100         self.base_dtype in (bool, string, complex64, complex128)):
--> 101       raise TypeError(f"Cannot find minimum value of {self} with "
    102                       f"{'quantized type' if self.is_quantized else 'type'} "
    103                       f"{self.base_dtype}.")

TypeError: Cannot find minimum value of <dtype: 'bool'> with type <dtype: 'bool'>.
  In call to configurable 'DqnAgent' (<class 'tf_agents.agents.dqn.dqn_agent.DqnAgent'>)

These are my _action_spec and _observation_spec:

self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')

self._observation_spec = {
    'price': array_spec.BoundedArraySpec(shape=(20, 5), dtype=np.float32, minimum=0, name='obs_price'),
    'legal_actions': array_spec.ArraySpec(shape=(3,), dtype=np.bool_, name='legal_actions')  # 3 legal actions
}

I fill the observation dictionary with:

 legal_actions = [True, True, True]

 obs = {}
 obs['price'] = self.__get_observation_data(_idx).astype(np.float32)
 obs['legal_actions'] = np.array(legal_actions)

Does anyone know what the error message means and where it comes from?

sguada commented 2 years ago

The 'legal_actions' shouldn't be part of the observation_spec, since it's going to be separated out with the observation_action_splitter.

masterkey2000 commented 2 years ago

@sguada: What do you mean? I did it exactly as @Bahador-Bakhshi suggested, or did I miss something?

sguada commented 2 years ago

So make sure that the observation_action_splitter splits the observation and the mask correctly.

def observation_action_splitter(obs):
    return obs['price'], obs['legal_actions']