Hello! I am stuck on creating my own environment in ManiSkill and cannot find any documentation on how to do so. This is the code I have (based on OpenCabinetDrawer).
Code
from typing import Any, Dict, List, Optional, Union
import numpy as np
import sapien
import sapien.physx as physx
import torch
import trimesh
from mani_skill import PACKAGE_ASSET_DIR
from mani_skill.agents.robots import Fetch
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.envs.utils import randomization
from mani_skill.sensors.camera import CameraConfig
from mani_skill.utils import common, sapien_utils
from mani_skill.utils.building import actors, articulations
from mani_skill.utils.building.ground import build_ground
from mani_skill.utils.geometry.geometry import transform_points
from mani_skill.utils.io_utils import load_json
from mani_skill.utils.registration import register_env
from mani_skill.utils.structs import Articulation, Link, Pose
from mani_skill.utils.structs.types import GPUMemoryConfig, SimConfig
# Collision bit used to ignore collisions between the cabinet links and the ground.
# The exact value here is my assumption (ManiSkill's OpenCabinetDrawer uses 29); any free
# bit index works as long as the ground and the cabinet links share the same one.
CABINET_COLLISION_BIT = 29
@register_env(
"FetchNearCabinet-v1",
asset_download_ids=["partnet_mobility_cabinet"],
max_episode_steps=100,
)
class FetchNearCabinet(BaseEnv):
SUPPORTED_ROBOTS = ["fetch"]
    agent: Union[Fetch]
    handle_types = ["prismatic"]
    # fraction of the joint range a drawer must be opened to count as success
    # (assumed value; ManiSkill's OpenCabinetDrawer uses the same 0.75 default)
    min_open_frac = 0.75
    # PACKAGE_ASSET_DIR is already imported from mani_skill above, so nothing to fill in here
    TRAIN_JSON = (
        PACKAGE_ASSET_DIR / "partnet_mobility/meta/info_cabinet_drawer_train.json"
    )
def __init__(
self,
*args,
robot_uids="fetch",
robot_init_qpos_noise=0.02,
reconfiguration_freq=None,
num_envs=1,
**kwargs,
):
self.robot_init_qpos_noise = robot_init_qpos_noise
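        # keys of the train split JSON are the PartNet-Mobility model ids of the cabinets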
train_data = load_json(self.TRAIN_JSON)
self.all_model_ids = np.array(list(train_data.keys()))
# self.all_model_ids = np.array(["1004", "1004"])
if reconfiguration_freq is None:
            # if not set by the user, reconfigure every reset for a single env and never for vectorized envs
if num_envs == 1:
reconfiguration_freq = 1
else:
reconfiguration_freq = 0
super().__init__(
*args,
robot_uids=robot_uids,
reconfiguration_freq=reconfiguration_freq,
num_envs=num_envs,
**kwargs,
)
@property
def _default_sim_config(self):
return SimConfig(
spacing=5,
gpu_memory_config=GPUMemoryConfig(
max_rigid_contact_count=2**21, max_rigid_patch_count=2**19
),
)
@property
def _default_human_render_camera_configs(self):
pose = sapien_utils.look_at(eye=[-1.8, -1.3, 1.8], target=[-0.3, 0.5, 0])
return CameraConfig(
"render_camera", pose=pose, width=512, height=512, fov=1, near=0.01, far=100
)
@property
def _default_sensor_configs(self):
return []
def _load_agent(self, options: dict):
super()._load_agent(options, sapien.Pose(p=[0, 0, 1]))
def _load_scene(self, options: dict):
self.ground = build_ground(self.scene)
# temporarily turn off the logging as there will be big red warnings
# about the cabinets having oblong meshes which we ignore for now.
sapien.set_log_level("off")
self._load_cabinets(self.handle_types)
sapien.set_log_level("warn")
from mani_skill.agents.robots.fetch import FETCH_WHEELS_COLLISION_BIT
self.ground.set_collision_group_bit(
group=2, bit_idx=FETCH_WHEELS_COLLISION_BIT, bit=1
)
self.ground.set_collision_group_bit(
group=2, bit_idx=CABINET_COLLISION_BIT, bit=1
)
def _load_cabinets(self, joint_types: List[str]):
        model_id = 1000  # TODO: double-check this PartNet-Mobility model id; it may not be the cabinet I want
cabinet_builder = articulations.get_articulation_builder(
self.scene, f"partnet-mobility:{model_id}"
)
cabinet_builder.set_scene_idxs(scene_idxs=[0]) # Single cabinet, index 0
cabinet_builder.initial_pose = sapien.Pose(p=[0, 0, 0], q=[1, 0, 0, 0])
# Build the cabinet
cabinet = cabinet_builder.build(name=f"{model_id}")
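        # take the cabinet out of the env state dict registry for now; it is added back
        # below once it has been stored as self.cabinet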
self.remove_from_state_dict_registry(cabinet)
        # Mark every cabinet link with CABINET_COLLISION_BIT (the ground gets the same bit
        # in _load_scene) so cabinet-ground collisions are ignored. Should this be done?? <--
for link in cabinet.links:
link.set_collision_group_bit(
group=2, bit_idx=CABINET_COLLISION_BIT, bit=1
)
# Store the cabinet in the object
self.cabinet = cabinet
self.add_to_state_dict_registry(self.cabinet)
# Handle specific links based on joint types
handle_links = []
handle_links_meshes = []
for link, joint in zip(cabinet.links, cabinet.joints):
if joint.type[0] in joint_types:
handle_links.append(link)
# Save the first mesh in the link object that corresponds to a handle
handle_links_meshes.append(
link.generate_mesh(
filter=lambda _, render_shape: "handle" in render_shape.name,
mesh_name="handle",
)[0]
)
# Handle merging logic for links and meshes
if handle_links:
self.handle_link = handle_links[0]
self.handle_link_pos = common.to_tensor(
np.array(
[handle_links_meshes[0].bounding_box.center_mass]
),
device=self.device,
)
else:
self.handle_link = None
self.handle_link_pos = None
# Add a goal sphere for visualization
self.handle_link_goal = actors.build_sphere(
self.scene,
radius=0.02,
color=[0, 1, 0, 1],
name="handle_link_goal",
body_type="kinematic",
add_collision=False,
initial_pose=sapien.Pose(p=[0, 0, 0], q=[1, 0, 0, 0]),
)
def _after_reconfigure(self, options):
# To spawn cabinets in the right place, we need to change their z position such that
# the bottom of the cabinet sits at z=0 (the floor). Luckily the partnet mobility dataset is made such that
# the negative of the lower z-bound of the collision mesh bounding box is the right value
# this code is in _after_reconfigure since retrieving collision meshes requires the GPU to be initialized
# which occurs after the initial reconfigure call (after self._load_scene() is called)
        self.cabinet_zs = []
        # only one cabinet is built in _load_scene, so iterate over just that one
        for cabinet in [self.cabinet]:
            collision_mesh = cabinet.get_first_collision_mesh()
            self.cabinet_zs.append(-collision_mesh.bounding_box.bounds[0, 2])
self.cabinet_zs = common.to_tensor(self.cabinet_zs, device=self.device)
# get the qmin qmax values of the joint corresponding to the selected links
target_qlimits = self.handle_link.joint.limits # [b, 1, 2]
qmin, qmax = target_qlimits[..., 0], target_qlimits[..., 1]
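        # the target qpos is min_open_frac of the way from closed (qmin) to fully open (qmax)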
self.target_qpos = qmin + (qmax - qmin) * self.min_open_frac
def handle_link_positions(self, env_idx: Optional[torch.Tensor] = None):
if env_idx is None:
return transform_points(
self.handle_link.pose.to_transformation_matrix().clone(),
common.to_tensor(self.handle_link_pos, device=self.device),
)
return transform_points(
self.handle_link.pose[env_idx].to_transformation_matrix().clone(),
common.to_tensor(self.handle_link_pos[env_idx], device=self.device),
)
    # I do not fully understand how this works; I am assuming it behaves as in OpenCabinetDrawer
def _initialize_episode(self, env_idx: torch.Tensor, options: dict):
with torch.device(self.device):
b = len(env_idx)
xy = torch.zeros((b, 3))
xy[:, 2] = self.cabinet_zs[env_idx]
self.cabinet.set_pose(Pose.create_from_pq(p=xy))
# initialize robot
if self.robot_uids == "fetch":
qpos = torch.tensor(
[
0,
0,
0,
0,
0,
0,
0,
-np.pi / 4,
0,
np.pi / 4,
0,
np.pi / 3,
0,
0.015,
0.015,
]
)
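                # tile the same default qpos for every environment being reset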
qpos = qpos.repeat(b).reshape(b, -1)
dist = randomization.uniform(1.6, 1.8, size=(b,))
theta = randomization.uniform(0.9 * torch.pi, 1.1 * torch.pi, size=(b,))
xy = torch.zeros((b, 2))
xy[:, 0] += torch.cos(theta) * dist
xy[:, 1] += torch.sin(theta) * dist
qpos[:, :2] = xy
noise_ori = randomization.uniform(
-0.05 * torch.pi, 0.05 * torch.pi, size=(b,)
)
ori = (theta - torch.pi) + noise_ori
qpos[:, 2] = ori
self.agent.robot.set_qpos(qpos)
self.agent.robot.set_pose(sapien.Pose())
# close all the cabinets. We know beforehand that lower qlimit means "closed" for these assets.
            qlimits = self.cabinet.get_qlimits()  # [b, self.cabinet.max_dof, 2]
self.cabinet.set_qpos(qlimits[env_idx, :, 0])
self.cabinet.set_qvel(self.cabinet.qpos[env_idx] * 0)
# NOTE (stao): This is a temporary work around for the issue where the cabinet drawers/doors might open
# themselves on the first step. It's unclear why this happens on GPU sim only atm.
# moreover despite setting qpos/qvel to 0, the cabinets might still move on their own a little bit.
# this may be due to oblong meshes.
if self.gpu_sim_enabled:
self.scene._gpu_apply_all()
self.scene.px.gpu_update_articulation_kinematics()
self.scene.px.step()
self.scene._gpu_fetch_all()
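            # place the visual goal marker at the current handle position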
self.handle_link_goal.set_pose(
Pose.create_from_pq(p=self.handle_link_positions(env_idx))
)
def _after_control_step(self):
# after each control step, we update the goal position of the handle link
# for GPU sim we need to update the kinematics data to get latest pose information for up to date link poses
# and fetch it, followed by an apply call to ensure the GPU sim is up to date
if self.gpu_sim_enabled:
self.scene.px.gpu_update_articulation_kinematics()
self.scene._gpu_fetch_all()
self.handle_link_goal.set_pose(
Pose.create_from_pq(p=self.handle_link_positions())
)
if self.gpu_sim_enabled:
self.scene._gpu_apply_all()
    # I don't fully understand this part, but it follows the original OpenCabinetDrawer code
def evaluate(self):
# even though self.handle_link is a different link across different articulations
# we can still fetch a joint that represents the parent joint of all those links
# and easily get the qpos value.
open_enough = self.handle_link.joint.qpos >= self.target_qpos
handle_link_pos = self.handle_link_positions()
link_is_static = (
torch.linalg.norm(self.handle_link.angular_velocity, axis=1) <= 1
) & (torch.linalg.norm(self.handle_link.linear_velocity, axis=1) <= 0.1)
return {
"success": open_enough & link_is_static,
"handle_link_pos": handle_link_pos,
"open_enough": open_enough,
}
def _get_obs_extra(self, info: Dict):
obs = dict(
tcp_pose=self.agent.tcp.pose.raw_pose,
)
if "state" in self.obs_mode:
obs.update(
tcp_to_handle_pos=info["handle_link_pos"] - self.agent.tcp.pose.p,
target_link_qpos=self.handle_link.joint.qpos,
target_handle_pos=info["handle_link_pos"],
)
return obs
def compute_dense_reward(self, obs: Any, action: torch.Tensor, info: Dict):
tcp_to_handle_dist = torch.linalg.norm(
self.agent.tcp.pose.p - info["handle_link_pos"], axis=1
)
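        # reaching reward is close to 1 when the TCP is at the handle and decays with distance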
reaching_reward = 1 - torch.tanh(5 * tcp_to_handle_dist)
amount_to_open_left = torch.div(
self.target_qpos - self.handle_link.joint.qpos, self.target_qpos
)
open_reward = 2 * (1 - amount_to_open_left)
reaching_reward[
amount_to_open_left < 0.999
] = 2 # if joint opens even a tiny bit, we don't need reach reward anymore
# print(open_reward.shape)
open_reward[info["open_enough"]] = 3 # give max reward here
reward = reaching_reward + open_reward
reward[info["success"]] = 5.0
return reward
    # this just divides by the maximum possible reward
def compute_normalized_dense_reward(
self, obs: Any, action: torch.Tensor, info: Dict
):
max_reward = 5.0
return self.compute_dense_reward(obs=obs, action=action, info=info) / max_reward
I tried running this code directly and it runs with no errors. Also, if I add a second @register_env call in the code, it says: mani_skill - WARNING - Env FetchNearCabinet-v1 is already registered. Skip registration. So some registration must be happening, but I am not sure why the environment is not getting identified when I use some standard baseline code like sac.py.
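For reference, this is a minimal sketch of how I have been checking the registration locally. The file name fetch_near_cabinet.py is my own assumption (not something from the baselines); importing that module is what actually runs the @register_env decorator:
import gymnasium as gym
import fetch_near_cabinet  # noqa: F401  # hypothetical module holding the env code above
env = gym.make("FetchNearCabinet-v1", obs_mode="state")
obs, _ = env.reset(seed=0)
print(env.action_space)
env.close()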