Hi @goodluckoguzie, thanks for reaching out!
All of our Dreamer implementations work with both image and vector observations as-is.
In theory you should only have to implement your environment and select it when you run your training: sheeprl exp=dreamer_v3 env=your_custom_env algo.cnn_keys.encoder=[...] algo.mlp_keys.encoder=[...], where you have to select which environment keys you want the agent to encode/decode.
Hi @belerico, thanks for your response. Could you please elaborate on this (sheeprl exp=dreamer_v3 env=your_custom_env algo.cnn_keys.encoder=[...] algo.mlp_keys.encoder=[...])? What should the inputs (algo.cnn_keys.encoder=[...] and algo.mlp_keys.encoder=[...]) be? My observation space has 23 dimensions and my action space has 2.
Hi @goodluckoguzie, if the observation space is a gymnasium.spaces.Box, then the sheeprl.utils.env.make_env() function will automatically wrap the observation space with a gymnasium.spaces.Dict space. The key will be the one you specify in the algo.mlp_keys.encoder list (i.e., you can choose the key you want to assign to your vector observations).
If your environment implements the render() function, then you can also decide to exploit the image observation: all you have to do is specify a key in the algo.cnn_keys.encoder list.
Otherwise, if your custom environment's observation space is already a gymnasium.spaces.Dict, then you just put the names of your observations in the algo.mlp_keys.encoder list.
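For example, for a 23-dimensional vector observation and a 2-dimensional continuous action like yours, a minimal environment could look like this (just a sketch: the class name, bounds, and dummy dynamics are illustrative placeholders):
import gymnasium as gym
import numpy as np
from gymnasium import spaces

class MyVectorEnv(gym.Env):
    def __init__(self):
        super().__init__()
        # A plain Box observation: make_env() will wrap it into a Dict,
        # under the key you pass in algo.mlp_keys.encoder
        self.observation_space = spaces.Box(-np.inf, np.inf, (23,), np.float32)
        self.action_space = spaces.Box(-1.0, 1.0, (2,), np.float32)

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        return self.observation_space.sample(), {}

    def step(self, action):
        # Dummy transition: replace with your dynamics, reward, and termination logic
        return self.observation_space.sample(), 0.0, False, False, {}
You would then run it with something like sheeprl exp=dreamer_v3 env=your_custom_env algo.cnn_keys.encoder=[] algo.mlp_keys.encoder=[state], where state is the key assigned to the wrapped vector observation.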
Hi @goodluckoguzie, here's an example of your environment made compliant with sheeprl, to be placed under sheeprl/envs/socnav.py:
import random
import sys
from typing import Any, Dict, Optional
import cv2
import gymnasium as gym
import numpy as np
from gymnasium import spaces
# TO DO List
#
#
# Not urgent:
# - Improve how the robot moves (actual differential platform)
#
class SocNavEnv(gym.Env):
def __init__(
self,
resolution=700.0,
resolution_view=1000.0,
map_size=8.0,
margin=0.5,
max_ticks=250,
timestep=0.1,
robot_radius=0.15,
goal_radius=0.5,
number_of_humans=5,
human_threshold=0.4,
reach_reward=1.0,
outofmap_reward=-0.5,
maxticks_reward=-0.5,
alive_reward=-0.00001,
collision_reward=-1.0,
distance_reward_divisor=1000,
max_advance=1.4,
max_rotation=np.pi / 2,
milliseconds=30,
debug: int = 0,
):
super().__init__()
self.resolution = resolution
self.resolution_view = resolution_view
self.map_size = map_size
self.margin = margin
self.max_ticks = max_ticks
self.timestep = timestep
self.robot_radius = robot_radius
self.goal_radius = goal_radius
self.number_of_humans = number_of_humans
self.human_threshold = human_threshold
self.reach_reward = reach_reward
self.outofmap_reward = outofmap_reward
self.maxticks_reward = maxticks_reward
self.alive_reward = alive_reward
self.collision_reward = collision_reward
self.distance_reward_divisor = distance_reward_divisor
self.max_advance = max_advance
self.max_rotation = max_rotation
self.milliseconds = milliseconds
self.debug = debug
        self.goal_threshold = self.robot_radius + self.goal_radius
        self.pixel_to_world = resolution / map_size
self.window_initialised = False
self.ticks = 0
self.humans = np.zeros((self.number_of_humans, 4)) # x, y, orientation, speed
self.robot = np.zeros((1, 3)) # x, y, orientation
self.goal = np.zeros((1, 2)) # x, y
self.robot_is_done = True
self.world_image = np.zeros((int(self.resolution), int(self.resolution), 3))
self.reset()
@property
def observation_space(self):
        low = np.array(
            # robot: x, y, cos(theta), sin(theta)
            [-self.map_size / 2, -self.map_size / 2, -1.0, -1.0]
            # goal: x, y
            + [-self.map_size / 2, -self.map_size / 2]
            # humans: x, y, cos(theta), sin(theta), speed
            + [-self.map_size / 2, -self.map_size / 2, -1.0, -1.0, -self.max_advance] * self.number_of_humans
        )
        high = np.array(
            # robot: x, y, cos(theta), sin(theta)
            [+self.map_size / 2, +self.map_size / 2, +1.0, +1.0]
            # goal: x, y
            + [+self.map_size / 2, +self.map_size / 2]
            # humans: x, y, cos(theta), sin(theta), speed
            + [+self.map_size / 2, +self.map_size / 2, +1.0, +1.0, +self.max_advance] * self.number_of_humans
        )
        return spaces.Box(low, high, low.shape, np.float32)
@property
def action_space(self):
# adv rot
low = np.array([-1, -1])
high = np.array([+1, +1])
        return spaces.Box(low, high, low.shape, np.float32)
@property
def metadata(self):
return {}
@property
def done(self):
return self.robot_is_done
def discrete_to_continuous_action(self, action: int):
"""
Function to return a continuous space action for a given discrete action
"""
# Linear vel --> [-1,1]: -1: Stop; 1: Move forward with max velocity
# Rotational vel --> [-1,1]: -1: Max clockwise rotation; 1: Max anti-clockwise rotation
if action == 0:
return np.array([0, 0.25], dtype=np.float32)
elif action == 1:
return np.array([0, -0.25], dtype=np.float32)
elif action == 2:
return np.array([1, 0.125], dtype=np.float32)
elif action == 3:
return np.array([1, -0.125], dtype=np.float32)
elif action == 4:
return np.array([1, 0], dtype=np.float32)
elif action == 5:
return np.array([-1, 0], dtype=np.float32)
elif action == 6:
return np.array([-0.8, +0.4], dtype=np.float32)
elif action == 7:
return np.array([-0.8, -0.4], dtype=np.float32)
else:
raise NotImplementedError
# # Turning anti-clockwise
# if action == 0:
# return np.array([0, 1.0], dtype=np.float32)
# # Turning clockwise
# elif action == 1:
# return np.array([0, -1.0], dtype=np.float32)
# # Turning anti-clockwise and moving forward
# elif action == 2:
# return np.array([1, 1.0], dtype=np.float32)
# # Turning clockwise and moving forward
# elif action == 3:
# return np.array([1, -1.0], dtype=np.float32)
# # Move forward
# elif action == 4:
# return np.array([1, 0.0], dtype=np.float32)
# # Move backward
# elif action == 5:
# return np.array([-1, 0.0], dtype=np.float32)
# # No Op
# elif action == 6:
# return np.array([0.0, 0.0], dtype=np.float32)
# else:
# raise NotImplementedError
def step(self, action_pre):
def process_action(action_pre):
action = np.array(action_pre)
action[0] = ((action[0] + 1.0) / 2.0) * self.max_advance # [-1, +1] --> [0, self.max_advance]
# action[0] = ((action[0]+0.0)/1.0)*self.max_advance # [-1, +1] --> [-self.max_advance, +self.max_advance]
action[1] = (
(action[1] + 0.0) / 1.0
) * self.max_rotation # [-1, +1] --> [-self.max_rotation, +self.max_rotation]
            if action[0] < 0:  # Advance must be non-negative
                action[0] *= -1
            if action[0] > self.max_advance:  # Advance must be at most self.max_advance
                action[0] = self.max_advance
            if action[1] < -self.max_rotation:  # Rotation must be at least -self.max_rotation
                action[1] = -self.max_rotation
            elif action[1] > +self.max_rotation:  # Rotation must be at most +self.max_rotation
                action[1] = +self.max_rotation
return action
        def update_robot_and_humans(action):
# update robot
moved = action[0] * self.timestep # a. speed x time
xp = self.robot[0, 0] + np.cos(self.robot[0, 2]) * moved
yp = self.robot[0, 1] + np.sin(self.robot[0, 2]) * moved
self.robot[0, 0] = xp
self.robot[0, 1] = yp
self.robot[0, 2] -= action[1] * self.timestep # r. speed x time
# update humans' positions
for human in range(self.humans.shape[0]):
moved = self.humans[human, 3] * self.timestep # speed x time
xp = self.humans[human, 0] + np.cos(self.humans[human, 2]) * moved
yp = self.humans[human, 1] + np.sin(self.humans[human, 2]) * moved
self.humans[human, 0] = xp
self.humans[human, 1] = yp
if self.robot_is_done:
raise Exception("step call within a finished episode!")
action = process_action(action_pre)
        update_robot_and_humans(action)
observation = self.get_observation()
reward = self.compute_reward_and_ticks()
done = self.robot_is_done
info = {}
self.cumulative_reward += reward
if self.debug > 0 and self.ticks % 50 == 0:
self.render()
elif self.debug > 1:
self.render()
if self.debug > 0 and self.robot_is_done:
print(f"cumulative reward: {self.cumulative_reward}")
return observation, reward, done, False, info
def compute_reward_and_ticks(self):
self.ticks += 1
# check for the goal's distance
distance_to_goal = np.linalg.norm(self.robot[0, 0:2] - self.goal[0, 0:2], ord=2)
# check for human-robot collisions
collision_with_a_human = False
for human in range(self.humans.shape[0]):
distance_to_human = np.linalg.norm(self.robot[0, 0:2] - self.humans[human, 0:2], ord=2)
if distance_to_human < self.human_threshold:
collision_with_a_human = True
break
# calculate the reward and update is_done
if (
self.map_size / 2 < self.robot[0, 0]
or self.robot[0, 0] < -self.map_size / 2
or self.map_size / 2 < self.robot[0, 1]
or self.robot[0, 1] < -self.map_size / 2
):
self.robot_is_done = True
reward = self.outofmap_reward
elif distance_to_goal < self.goal_threshold:
self.robot_is_done = True
reward = self.reach_reward
elif collision_with_a_human is True:
self.robot_is_done = True
reward = self.collision_reward
elif self.ticks > self.max_ticks:
self.robot_is_done = True
reward = self.maxticks_reward
else:
self.robot_is_done = False
reward = -distance_to_goal / self.distance_reward_divisor + self.alive_reward
return reward
def reset(self, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None):
self.cumulative_reward = 0
HALF_SIZE = self.map_size / 2.0 - self.margin
# robot
self.robot[0, 0] = random.uniform(-HALF_SIZE, HALF_SIZE) # x
self.robot[0, 1] = random.uniform(-HALF_SIZE, HALF_SIZE) # y
self.robot[0, 2] = 0.2 # random.uniform(-np.pi, np.pi) # orientation
# goal
self.goal[0, 0] = random.uniform(-HALF_SIZE, HALF_SIZE) # x
self.goal[0, 1] = random.uniform(-HALF_SIZE, HALF_SIZE) # y
# humans
for i in range(self.number_of_humans):
self.humans[i, 0] = random.uniform(-HALF_SIZE, HALF_SIZE) # x
self.humans[i, 1] = random.uniform(-HALF_SIZE, HALF_SIZE) # y
self.humans[i, 2] = random.uniform(-np.pi, np.pi) # orientation
self.humans[i, 3] = random.uniform(0.0, self.max_advance) # speed
self.robot_is_done = False
self.ticks = 0
return self.get_observation(), {}
def get_observation(self):
def observation_with_cos_sin_rather_than_angle(i):
output = np.empty((i.shape[0], i.shape[1] + 1))
output[:, 0:2] = i[:, 0:2]
output[:, 2] = np.cos(i[:, 2])
output[:, 3] = np.sin(i[:, 2])
if i.shape[1] > 3:
output[:, 4] = i[:, 3]
return output.flatten()
robot_obs = observation_with_cos_sin_rather_than_angle(self.robot)
goal_obs = self.goal.flatten()
humans_obs = observation_with_cos_sin_rather_than_angle(self.humans)
return np.concatenate((robot_obs, goal_obs, humans_obs)).astype(np.float32)
def render(self):
def w2px(i):
return int(self.pixel_to_world * (i + (self.map_size / 2)))
def w2py(i):
return int(self.pixel_to_world * ((self.map_size / 2) - i))
def draw_oriented_point(image, input_data, colour, radius=0.15, nose=0.1):
left = np.array(
[
input_data[0, 0] + np.cos(input_data[0, 2] + np.pi / 2) * radius,
input_data[0, 1] + np.sin(input_data[0, 2] + np.pi / 2) * radius,
]
)
right = np.array(
[
input_data[0, 0] + np.cos(input_data[0, 2] - np.pi / 2) * radius,
input_data[0, 1] + np.sin(input_data[0, 2] - np.pi / 2) * radius,
]
)
front = np.array(
[input_data[0, 0] + np.cos(input_data[0, 2]) * nose, input_data[0, 1] + np.sin(input_data[0, 2]) * nose]
)
centre = np.array([input_data[0, 0], input_data[0, 1]])
cv2.line(image, (w2px(centre[0]), w2py(centre[1])), (w2px(left[0]), w2py(left[1])), colour, 3)
cv2.line(image, (w2px(centre[0]), w2py(centre[1])), (w2px(right[0]), w2py(right[1])), colour, 3)
cv2.line(image, (w2px(centre[0]), w2py(centre[1])), (w2px(front[0]), w2py(front[1])), colour, 5)
if not self.window_initialised:
cv2.namedWindow("world", cv2.WINDOW_NORMAL)
cv2.resizeWindow("world", int(self.resolution_view), int(self.resolution_view))
self.window_initialised = True
# empty the image
self.world_image = (np.ones((int(self.resolution), int(self.resolution), 3)) * 255).astype(np.uint8)
# draws robot
draw_oriented_point(self.world_image, self.robot, (255, 0, 0))
# draws goal
cv2.circle(
self.world_image,
(w2px(self.goal[0, 0]), w2py(self.goal[0, 1])),
int(self.goal_radius * 100.0),
(0, 255, 0),
2,
)
# draws humans
for human in range(self.number_of_humans):
draw_oriented_point(self.world_image, self.humans[human : human + 1, :], (0, 0, 255))
# shows image
cv2.imshow("world", self.world_image)
k = cv2.waitKey(self.milliseconds)
        if k & 0xFF == 27:  # quit on ESC
sys.exit(0)
def close(self):
pass
class DiscreteSocNavEnv(SocNavEnv):
def __init__(self, advance_split=5, rotation_split=5):
super().__init__()
self.advance_split = advance_split
self.rotation_split = rotation_split
self.action_map = self.build_action_map(advance_split, rotation_split)
def build_action_map(self, advance_split, rotation_split):
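        # Builds an (advance_split * rotation_split, 2) table of normalized
        # [advance, rotation] pairs spanning [-1, 1]^2; each discrete action
        # indexes one row of this table (see step() below).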
advance_grid, rotation_grid = np.meshgrid(np.linspace(-1, 1, advance_split), np.linspace(-1, 1, rotation_split))
return np.hstack(
(
advance_grid.reshape((advance_split * rotation_split, 1)),
rotation_grid.reshape((advance_split * rotation_split, 1)),
)
)
@property
def action_space(self):
return spaces.Discrete(self.advance_split * self.rotation_split)
def step(self, action_idx):
# print(action_idx, self.action_map[action_idx])
return SocNavEnv.step(self, self.action_map[action_idx])
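To sanity-check the environment before plugging it into sheeprl, a quick random rollout could look like this (a sketch, assuming the file above is saved as sheeprl/envs/socnav.py):
from sheeprl.envs.socnav import SocNavEnv

env = SocNavEnv()
obs, info = env.reset()
# robot (x, y, cos, sin) + goal (x, y) + 5 humans x (x, y, cos, sin, speed) = 31 values
assert obs.shape == env.observation_space.shape
done = False
while not done:
    obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
    done = terminated or truncated
env.close()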
The config for the environment is the following, to be placed under sheeprl/configs/envs/socnav.yaml:
defaults:
- default
- _self_
# Override from `default` config
id: SocNav
num_envs: 16
frame_stack: 1
sync_env: False
screen_size: 64
action_repeat: 1
grayscale: False
clip_rewards: False
capture_video: True
frame_stack_dilation: 1
max_episode_steps: null
reward_as_observation: False
# Wrapper to be instantiated
wrapper:
_target_: sheeprl.envs.socnav.SocNavEnv
resolution: 700.0
resolution_view: 1000.0
map_size: 8.0
margin: 0.5
max_ticks: 250
timestep: 0.1
robot_radius: 0.15
goal_radius: 0.5
number_of_humans: 5
human_threshold: 0.4
reach_reward: 1.0
outofmap_reward: -0.5
maxticks_reward: -0.5
alive_reward: -0.00001
collision_reward: -1.0
distance_reward_divisor: 1000
max_advance: 1.4
milliseconds: 30
debug: 0
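For reference, Hydra instantiates the class pointed to by _target_ with the keyword arguments listed under wrapper, so the section above is roughly equivalent to this Python call (a sketch of what happens under the hood):
from sheeprl.envs.socnav import SocNavEnv

env = SocNavEnv(
    resolution=700.0,
    resolution_view=1000.0,
    map_size=8.0,
    margin=0.5,
    max_ticks=250,
    timestep=0.1,
    robot_radius=0.15,
    goal_radius=0.5,
    number_of_humans=5,
    human_threshold=0.4,
    reach_reward=1.0,
    outofmap_reward=-0.5,
    maxticks_reward=-0.5,
    alive_reward=-0.00001,
    collision_reward=-1.0,
    distance_reward_divisor=1000,
    max_advance=1.4,
    milliseconds=30,
    debug=0,
)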
Then you can run any algorithm with: python sheeprl.py algo=ppo algo.mlp_keys.encoder=[state] env=socnav
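For Dreamer V3 specifically, the same pattern should apply (a sketch based on the commands above, assuming you only want to encode the vector observation): python sheeprl.py exp=dreamer_v3 env=socnav algo.mlp_keys.encoder=[state] algo.cnn_keys.encoder=[]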
Can you try this?
Thanks very much, I will try it now.
It's working now, thanks!
Thank you for simplifying reinforcement learning algorithms and models. I have been trying to add my custom environment to SheepRL, but despite following your steps, I am still encountering errors.
I would greatly appreciate your assistance. I want to train my agent using Dreamer v3. My custom gym environment, designed for Social Navigation (Socnav), is relatively simple and includes four mobile human-like obstacles and two static obstacles, along with an agent and its goal. The goal is to train the agent to navigate towards its objective while avoiding obstacles.
The observation space in my environment is a flat, vector-based observation with 23 dimensions, not an image. The action space is continuous and has two dimensions.
I am looking to write a script that will use Dreamer v3, specifically tailored for my environment. The script should include the following components:
envs: my environment
configs.yaml
dreamer.py
exploration.py
models.py
networks.py
parallel.py
requirements.txt
tools.py
I need specific and clear code that I can later modify for other environments, including those that are image-based. I am finding it challenging to adapt Dreamer v3 to my current flat vector-based observation space.
Attached is a zip file that contains my environment and a simple script for testing it. I would greatly appreciate your help or guidance on how to use Dreamer v3 with a flat vector-based observation space. Thank you!
https://www.dropbox.com/scl/fi/z13wkqgjtuszlt3oimxd4/SacNav.zip?rlkey=f0y2yctxa5ajjt442na8abk4l&dl=0