ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[RLlib] Gymnasium Graph Space has no Preprocessor #39280

Open dkupsh opened 1 year ago

dkupsh commented 1 year ago

What happened + What you expected to happen

RLlib has no default preprocessor for the Graph space implemented in gymnasium. This causes an error during Preprocessor initialization:

    @PublicAPI
    def __init__(self, obs_space: gym.Space, options: dict = None):
        _legacy_patch_shapes(obs_space)

        self._obs_space = obs_space
        if not options:
            from ray.rllib.models.catalog import MODEL_DEFAULTS

            self._options = MODEL_DEFAULTS.copy()
        else:
            self._options = options
        self.shape = self._init_shape(obs_space, self._options)
        self._size = int(np.product(self.shape))
        self._i = 0
        self._obs_for_type_matching = self._obs_space.sample() 

where self.shape ends up as None (because the default preprocessor is used), but the size computation on the following line requires an integer product.

There should be a graph preprocessor, similar to DictFlatteningPreprocessor, for graph spaces.
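
To make the failure mode concrete, here is a minimal sketch of the kind of guard a preprocessor could apply. It assumes, as the report above describes, that a Graph space exposes no fixed shape (shape is None). The helper name flat_size is hypothetical and not part of RLlib, and it uses math.prod instead of NumPy only to stay dependency-free.

```python
import math
from typing import Optional, Sequence


def flat_size(shape: Optional[Sequence[int]]) -> int:
    """Return the number of elements implied by `shape`.

    Hypothetical helper (not an RLlib function); it only illustrates the guard.
    """
    if shape is None:
        # Assumption: variable-size spaces such as Graph report shape=None,
        # so there is no fixed flat size to compute.
        raise ValueError("observation space has no fixed shape; cannot compute a flat size")
    return math.prod(shape)


print(flat_size((5,)))  # a fixed-shape space works as usual
try:
    flat_size(None)  # a variable-size space is rejected with a clear message
except ValueError as err:
    print(f"rejected: {err}")
```

A check like this would at least turn the opaque integer-conversion failure into an explicit message; it does not by itself add Graph support.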

Versions / Dependencies

ray, version 2.6.3

Reproduction script

import gymnasium as gym
import numpy as np
from gymnasium.spaces import Box, Discrete, Graph, Space

class TestEnv(gym.Env):
    @property
    def observation_space(self) -> Space:
        return gym.spaces.Dict({
            'graph': Graph(
                node_space=Box(low=-1, high=np.inf, shape=(1,), dtype=np.int32),
                edge_space=Discrete(1),
            )
        })

...

And then testing this environment.

Issue Severity

High: It blocks me from completing my task.

sven1977 commented 12 months ago

Hey @dkupsh , thanks for filing this issue. Could you try one thing and disable the preprocessor API via your config?

config.experimental(_disable_preprocessor_api=True)

Or, if you are still working with config dicts:

config["_disable_preprocessor_api"] = True

dkupsh commented 12 months ago

Hi @sven1977,

I tried disabling the preprocessor API, but it still didn't work.

Having investigated further, it seems RLlib doesn't support the Graph space. Specifically, I have found four problems: (1) the get_dummy_batch_for_space function in space_utils, (2) the serialization function, (3) concatenating sample batches with different node sizes, and (4) compressing the batch space doesn't work.

I created a fix for (1) and (2) and can work on a pull request for those. However, I'm still unsure how to tackle (3) and (4).

I'm unsure whether this should now be classified as a feature request or a bug report. I think general support for the Graph space would be extremely helpful for those (like me) who use GNNs and don't want to build an assumed maximum number of graph nodes into their environments.

sven1977 commented 12 months ago

Makes sense! Thanks for your response and the additional information. For completeness and to make this easier for us to debug, could you complete your reproduction script? Maybe with a very simple model that should be able to handle the graph input.

dkupsh commented 12 months ago

So, I haven't tested it (I just heavily simplified my use-case), but here's a full reproduction script:

Firstly the environment:

import gymnasium as gym
from gymnasium.spaces import Space, Box, Discrete, Graph, GraphInstance
import numpy as np

class TestEnv(gym.Env):    
    @property
    def observation_space(self) -> Space:
        # Graph Space
        return gym.spaces.Dict({
            'graph': Graph(
                        node_space=Box(low=-1, high=np.inf, shape=(5,), dtype=np.int32),
                        edge_space=Discrete(1),
            )
        })

    @property
    def action_space(self) -> Space:
        # Discrete Action Space
        return Discrete(5)

    def reset(self, *, seed=None, options=None):
        # Sample will return Dict with GraphInstance for Space
        return self.observation_space.sample(), {}

    def step(self, action):
        # DUMMY Actions
        return self.observation_space.sample(), 0, False, False, {}

Secondly, a model:

import gymnasium as gym
from ray.rllib.utils.typing import ModelConfigDict
import torch
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader

from torch_geometric.nn import GCNConv
from torch_geometric.nn import Sequential as GNNSequential
from torch.nn import Sequential, Module, Linear

class GraphModel(TorchModelV2, Module):

    def __init__(self, obs_space: gym.spaces.Space, action_space: gym.spaces.Space, num_outputs: int, model_config: ModelConfigDict, name: str):
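        # TorchModelV2.__init__ does not call Module.__init__, so initialize both bases explicitly.
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs, model_config, name)
        Module.__init__(self)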
        self.gnn = GNNSequential('x, edge_index', [(GCNConv(5, 5), 'x, edge_index -> x')])

        # Policy Head
        self._policy_head = Sequential(
            Linear(in_features=5, out_features=num_outputs)
        )

        self._value_head = Sequential(
            Linear(in_features=5, out_features=1)
        )

    def forward(self, input_dict, state, seq_len):

        batches = len(input_dict['obs']['graph'][0])

        # NOTE: Model assumes that each batch can have different number of nodes and they are stored as a list of 2-d tensors (where the first list is the batch, second is node number, third is feature)
        node_features = input_dict['obs']['graph'][0]
        adj_matrix = input_dict['obs']['graph'][2]

        # Get the number of nodes in each batch
        batch_node_size = [len(node_features[i]) for i in range(batches)]
        max_nodes = max(batch_node_size)

        # Pad the Node Features dynamically to batch with largest input size
        for batch in range(batches):
            to_append = torch.zeros(max_nodes - node_features[batch].shape[0], node_features[batch].shape[1])
            node_features[batch] = torch.cat([node_features[batch], to_append], dim=0)
        # Stack along a new leading batch dimension: (batches, max_nodes, features). Cast to float for the GNN layers.
        node_features = torch.stack(node_features, dim=0).type(torch.float)

        # Minibatch using PyTorch Geometric DataLoader
        data_list = []
        for batch in range(batches):
            node_batch_features = node_features[batch, :, :]
            adj = adj_matrix[batch].type(torch.long).permute(1, 0)
            data_list.append(Data(x=node_batch_features, edge_index=adj))
        data = DataLoader(data_list, batch_size=batches, shuffle=False)

        # Do GNN Inference
        for batch in data:
            output_gnn = self.gnn(batch.x, batch.edge_index).view(batches, max_nodes, -1)

        # Extract First Node to be output feature
        embedding = torch.empty(batches, 5)
        for batch in range(batches):
            embedding[batch] = output_gnn[batch, 0, :]

        # Value Head
        self.value = self._value_head(embedding)

        # Policy Head
        logits = self._policy_head(embedding)

        return logits, state

    def value_function(self):
        return torch.reshape(self.value, [-1])

dkupsh commented 12 months ago

Also, based on what I have looked at so far, I changed space_utils.py to add this clause to the get_dummy_batch_for_space function:

    elif isinstance(space, Graph):
        def get_nodes(num_nodes):
            return np.concatenate([get_dummy_batch_for_space(space.node_space, 1, fill_value) for _ in range(num_nodes)])
        def get_edges(num_edges):
            return np.concatenate([get_dummy_batch_for_space(space.edge_space, 1, fill_value) for _ in range(num_edges)])

        def get_edge_links(nodes, edges, batch):
            if fill_value == "random":
                return space.np_random.integers(
                    low=0, high=nodes[batch].shape[0], size=(edges[batch].shape[0], 2), dtype=space.dtype
                )
            return np.full(
                shape=(edges[batch].shape[0], 2), fill_value=fill_value, dtype=space.dtype
            )

        nodes = [get_nodes(np.random.randint(low=5, high=15)) for _ in range(batch_size)]
        edges = [get_edges(np.random.randint(low=3, high=10)) for _ in range(batch_size)]
        edge_links = [get_edge_links(nodes, edges, i) for i in range(batch_size)]

        return (nodes, edges, edge_links)

dkupsh commented 12 months ago

But I'm currently still stuck on getting the concatenation function to work correctly with different node sizes per batch.
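
One common way around ragged node counts (not necessarily what any RLlib fix would do) is to pad every graph's node array to the largest graph in the batch and carry a mask that marks the real nodes. A minimal NumPy sketch, where pad_node_features is a hypothetical helper name, each graph is assumed to be a (num_nodes, num_features) array, and the batch is assumed to be non-empty:

```python
import numpy as np


def pad_node_features(graphs, pad_value=0.0):
    """Stack per-graph node arrays of shape (n_i, F) into one (B, max_n, F) array.

    Also returns a boolean mask of shape (B, max_n): True for real nodes,
    False for padding. Hypothetical helper, not an RLlib function.
    """
    max_nodes = max(g.shape[0] for g in graphs)
    feat_dim = graphs[0].shape[1]
    padded = np.full((len(graphs), max_nodes, feat_dim), pad_value, dtype=graphs[0].dtype)
    mask = np.zeros((len(graphs), max_nodes), dtype=bool)
    for i, g in enumerate(graphs):
        padded[i, : g.shape[0]] = g   # copy the real nodes
        mask[i, : g.shape[0]] = True  # remember which rows are real
    return padded, mask


graphs = [np.ones((2, 5), dtype=np.float32), np.ones((4, 5), dtype=np.float32)]
padded, mask = pad_node_features(graphs)
print(padded.shape)      # (2, 4, 5)
print(mask.sum(axis=1))  # [2 4]
```

The maximum is computed per batch, so the environment itself does not need to declare an upper bound on the number of nodes; the cost is that batches padded to different sizes still cannot be concatenated without re-padding.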

Panhaolin2001 commented 8 months ago

Has this problem been solved now? Or is there another way?

dkupsh commented 8 months ago

I was able to fix it. Unfortunately, the fix fundamentally altered the SampleBatch class, because the data is no longer homogeneous: it now contains arrays of differently sized arrays. If it helps, I can share the branch that works for me, although it's no longer kept up to date.
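
For comparison, one alternative that keeps the arrays homogeneous without padding is to concatenate all graphs into flat arrays and record per-graph offsets, similar in spirit to how graph mini-batching libraries merge graphs. This is only an illustrative sketch and not the approach taken in the branch mentioned above; flatten_graphs is a hypothetical helper, and edge links are assumed to be (num_edges, 2) integer arrays that index nodes within their own graph.

```python
import numpy as np


def flatten_graphs(node_arrays, edge_links):
    """Merge several graphs into flat arrays plus an offsets vector.

    node_arrays: list of (n_i, F) arrays.
    edge_links:  list of (e_i, 2) integer arrays with graph-local node indices.
    Hypothetical helper, not an RLlib function.
    """
    counts = np.array([n.shape[0] for n in node_arrays])
    # Graph i owns rows offsets[i]:offsets[i + 1] of the merged node array.
    offsets = np.concatenate([[0], np.cumsum(counts)])
    nodes = np.concatenate(node_arrays, axis=0)
    # Shift each graph's edge endpoints so they index into the merged node array.
    links = np.concatenate(
        [e + off for e, off in zip(edge_links, offsets[:-1])], axis=0
    )
    return nodes, links, offsets


node_arrays = [np.zeros((2, 5)), np.ones((3, 5))]
edge_links = [np.array([[0, 1]]), np.array([[0, 2], [1, 2]])]
nodes, links, offsets = flatten_graphs(node_arrays, edge_links)
print(nodes.shape)       # (5, 5)
print(links.tolist())    # [[0, 1], [2, 4], [3, 4]]
print(offsets.tolist())  # [0, 2, 5]
```

Individual graphs can be recovered by slicing with the offsets, for example nodes[offsets[1]:offsets[2]] for the second graph.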

Panhaolin2001 commented 8 months ago

Thank you for your reply! I am glad to hear that you have solved the problem successfully. If possible, I'd love to take a look at your branch code to better understand your solution. I would appreciate it if you could share the branch code of your work.

dkupsh commented 8 months ago

https://github.com/dkupsh/ray