ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[RLlib] Ape-X crashes for custom environment with Dict observation space #27309

Open eleninisioti opened 2 years ago

eleninisioti commented 2 years ago

What happened + What you expected to happen

I am trying to run Ape-X following the example here for the Wordcraft environment. I have looked at a similar issue here, and the initial example provided there runs correctly, so the problem is probably not in the dependencies but in my environment.

Wordcraft is a fairly large environment spread over several files, hence the long script. The main parts to look at are the run_apex.py method and the WordCraftEnvNoGoal class.

The error I get from running the script concerns the size of the observation space: Expected flattened obs shape of [..., 10680], got (?, 5607).
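As a back-of-the-envelope check, the two numbers in the message can be reproduced by flattening the Dict observation in two different ways: once with each MultiDiscrete index one-hot encoded, and once with each index kept as a single raw integer. The sizes below are assumptions chosen because they match the message, not values read from the script: a table size of 265, a selection size of 2, and 20-dimensional features (for instance one-hot features over the 20 entities in example.json). This sketch does not establish the cause; it only shows that the mismatch is consistent with the index entries being encoded differently in two places.

```python
# Hypothetical sizes: one combination that reproduces both numbers in the
# error message. They are NOT confirmed by the visible part of the script.
NUM_ENTITIES = 20        # entities listed in example.json
MAX_TABLE_SIZE = 265     # assumed value of self.max_table_size
MAX_SELECTION_SIZE = 2   # assumed value of self.max_selection_size
FEATURE_DIM = 20         # assumed per-entity feature dimension


def flat_size(one_hot_indices: bool) -> int:
    """Flattened length of the Dict observation under one encoding choice."""
    # A one-hot encoded index takes NUM_ENTITIES slots, a raw integer takes 1.
    per_index = NUM_ENTITIES if one_hot_indices else 1
    table_index = MAX_TABLE_SIZE * per_index
    table_features = MAX_TABLE_SIZE * FEATURE_DIM
    selection_index = MAX_SELECTION_SIZE * per_index
    selection_features = MAX_SELECTION_SIZE * FEATURE_DIM
    return table_index + table_features + selection_index + selection_features


print(flat_size(one_hot_indices=True))   # 10680
print(flat_size(one_hot_indices=False))  # 5607
```

If the real sizes differ from these assumptions, the same function can be rerun with the actual values to see whether the one-hot versus raw-integer reading still explains the gap.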

The full error log:

2022-07-30 18:01:53,647 INFO services.py:1462 -- View the Ray dashboard at http://127.0.0.1:8265
2022-07-30 18:01:56,122 INFO trainer.py:2296 -- Your framework setting is 'tf', meaning you are using static-graph mode. Set framework='tf2' to enable eager execution with tf2.x. You may also then want to set eager_tracing=True in order to reach similar execution speed as with static-graph mode.
2022-07-30 18:01:56,127 INFO simple_q.py:162 -- In multi-agent mode, policies will be optimized sequentially by the multi-GPU optimizer. Consider setting simple_optimizer=True if this doesn't work for you.
2022-07-30 18:01:56,127 INFO trainer.py:867 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
(pid=26361)
(RolloutWorker pid=26770) 2022-07-30 18:02:01,875 WARNING rollout_worker.py:499 -- We've added a module for checking environments that are used in experiments. It will cause your environment to fail if your environment is not set upcorrectly. You can disable check env by setting disable_env_checking to True in your experiment config dictionary. You can run the environment checking module standalone by calling ray.rllib.utils.check_env(env).
(RolloutWorker pid=26770) 2022-07-30 18:02:01,875 WARNING env.py:70 -- Env checking isn't implemented for VectorEnvs, RemoteBaseEnvs, ExternalMultiAgentEnv,or ExternalEnvs or Environments that are Ray actors
(RolloutWorker pid=26770) /home/elena/workspace/playground/ray/SAPIENS
(RolloutWorker pid=26770) 2022-07-30 18:02:05,516 ERROR worker.py:449 -- Exception raised in creation task: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=26770, ip=192.168.1.36, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x7f4c5a4cc7d0>)
(RolloutWorker pid=26770)   File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/evaluation/rollout_worker.py", line 630, in __init__
(RolloutWorker pid=26770)     seed=seed,
(RolloutWorker pid=26770)   File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/evaluation/rollout_worker.py", line 1723, in _build_policy_map
(RolloutWorker pid=26770)     name, orig_cls, obs_space, act_space, conf, merged_conf
(RolloutWorker pid=26770)   File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/policy/policy_map.py", line 141, in create_policy
(RolloutWorker pid=26770)     observation_space, action_space, merged_config
(RolloutWorker pid=26770)   File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/policy/tf_policy_template.py", line 270, in __init__
(RolloutWorker pid=26770)     get_batch_divisibility_req=get_batch_divisibility_req,
(RolloutWorker pid=26770)   File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/policy/dynamic_tf_policy.py", line 326, in __init__
(RolloutWorker pid=26770)     is_training=in_dict.is_training,
(RolloutWorker pid=26770)   File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/agents/dqn/dqn_tf_policy.py", line 229, in get_distribution_inputs_and_class
(RolloutWorker pid=26770)     policy, model, input_dict, state_batches=None, explore=explore
(RolloutWorker pid=26770)   File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/agents/dqn/dqn_tf_policy.py", line 390, in compute_q_values
(RolloutWorker pid=26770)     model_out, state = model(input_batch, state_batches or [], seq_lens)
(RolloutWorker pid=26770)   File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/models/modelv2.py", line 240, in __call__
(RolloutWorker pid=26770)     input_dict["obs"], self.obs_space, self.framework
(RolloutWorker pid=26770)   File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/models/modelv2.py", line 402, in restore_original_dimensions
(RolloutWorker pid=26770)     return _unpack_obs(obs, original_space, tensorlib=tensorlib)
(RolloutWorker pid=26770)   File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/models/modelv2.py", line 437, in _unpack_obs
(RolloutWorker pid=26770)     prep.shape[0], obs.shape
(RolloutWorker pid=26770) ValueError: Expected flattened obs shape of [..., 10680], got (?, 5607)
Traceback (most recent call last):
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/agents/trainer.py", line 896, in setup
    self._init(self.config, self.env_creator)
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/agents/trainer.py", line 1035, in _init
    raise NotImplementedError
NotImplementedError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/worker.py", line 1811, in get
    raise value
ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=26770, ip=192.168.1.36, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x7f4c5a4cc7d0>)
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/evaluation/rollout_worker.py", line 630, in __init__
    seed=seed,
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/evaluation/rollout_worker.py", line 1723, in _build_policy_map
    name, orig_cls, obs_space, act_space, conf, merged_conf
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/policy/policy_map.py", line 141, in create_policy
    observation_space, action_space, merged_config
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/policy/tf_policy_template.py", line 270, in __init__
    get_batch_divisibility_req=get_batch_divisibility_req,
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/policy/dynamic_tf_policy.py", line 326, in __init__
    is_training=in_dict.is_training,
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/agents/dqn/dqn_tf_policy.py", line 229, in get_distribution_inputs_and_class
    policy, model, input_dict, state_batches=None, explore=explore
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/agents/dqn/dqn_tf_policy.py", line 390, in compute_q_values
    model_out, state = model(input_batch, state_batches or [], seq_lens)
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/models/modelv2.py", line 240, in __call__
    input_dict["obs"], self.obs_space, self.framework
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/models/modelv2.py", line 402, in restore_original_dimensions
    return _unpack_obs(obs, original_space, tensorlib=tensorlib)
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/models/modelv2.py", line 437, in _unpack_obs
    prep.shape[0], obs.shape
ValueError: Expected flattened obs shape of [..., 10680], got (?, 5607)
python-BaseException
(RolloutWorker pid=26770)

Process finished with exit code 1

Versions / Dependencies

gym Version: 0.18.0
ray Version: 1.12.0
python Version: 3.7.13
OS: Linux 5.4.0-122-generic

To run the script, you need a file named "example.json" in the same directory, with the following content:

{"entities": {"a_base_a_0": {"id": 0, "recipes": []}, "a_base_b_0": {"id": 1, "recipes": []}, "a_base_c_0": {"id": 2, "recipes": []}, "a_1": {"id": 3, "recipes": [["a_base_b_0", "a_base_b_0"]]}, "a_2": {"id": 4, "recipes": [["a_1", "a_base_a_0"]]}, "a_3": {"id": 5, "recipes": [["a_2", "a_base_a_0"]]}, "a_4": {"id": 6, "recipes": [["a_3", "a_base_c_0"]]}, "a_5": {"id": 7, "recipes": [["a_4", "a_base_c_0"]]}, "a_6": {"id": 8, "recipes": [["a_5", "a_base_c_0"]]}, "a_7": {"id": 9, "recipes": [["a_6", "a_base_b_0"]]}, "a_8": {"id": 10, "recipes": [["a_7", "a_base_a_0"]]}, "a_9": {"id": 11, "recipes": [["a_8", "a_base_b_0"]]}, "_dist_a_0": {"id": 12, "recipes": []}, "_dist_b_0": {"id": 13, "recipes": []}, "_dist_c_0": {"id": 14, "recipes": []}, "_dist_d_0": {"id": 15, "recipes": []}, "_dist_e_0": {"id": 16, "recipes": []}, "_dist_f_0": {"id": 17, "recipes": []}, "_dist_g_0": {"id": 18, "recipes": []}, "_dist_h_0": {"id": 19, "recipes": []}}}
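Since the observation and action spaces are sized from the contents of this file, a quick summary of it can help when checking the expected shapes. The helper below is a minimal sketch and not part of the reproduction script; `summarize_recipe_book` is a hypothetical name, and the inline sample is a made-up two-entity file in the same schema, used only so the snippet runs on its own.

```python
import json


def summarize_recipe_book(data: dict) -> dict:
    """Count entities, recipe-less (root) entities, and the largest recipe."""
    entities = data["entities"]
    # Root entities are those that cannot be crafted from anything else.
    roots = [name for name, spec in entities.items() if not spec["recipes"]]
    max_recipe_size = max(
        (len(recipe) for spec in entities.values() for recipe in spec["recipes"]),
        default=0,
    )
    return {
        "num_entities": len(entities),
        "num_root_entities": len(roots),
        "max_recipe_size": max_recipe_size,
    }


# Hypothetical miniature file in the same schema as example.json.
sample = json.loads(
    '{"entities": {"a_base_a_0": {"id": 0, "recipes": []},'
    ' "a_1": {"id": 1, "recipes": [["a_base_a_0", "a_base_a_0"]]}}}'
)
summary = summarize_recipe_book(sample)
print(summary)
```

For the real file, pass the result of `json.load` on "example.json" instead of `sample`; the content above has 20 entities, 11 of them without recipes, and every recipe has two ingredients.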

Reproduction script

from ray import tune
import sys
sys.path.append(".")
import gym
import ray
import ray.rllib.agents.dqn.apex as apex
from ray.tune.logger import pretty_print
from ray.tune.registry import register_env
import os
import json
import pickle
import collections
import random
import timeit
import copy
import re
import numpy as np
from gym.utils import seeding

recipe_book_info = {"example": {"path": "example.json",
                                "best_paths": ["a_8"],
                                "n_paths": 1,
                                "best_reward": 36,
                                "max_steps": 1000000,
                                "early_step": 500000}}

DEBUG = False

class Recipe(collections.Counter):
    """A hashable recipe.
    Allows for indexing into dictionaries.
    """

    def __hash__(self):
        return tuple(
            sorted(self.items(), key=lambda x: x[0] if x[0] is not None else "")
        ).__hash__()

    def __len__(self):
        return len(list(self.elements()))

class Task:
    """
    A hashable recipe task.
    """

    def __init__(self, goal, base_entities, intermediate_entities, relevant_recipes):
        self.goal = goal
        self.base_entities = tuple(sorted(base_entities))
        self.intermediate_entities = tuple(sorted(intermediate_entities))
        self.relevant_recipes = tuple(relevant_recipes)

    def __hash__(self):
        return tuple(
            (
                self.goal,
                self.base_entities,
                self.intermediate_entities,
                self.relevant_recipes,
            )
        ).__hash__()

class RecipeBook:
    def __init__(
        self,
        data_path="datasets/alchemy2.json",
        max_depth=1,
        split=None,
        train_ratio=1.0,
        seed=None,
    ):
        self.test_mode = False
        self.train_ratio = train_ratio
        self.set_seed(seed)

        self.data_path = data_path
        self._rawdata = self._load_data(data_path)
        self.max_depth = max_depth

        self.entities = tuple(self._rawdata["entities"].keys())
        self.entity2level = {
            e: [int(s) for s in re.findall(r"\d+", e)][
                0
            ]  # Todo : use the recipe chaining tools from task instead of name based
            for i, e in enumerate(self.entities)
        }
        self.entity2index = {e: i for i, e in enumerate(self.entities)}
        self.entity2recipes = collections.defaultdict(list)
        self.distractors = tuple(e for e in self.entities if "dist" in e)

        for e in self.entities:
            for r in self._rawdata["entities"][e]["recipes"]:
                if e not in r:
                    self.entity2recipes[e].append(Recipe(r))
        self.entity2recipes = dict(self.entity2recipes)

        self.max_recipe_size = 0
        self.recipe2entity = collections.defaultdict(str)
        for entity, recipes in self.entity2recipes.items():
            for r in recipes:
                self.recipe2entity[r] = entity
                self.max_recipe_size = max(len(r), self.max_recipe_size)
        self.root_entities = set(
            [e for e in self.entities if e not in self.entity2recipes]
        )

        self.init_neighbors_combineswith()
        self.terminal_entities = set(
            [e for e in self.entities if e not in self.neighbors_combineswith]
        )

        # self._init_tasks_for_depth(max_depth)
        # self._init_recipe_weighted_entity_dist()

        # self._init_data_split(split=split, train_ratio=train_ratio)

    def _random_choice(self, options):
        # Fast random choice
        i = self.np_random.randint(0, len(options))
        return options[i]

    def _load_data(self, path):
        print(os.getcwd())
        # Use a context manager so the file is closed even if parsing fails.
        with open(path) as f:
            jsondata = json.load(f)

        return jsondata

    def set_seed(self, seed):
        self.np_random, self.seed = seeding.np_random(seed)

    def save(self, path):
        """
        Serialize to bytes and save to file
        """
        path = os.path.expandvars(os.path.expanduser(path))
        # Close the file deterministically so the pickle is fully flushed.
        with open(path, "wb+") as f:
            pickle.dump(self, f)

    @staticmethod
    def load(path):
        """
        Returns a new RecipeBook object loaded from a binary file that is the output of save.
        """
        path = os.path.expandvars(os.path.expanduser(path))
        with open(path, "rb") as f:
            return pickle.load(f)

    def get_recipes(self, entity):
        return self.entity2recipes[entity] if entity in self.entity2recipes else None

    def evaluate_recipe(self, recipe):
        e = self.recipe2entity[recipe]
        return e if e != "" else None

    def init_neighbors_combineswith(self):
        self.neighbors_combineswith = collections.defaultdict(set)
        for recipe in self.recipe2entity:
            e1, e2 = recipe if len(recipe.keys()) == 2 else list(recipe.keys()) * 2
            self.neighbors_combineswith[e1].add(e2)
            self.neighbors_combineswith[e2].add(e1)

    def sample_task(self, depth=None):
        """
        Returns a task tuple (<goal>, <intermediate entities>, <base entities>)
        """
        if depth is None:
            depth = self.np_random.choice(range(1, self.max_depth + 1))

        sample_space = (
            self.depth2task_test
            if self.test_mode and self.train_ratio < 1.0
            else self.depth2task_train
        )
        return self._random_choice(sample_space[depth])

    def sample_distractors(self, task, num_distractors=1, uniform=True):
        base_e = set(task.base_entities)
        intermediate_e = set(task.intermediate_entities)

        def is_valid(e):
            return e != task.goal and e not in base_e and e not in intermediate_e

        options = [(i, e) for i, e in enumerate(self.entities) if is_valid(e)]
        sample_index_space, sample_space = zip(*options)

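        if uniform:
            # _random_choice() accepts a single argument and returns one item,
            # so draw num_distractors samples with np_random.choice instead.
            return tuple(self.np_random.choice(sample_space, num_distractors).tolist())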
        else:
            # sample according to recipe-weighted entity distribution
            sample_index_space = set(sample_index_space)
            dist = np.array(
                [p for i, p in enumerate(self.entity_dist) if i in sample_index_space]
            )
            dist /= dist.sum()
            return tuple(
                self.np_random.choice(sample_space, num_distractors, p=dist).tolist()
            )

    def _generate_all_tasks_for_goal(self, goal, max_depth=3):
        base_entities = [goal]
        intermediate_entities = set()
        print(DEBUG, f"Expanding tasks to goal {goal}")
        self._expand_tasks_to_goal(
            goal, max_depth, base_entities, intermediate_entities
        )
        print(DEBUG, "Done.")

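    def _expand_tasks_to_goal(
        self,
        goal,
        max_depth=1,
        base_entities=None,
        intermediate_entities=None,
        relevant_recipes=None,
    ):
        """
        DFS expansion of recipes for an entity to generate new tasks
        """
        # Avoid mutable default arguments, which would be shared across calls.
        if base_entities is None:
            base_entities = []
        if intermediate_entities is None:
            intermediate_entities = set()
        if relevant_recipes is None:
            relevant_recipes = []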
        for b in base_entities:
            if (
                b not in self.root_entities
            ):  # Can't expand if it's a root entity or cyclic
                if b != goal:
                    intermediate_entities.add(b)
                next_base_entities = base_entities[:]
                next_base_entities.remove(b)

                cur_depth = len(intermediate_entities) + 1

                print(DEBUG, "--Expanding base entity", b)

                # Expand each recipe for each base entity
                for recipe in self.entity2recipes[b]:
                    print(DEBUG, f"----Trying recipe for {b}, {recipe}")
                    expanded_entities = [
                        e for e in recipe if e not in next_base_entities
                    ]
                    is_cycle = False
                    for e in recipe:
                        if e in intermediate_entities or e == goal:
                            print(
                                DEBUG, f"------Cycle detected, skipping recipe {recipe}"
                            )
                            is_cycle = True
                            break
                    if is_cycle:
                        continue

                    old_base_entities = next_base_entities
                    next_base_entities = expanded_entities + next_base_entities

                    # Add task
                    relevant_recipes.append(recipe)
                    task = Task(
                        goal,
                        next_base_entities,
                        intermediate_entities,
                        relevant_recipes[:],
                    )
                    if task not in self.depth2task[cur_depth]:
                        self.depth2task[cur_depth].add(task)
                        print(DEBUG, f"------Adding task {task}")

                    if cur_depth < max_depth:
                        print(DEBUG, f"current depth is {cur_depth}")
                        self._expand_tasks_to_goal(
                            goal,
                            max_depth,
                            next_base_entities,
                            intermediate_entities,
                            relevant_recipes[:],
                        )

                    relevant_recipes.remove(recipe)
                    next_base_entities = old_base_entities

                if b != goal:
                    intermediate_entities.remove(b)

    def _init_tasks_for_depth(self, max_depth=2):
        self.depth2task = collections.defaultdict(set)  # depth to task tuples

        total = 0
        for e in self.entities:
            # self._generate_all_tasks_for_goal(e)
            s = timeit.timeit(
                lambda: self._generate_all_tasks_for_goal(e, max_depth=max_depth),
                number=1,
            )
            # print(f'Generated max-depth {max_depth} recipes for {e} in {s} s.')
            total += s

        print(
            f"Generated all max-depth {max_depth} tasks for {len(self.entities)} entities in {total} s."
        )

        for d in self.depth2task:
            self.depth2task[d] = tuple(self.depth2task[d])
            print(f"Depth {d} tasks: {len(self.depth2task[d])}")

    def _init_recipe_weighted_entity_dist(self):
        entities_cnt = dict({e: 0 for e in self.entities})
        for recipe in self.recipe2entity.keys():
            for e in recipe:
                entities_cnt[e] += 1

        unnormalized = (
            np.array(list(entities_cnt.values())) + 1
        )  # Even terminal entities have > 0 chance of being sampled
        self.entity_dist = unnormalized / unnormalized.sum()

    def _init_data_split(self, split, train_ratio):
        self.split = split

        depths = range(1, self.max_depth + 1)

        self.goals_train = []
        self.goals_test = []

        self.depth2task_train = {d: [] for d in depths}
        self.depth2task_test = {d: [] for d in depths}

        if split in ["debug", "by_goal", "by_goal_train_terminals"]:
            # Map goals --> depth --> tasks
            self.goal2depth2task = {
                goal: {depth: [] for depth in depths} for goal in self.entities
            }
            for depth in self.depth2task:
                tasks = self.depth2task[depth]
                for task in tasks:
                    self.goal2depth2task[task.goal][depth].append(task)

            # Split goals into train and test
            all_goals = list(self.entities)
            self.np_random.shuffle(all_goals)
            if split == "debug":
                train_ratio = 1.0
            train_size = int(np.ceil(train_ratio * len(all_goals)))

            if split == "by_goal_train_terminals":
                assert train_size > len(
                    self.terminal_entities
                ), "Train size must be > terminal entities"

                all_goals = list(set(all_goals) - self.terminal_entities)
                train_size = train_size - len(self.terminal_entities)

            self.goals_train = all_goals[:train_size]
            self.goals_test = all_goals[train_size:]

            if split == "debug":
                self.goals_test = list(self.goals_train)

            for depth in depths:
                for goal in self.goals_train:
                    self.depth2task_train[depth] += self.goal2depth2task[goal][depth]

                for goal in self.goals_test:
                    self.depth2task_test[depth] += self.goal2depth2task[goal][depth]

        elif split in ["by_recipe", "by_recipe_train_all_goals"]:
            all_recipes = list(self.recipe2entity.keys())
            self.np_random.shuffle(all_recipes)
            train_size = int(np.ceil(train_ratio * len(all_recipes)))
            self.recipes_train = set(all_recipes[:train_size])
            self.recipes_test = set(all_recipes[train_size:])
            if split == "by_recipe_train_all_goals":
                self._fill_recipe_entity_support()

            for depth in self.depth2task:
                tasks = self.depth2task[depth]
                for task in tasks:
                    is_test_task = False
                    for recipe in task.relevant_recipes:
                        if recipe in self.recipes_test:
                            self.depth2task_test[depth].append(task)
                            is_test_task = True
                            break
                    if not is_test_task:
                        self.depth2task_train[depth].append(task)

        elif split == "by_task":
            for depth in depths:
                all_tasks_at_depth = list(self.depth2task[depth])
                self.np_random.shuffle(all_tasks_at_depth)
                train_size_at_depth = int(
                    np.ceil(train_ratio * len(all_tasks_at_depth))
                )

                self.depth2task_train[depth] = all_tasks_at_depth[:train_size_at_depth]
                self.depth2task_test[depth] = all_tasks_at_depth[train_size_at_depth:]

        else:
            raise ValueError(f"Unsupported split {split}")

        train_size = 0
        test_size = 0
        overlap = 0
        for depth in depths:
            train_tasks = set(self.depth2task_train[depth])
            test_tasks = set(self.depth2task_test[depth])

            train_size += len(train_tasks)
            test_size += len(test_tasks)

            overlap += len(train_tasks.intersection(test_tasks))

    def _fill_recipe_entity_support(self):
        # Make sure all entities are represented among self.recipes_train at depth=1 as either ingredient or goal
        def make_entity2recipes(recipes):
            entity2recipes = collections.defaultdict(set)
            for recipe in recipes:
                goal = self.recipe2entity[recipe]
                entity2recipes[goal].add(recipe)
                for e in recipe:
                    entity2recipes[e].add(recipe)
            return entity2recipes

        entity2recipes_train = make_entity2recipes(self.recipes_train)
        entity2recipes_test = make_entity2recipes(self.recipes_test)

        train_entities = set(entity2recipes_train.keys())
        missing_entities = [e for e in self.entities if e not in train_entities]

        aux_recipes = set()
        for e in missing_entities:
            aux_recipe = self._random_choice(list(entity2recipes_test[e]))
            aux_recipes.add(aux_recipe)

        for recipe in aux_recipes:
            self.recipes_train.add(recipe)
            self.recipes_test.remove(recipe)

NO_RECIPE_PENALTY = 0
IRRELEVANT_RECIPE_PENALTY = 0
GOAL_REWARD = 1.0
SUBGOAL_REWARD = 1.0

class FeatureMap:
    """
    Used for initializing entity features.
    """
    def __init__(
        self,
        words,
        feature_type='glove',
        random_feature_size=300, # only for random features
        random_feature_std=0.4,
        shuffle=False,
        seed=None,
        savedir='~/model_cache/spacy',
        cache_id='latest'
    ):
        self.feature_map = {}
        self.feature_dim = 0
        self.np_random, self.seed = seeding.np_random(seed)
        self.savedir = savedir
        self.cache_id = cache_id

        # Initialize featurizer
        if feature_type == 'one_hot':
            one_hots = np.eye(len(words))
            for i, e in enumerate(words):
                self.feature_map[e] = one_hots[i]
            # Read the width from the matrix itself rather than the leftover loop index.
            self.feature_dim = one_hots.shape[-1]

        elif feature_type == 'random':
            b = random_feature_std/np.sqrt(2)
            rand_features = self.np_random.laplace(
                loc=0, scale=b, size=(len(words), random_feature_size)
            )
            for i, e in enumerate(words):
                self.feature_map[e] = rand_features[i,:]
            # Read the width from the matrix itself rather than the leftover loop index.
            self.feature_dim = rand_features.shape[-1]

        elif feature_type == 'glove':
            loaded = self._load_feature_map()
            if loaded is not None:
                words, feature_vectors = loaded
            else:
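                print('Loading glove model...')
                # spacy is only needed for this branch, so import it lazily here;
                # the script's top-level imports do not include it.
                import spacy
                word2vec = spacy.load('en_core_web_lg') # 300-dim GloVe embeddings
                print('Loaded glove model.')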
                feature_vectors = []
                for e in words:
                    tokens_embeddings = []
                    for token in e.split():
                        tokens_embeddings.append(word2vec(token).vector)
                    feature_vectors.append(np.array(tokens_embeddings).max(0))

                if cache_id:
                    feature_vectors = np.array(feature_vectors)
                    self._save_feature_map(words, feature_vectors)

            if shuffle:
                self.np_random.shuffle(feature_vectors)
            self.feature_map = dict(zip(words, feature_vectors))
            self.feature_dim = self.feature_map[words[0]].shape[-1]

        else:
            raise ValueError(f'Feature type {feature_type} is not implemented.')

    def seed(self, seed):
        self.np_random, self.seed = seeding.np_random(seed)

    def feature(self, word):
        return self.feature_map[word]

    def _basepath(self, savedir, cache_id):
        basepath = os.path.expandvars(os.path.expanduser(savedir))
        basepath = os.path.join(basepath, cache_id)
        return basepath

    def _save_feature_map(self, vocab, feature_vectors, savedir=None, cache_id=None):
        if savedir is None:
            savedir = self.savedir

        if cache_id is None:
            cache_id = self.cache_id

        basepath = self._basepath(savedir, cache_id)
        if not os.path.exists(basepath):
            os.makedirs(basepath, exist_ok=True)
        feature_bin_path = os.path.join(basepath, f'{self.cache_id}')
        vocab_path = os.path.join(basepath, f'{self.cache_id}.vocab')

        np.save(feature_bin_path, feature_vectors)

        with open(vocab_path, 'w+') as vocab_file:
            for w in vocab:
                vocab_file.write(w+'\n')

    def _load_feature_map(self, savedir=None, cache_id=None):
        if savedir is None:
            savedir = self.savedir

        if cache_id is None:
            cache_id = self.cache_id

        basepath = self._basepath(savedir, cache_id)
        feature_bin_path = os.path.join(basepath, f'{self.cache_id}.npy')
        vocab_path = os.path.join(basepath, f'{self.cache_id}.vocab')

        if os.path.exists(feature_bin_path) and os.path.exists(vocab_path):
            with open(vocab_path) as vocab_file:
                words = [w.strip() for w in vocab_file]
            feature_vectors = np.load(feature_bin_path)
            feature_map = dict(zip(words, feature_vectors))
            return words, feature_vectors
        else:
            return None

class WordCraftEnvNoGoal(gym.Env):
    """
    Simple text-only RL environment for crafting multi-step recipes.

    At a high level, the state consists of a goal, the inventory, and the current selection.
    """

    def __init__(
        self, env_config
    ):
        super().__init__()

        self.eval_mode = False

        if env_config["seed"] is None:
            env_config["seed"] = int.from_bytes(os.urandom(4), byteorder="little")
        self.set_seed(env_config["seed"])
        # utils_seed(seed)

        if env_config["recipe_book_path"] is not None:
            self.recipe_book = RecipeBook.load(env_config["recipe_book_path"])
            self.recipe_book.set_seed(env_config["seed"])
            max_depth = self.recipe_book.max_depth
        else:
            self.recipe_book = RecipeBook(
                data_path=env_config["data_path"],
                max_depth=env_config["max_depth"],
                split=env_config["split"],
                train_ratio=env_config["train_ratio"],
                seed=env_config["seed"],
            )

        self.feature_map = FeatureMap(
            words=self.recipe_book.entities,
            feature_type=env_config["feature_type"],
            random_feature_size=env_config["random_feature_size"],
            shuffle=env_config["shuffle_features"],
            seed=env_config["seed"],
        )

        self.max_selection_size = self.recipe_book.max_recipe_size
        self.max_mix_steps = max(env_config["max_mix_steps"] or env_config["max_depth"], env_config["max_depth"])
        self.max_steps = self.max_selection_size * self.max_mix_steps

        self.sample_depth = env_config["max_depth"]

        self.subgoal_rewards = env_config["subgoal_rewards"]
        self.max_depth = env_config["max_depth"]
        self.num_distractors = env_config["num_distractors"]
        self.uniform_distractors = env_config["uniform_distractors"]
        if self.num_distractors:
            self.distractors = np.random.choice(
                self.recipe_book.distractors, self.num_distractors
            )
        else:
            self.distractors = []
        self.orig_table = [
            entity
            for entity in self.recipe_book.entity2level
            if self.recipe_book.entity2level[entity] == 0
               and entity not in self.recipe_book.distractors
        ]

        # I think this assumes 2 base elements
        self.max_table_size = (
            2 ** env_config["max_depth"] + env_config["num_distractors"] + self.max_mix_steps + len(self.orig_table) - 2
        )

        self.task = None

        self.goal_features = np.zeros(self.feature_map.feature_dim)

        self._reset_table()
        self._reset_selection()
        self._reset_history()

        self.episode_step = 0
        self.episode_mix_steps = 0
        self.episode_reward = 0
        self.done = False

        self.data_path = env_config["data_path"]

        obs = self.reset()
        num_entities = len(self.recipe_book.entities)
        dspaces = {
            "table_index": gym.spaces.MultiDiscrete(
                self.max_table_size * [num_entities]
            ),
            "table_features": gym.spaces.Box(
                shape=self.table_features.shape, low=-1.0, high=1.0
            ),
            "selection_index": gym.spaces.MultiDiscrete(
                self.max_selection_size * [num_entities]
            ),
            "selection_features": gym.spaces.Box(
                shape=self.selection_features.shape, low=-1.0, high=1.0
            ),
        }
        self.observation_space = gym.spaces.Dict(dspaces)
        self.action_space = gym.spaces.Discrete(
            self.max_table_size
        )  # Actions correspond to choosing an entity in a table position
        self.discovered = []

        self.debug = {"actions": []}
        """
        self.n_entities = len(self.recipe_book.entities)
        self.action_space = Discrete(
            self.n_entities
        )  # could be changed to only the elements that can be reached in the allowed steps (for now, too deep to be
        # required)
        dspaces = {
            "table_features": MultiDiscrete(self.n_entities * [self.n_entities]),
            "selection_features": MultiDiscrete(self.n_entities * [self.n_entities]),
        }
        self.observation_space = Dict(dspaces)
        self.proportional = proportional
        self.discovered = []
        """

    def reset(self):
        self.discovered = []
        self.episode_step = 0
        self.episode_mix_steps = 0
        self.episode_reward = 0
        self.done = False

        # self.task = self.recipe_book.sample_task(depth=self.sample_depth)
        # self.distractors = self.recipe_book.sample_distractors(
        #     self.task, self.num_distractors, uniform=self.uniform_distractors
        # )
        # self.goal_features = self.feature_map.feature(self.task.goal)
        # self.distractors = np.random.choice(
        #     self.recipe_book.distractors, self.num_distractors
        # )
        self._reset_selection()
        self._reset_table()
        self._reset_history()

        return self._get_observation()

    def eval(self, split="test"):
        self.eval_mode = True
        self.recipe_book.test_mode = split == "test"

    def train(self):
        self.eval_mode = False
        self.recipe_book.test_mode = False

    def set_seed(self, seed):
        self.np_random, self.seed = seeding.np_random(seed)

    def sample_depth(self, depth):
        self.sample_depth = depth

    def _reset_table(self):

        self.distractors = np.random.choice(
            self.recipe_book.distractors, self.num_distractors
        )

        self.table = list(
            np.concatenate(
                (
                    self.orig_table,
                    self.distractors,
                )
            )
        )
        self.np_random.shuffle(self.table)
        self.table_index = -np.ones(self.max_table_size, dtype=int)
        self.table_features = np.zeros(
            (self.max_table_size, self.feature_map.feature_dim)
        )

        num_start_items = len(self.table)
        self.table_index[:num_start_items] = np.array(
            [self.recipe_book.entity2index[e] for e in self.table], dtype=int
        )
        # if self.task:
        self.table_features[:num_start_items, :] = np.array(
            [self.feature_map.feature(e) for e in self.table]
        )

    def _reset_selection(self):
        self.selection = []
        self.selection_index = -np.ones(self.max_selection_size, dtype=int)
        self.selection_features = np.zeros(
            (self.max_selection_size, self.feature_map.feature_dim)
        )

    def _reset_history(self):
        self.subgoal_history = set()

    def _get_observation(self):
        """
        Note, includes indices for each inventory and selection item,
        since torchbeast stores actions in a shared_memory tensor shared among actor processes
        """
        return {
            # "goal_index": [self.recipe_book.entity2index[self.task.goal]],
            # "goal_features": self.goal_features,
            "table_index": self.table_index,
            "table_features": self.table_features,
            "selection_index": self.selection_index,
            "selection_features": self.selection_features,
        }

    def step(self, action):
        reward = 0
        if self.done:  # no-op if env is done
            return self._get_observation(), reward, self.done, {}

        # Handle invalid actions
        invalid_action = not (0 <= action < self.max_table_size)
        if invalid_action:
            self.episode_step += 1
            if self.episode_step >= self.max_steps:
                self.done = True

        i = self.table_index[action]
        e = self.recipe_book.entities[i]

        selection_size = len(self.selection)
        if selection_size < self.max_selection_size:
            # Update selection
            self.selection.append(e)
            self.selection_index[selection_size] = i
            self.selection_features[selection_size, :] = self.feature_map.feature(e)
            selection_size = len(self.selection)

        if selection_size == self.max_selection_size:
            self.episode_mix_steps += 1

            # Evaluate selection
            recipe = Recipe(self.selection)
            result = self.recipe_book.evaluate_recipe(recipe)

            if result is not None and result not in self.discovered:
                reward = self.recipe_book.entity2level[result]

                self.discovered.append(result)

            self.episode_reward += reward

            if result:
                result_i = self.recipe_book.entity2index[result]
                table_size = len(self.table)
                self.table.append(result)
                self.table_index[table_size] = result_i
                self.table_features[table_size, :] = self.feature_map.feature(result)

            # Clear selection
            self._reset_selection()

        self.episode_step += 1
        if (
            self.episode_mix_steps >= self.max_mix_steps
            or self.episode_step >= self.max_steps
        ):
            self.done = True

        obs = self._get_observation()

        return obs, reward, self.done, {}

gym.envs.registration.register(
    id="wordcraft-multistep-no-goal-v0",
    entry_point=f"{__name__}:WordCraftEnvNoGoal",
)

def run_apex():
    top_dir = "."

    ray.init()
    config = apex.APEX_DEFAULT_CONFIG.copy()
    config["num_gpus"] = 0
    config["num_workers"] = 1

    tasks = ["example"]

    for task in tasks:
        project_path = top_dir + "/" + task

        config["env_config"] = {"log_path": project_path + "/tb_logs",
                                "data_path": recipe_book_info[task]["path"], "seed": None,
                                "recipe_book_path": None,
                                "feature_type": "one_hot",
                                "shuffle_features": False,
                                "random_feature_size": 300,
                                "max_depth": 8,
                                "split": "by_recipe",
                                "train_ratio": 1.0,
                                "num_distractors": 0,
                                "uniform_distractors": False,
                                "max_mix_steps": 8,
                                "subgoal_rewards": True}

        select_env = "wordcraft-multistep-no-goal-v0"

        # The env creator must return an env instance, not the class itself.
        register_env(select_env, lambda env_config: WordCraftEnvNoGoal(env_config))
        trainer = apex.ApexTrainer(env=WordCraftEnvNoGoal, config=config)

        for _ in range(1000):
            # Perform one iteration of training the policy with Apex-DQN
            result = trainer.train()
            print(pretty_print(result))

if __name__ == "__main__":
    run_apex()

Issue Severity

High: It blocks me from completing my task.

sven1977 commented 2 years ago

Hey @eleninisioti, thanks for posting this issue here!

I tried to reproduce it, but ran into another, seemingly related error message:

The observation collected from env.reset() was not  contained within your env's observation space. It is possible that there was a type mismatch, or that one of the sub-observations  was out of bounds: 
 reset_obs: {'table_index': array([ 1,  0,  2, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1]), 'table_features': array([[0., 1., 0., ..., 0., 0., 0.],
       [1., 0., 0., ..., 0., 0., 0.],
       [0., 0., 1., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]]), 'selection_index': array([-1, -1]), 'selection_features': array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0.]])}
 env.observation_space: Dict(selection_features:Box(-1.0, 1.0, (2, 20), float32), selection_index:MultiDiscrete([20 20]), table_features:Box(-1.0, 1.0, (265, 20), float32), table_index:MultiDiscrete([20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
 20]))
 reset_obs's dtype: <class 'dict'>
 env.observation_space's dtype: None

If you look at the value returned by your reset() method under the "table_index" key, you can see that most of the entries are -1, even though the space for this key is MultiDiscrete([20, 20, ...]), which only allows values between 0 and 19.
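To make the bounds concrete, here is a minimal sketch in plain Python (no gym dependency) of the containment rule a MultiDiscrete space applies: every entry must be an integer with 0 <= x < n for its slot. The helper names `in_multidiscrete` and `shift_padding` are hypothetical, and shifting all indices by one so that the padding maps to 0 is only one possible fix, not necessarily what the environment author intends.

```python
def in_multidiscrete(values, nvec):
    """Return True if every value is an integer with 0 <= value < n for its slot."""
    return len(values) == len(nvec) and all(
        isinstance(v, int) and 0 <= v < n for v, n in zip(values, nvec)
    )


def shift_padding(values):
    """Map the -1 padding to 0 and entity i to i + 1 (hypothetical encoding)."""
    return [v + 1 for v in values]


num_entities = 20
table_index = [1, 0, 2, -1, -1]  # first entries of the reset observation above

# The -1 padding falls outside [0, 20), so the observation is rejected.
print(in_multidiscrete(table_index, [num_entities] * len(table_index)))

# With one extra category reserved for "empty", the shifted indices fit.
shifted = shift_padding(table_index)
print(in_multidiscrete(shifted, [num_entities + 1] * len(shifted)))
```

If an encoding like this were adopted, the space would become MultiDiscrete(max_table_size * [num_entities + 1]), and lookups such as `self.recipe_book.entities[i]` in step() would need the inverse shift.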

Could you make sure to fix that issue inside your environment or your observation space and repost a new repro script here?

Since I was not able to fully reproduce your exact error, could you also post your Ray version and OS here? Maybe your particular issue was already fixed in master.
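For reference, the two numbers in the original error message (expected 10680, got 5607) are consistent with the observation space printed above. The sketch below assumes that RLlib's flattening preprocessor one-hot encodes each MultiDiscrete slot while the incoming batch holds the raw indices. That reading is an inference from the numbers, not something confirmed in this thread.

```python
max_table_size, max_selection_size = 265, 2  # from the printed space
num_entities = feature_dim = 20              # 20 entities, 20-dim one-hot features

# Box components flatten to rows * columns.
table_features = max_table_size * feature_dim          # 5300
selection_features = max_selection_size * feature_dim  # 40

# One-hot encoded MultiDiscrete: one block of num_entities per slot.
one_hot_total = (
    table_features
    + selection_features
    + max_table_size * num_entities
    + max_selection_size * num_entities
)

# Raw MultiDiscrete: a single integer per slot.
raw_total = table_features + selection_features + max_table_size + max_selection_size

print(one_hot_total, raw_total)  # 10680 5607
```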

RocketRider commented 1 year ago

Could be related to the ordering issue in the dict space: https://github.com/ray-project/ray/issues/33327
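One way to rule out a key-ordering mismatch, assuming that is what the linked issue describes: the space printed above lists its keys alphabetically, whereas `_get_observation()` builds its dict starting with `table_index`. The hypothetical helper `sort_observation` below returns the observation with its keys in sorted order so that both sides agree. The values here are shortened placeholders, not real environment output.

```python
from collections import OrderedDict


def sort_observation(obs):
    """Return a copy of the observation dict with keys in alphabetical order."""
    return OrderedDict(sorted(obs.items()))


obs = {
    "table_index": [1, 0, 2],
    "table_features": [[0.0, 1.0], [1.0, 0.0]],
    "selection_index": [0, 0],
    "selection_features": [[0.0, 0.0]],
}

print(list(sort_observation(obs)))
# ['selection_features', 'selection_index', 'table_features', 'table_index']
```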