Status: Closed (closed by Felixvillas 11 hours ago)
@yukezhu @cremebrule Sorry to bother. May I ask you two to clarify my doubts?
It's relative to the current end effector pose. However, the OSC controller uses a specific MuJoCo site as the reference. The name of the site can be found in env.robots[0].controller.eef_name, and you can then query its position and rotation with obj_id = env.sim.model.site_name2id(env.robots[0].controller.eef_name), obj_pos = np.array(env.sim.data.site_xpos[obj_id]), and obj_rot = np.array(env.sim.data.site_xmat[obj_id].reshape(3, 3)).
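For reference, a minimal sketch that wraps those queries into one helper (get_eef_pose is a hypothetical name; it assumes a robosuite environment env with a single robot):

import numpy as np
import robosuite.utils.transform_utils as TU

def get_eef_pose(env):
    # Pose of the OSC controller's reference site, read directly from the MuJoCo data
    site_name = env.robots[0].controller.eef_name
    site_id = env.sim.model.site_name2id(site_name)
    pos = np.array(env.sim.data.site_xpos[site_id])                # (3,) world position
    rot = np.array(env.sim.data.site_xmat[site_id]).reshape(3, 3)  # (3, 3) rotation matrix
    # Axis-angle form, matching the orientation part of an OSC_POSE action
    axisangle = TU.quat2axisangle(TU.mat2quat(rot))
    return pos, rot, axisangle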
Reference this issue comment: https://github.com/ARISE-Initiative/robosuite/issues/400#issuecomment-2276790526.
I have modified the code from that issue comment as follows for data collection (perhaps it is not an improvement, I am not sure).
import os
import sys
import joblib
import datetime
import argparse
import pdb
import numpy as np
import robosuite as suite
import robosuite.macros as macros
from robosuite.controllers import load_controller_config
from robosuite.utils import camera_utils as CU
from robosuite.utils import transform_utils as TU
from PIL import Image
import cv2
import random
import pickle as pkl
import json
from copy import deepcopy
import imageio
# if only RGB images (no depth) are used, set the image convention to "opencv"
macros.IMAGE_CONVENTION = "opencv"
RED = "\033[31m"
GREEN = "\033[32m"
RESET = "\033[0m"
from OpenGL import GL
def ignore_gl_errors(*args, **kwargs):
    pass
GL.glCheckError = ignore_gl_errors
def imgs2video(imgs, video_dir, fps=20):
    assert len(imgs) != 0
    frame = imgs[0]
    h, w, _ = frame.shape
    video = cv2.VideoWriter(video_dir, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
    for img in imgs:
        video.write(img)
    video.release()
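As a usage note, cv2.VideoWriter expects BGR uint8 frames while robosuite camera observations are RGB, so a conversion may be needed (a hypothetical call):

# hypothetical usage: convert RGB observations to BGR before writing
imgs2video([cv2.cvtColor(img, cv2.COLOR_RGB2BGR) for img in imgs], "demo.mp4", fps=20)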
# def save_video(video_path_name, datas, fps=20):
#     if not video_path_name.endswith(".mp4"):
#         video_path_name += ".mp4"
#     video_writer = imageio.get_writer(video_path_name, fps=fps)
#     # print(f"number of eps: {len(datas)}")
#     for data in datas:
#         for img in data:
#             video_writer.append_data(img[::-1])
#     video_writer.close()
imgs = []
demo_lists = []
# def osc_control_to(env, ref_state, desc=20, threshold=0.02, info=""):
#     """
#     Works for most robots
#     Args
#     ----
#     ref_state: np.array([x, y, z, ax, ay, az, grip])
#     desc: Number of waypoints to goal
#     threshold: Error allowed
#     Notes
#     -----
#     Control using OSC
#     Might need tuning for each robot based on the impedances
#     """
#     robot = env.robots[0]
#     controller = robot.controller
#     # control logic
#     ref_pose = ref_state[:-1]
#     gripper_pos = ref_state[-1]
#     cur_pose = np.array(
#         [*controller.ee_pos, *TU.quat2axisangle(TU.mat2quat(controller.ee_ori_mat))]
#     )
#     ori_delta = (ref_pose - cur_pose) / desc
#     next_pose = cur_pose + ori_delta
#     step_itr = 1
#     step_itr_total = 0
#     while np.linalg.norm(ref_pose - cur_pose) > threshold:
#         itr = 0
#         delta = ori_delta
#         while np.linalg.norm(next_pose - cur_pose) > threshold:
#             # print(f"delta: {delta}")
#             delta = ref_pose - cur_pose
#             delta[3:6] = 0
#             # delta *= 10
#             obs, reward, done, _ = env.step(np.concatenate((delta, np.array([gripper_pos])), 0))
#             img = obs[args.view + '_image']
#             imgs.append(img)
#             print(f"{info}, reward: {reward}, step_itr_total: {step_itr_total}")
#             # if reward >= 0.99:
#             #     break
#             # env.render()
#             cur_pose = np.array(
#                 [
#                     *controller.ee_pos,
#                     *TU.quat2axisangle(TU.mat2quat(controller.ee_ori_mat)),
#                 ]
#             )
#             delta = next_pose - cur_pose
#             # to avoid oscillations
#             # ideally should decay kp?
#             delta = delta / np.log2(2 + itr)  # slowly decaying
#             itr += 1
#             step_itr_total += 1
#             if itr > 200:
#                 break
#         # if reward >= 0.99:
#         #     break
#         if step_itr < desc:
#             next_pose += ori_delta
#             step_itr += 1
#     return step_itr_total
def osc_control_to(env, obs, ref_state, desc=20, threshold=0.02, info="", ignore_break=False):
    """
    Works for most robots
    Args
    ----
    ref_state: np.array([x, y, z, ax, ay, az, grip])
    desc: Number of waypoints to goal
    threshold: Error allowed
    Notes
    -----
    Control using OSC
    Might need tuning for each robot based on the impedances
    """
    demo_list = []
    robot = env.robots[0]
    controller = robot.controller
    # control logic
    ref_pose = ref_state[:-1]
    gripper_pos = ref_state[-1]
    cur_pose = np.array(
        [*controller.ee_pos, *TU.quat2axisangle(TU.mat2quat(controller.ee_ori_mat))]
    )
    step_itr = 1
    step_itr_total = 0
    # print(f"ref_pos: {ref_pose}, cur_pos: {cur_pose}, the: {np.linalg.norm(ref_pose - cur_pose)}")
    reward = None
    cur_state = deepcopy(obs)
    while ignore_break or np.linalg.norm(ref_pose - cur_pose) > threshold:
        itr = 0
        delta = ref_pose - cur_pose
        delta[3:6] = 0
        action = np.concatenate((delta, np.array([gripper_pos])), 0) * 2
        try:
            obs, reward, done, _ = env.step(action)
        except Exception:
            break
        demo_list.append(deepcopy((cur_state, action, reward, obs, done)))
        cur_state = deepcopy(obs)
        # img = obs[args.view + '_image']
        # imgs.append(img)
        step_itr_total += 1
        # print(f"{info}, reward: {reward}, step_itr_total: {step_itr_total}")
        # if reward >= 0.99 or done:
        #     break
        if (not ignore_break) and (reward >= 0.99 or done):
            break
        # env.render()
        cur_pose = np.array(
            [
                *controller.ee_pos,
                *TU.quat2axisangle(TU.mat2quat(controller.ee_ori_mat)),
            ]
        )
    return step_itr_total, reward, demo_list, cur_state
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--num', default=20, type=int)
parser.add_argument('-v', '--view', default='frontview', choices=['frontview', 'agentview', 'birdview'])
parser.add_argument('-t', '--task', default='lift', choices=['lift', 'stack', 'jimu'])
args = parser.parse_args()
# ENV_NAME="Lift"
ENV_NAME = args.task.capitalize()
CONTROL_FREQ = 20
META_REWARD = False
LIFTED = False
RGBD = False
### comment out init_envs below and initialize your own envs via suite.make ###
from env_robosuite import init_envs
train_envs, test_envs, env = init_envs(
    variant=json.load(open(f"./config/{ENV_NAME.lower()}.json")),
    training_num=1,
    test_num=1,
    video=RGBD,
    control_freq=CONTROL_FREQ,
    meta_world=META_REWARD,
    lifted=LIFTED,
    demo_gen=True,
)
#### saved parameters ####
saved_paras = {
    "control_freq": CONTROL_FREQ,
    "args_num": args.num,
    "rgbd": RGBD,
    "meta_world": META_REWARD,
    "lifted": LIFTED,
}
#### saved parameters ####
eps = 3000
number_interval = 500
cur_eps = 0
total_eps = 0
demo_dir = os.path.join(os.environ.get("project_data_path"), 'robosuite', ENV_NAME)
# os.makedirs(demo_dir, exist_ok=True)
current_time = datetime.datetime.now()
save_dir = os.path.join(demo_dir, current_time.strftime("%Y-%m-%d_%H-%M-%S"))
os.makedirs(save_dir, exist_ok=True)
# try:
while cur_eps < eps:
    demo_list = []
    step_itr_total = 0
    obs = env.reset()
    # pdb.set_trace()
    if ENV_NAME == "Jimu":
        pick_pos = env.sim.data.body_xpos[env.sim.model.body_name2id(env.jimu_tgt_cubes[-1].root_body)]
        cubeB_pos_ranges = env.tgt_cube_poses[-1]
        cubeB_pos = [
            (cubeB_pos_ranges[0][0] + cubeB_pos_ranges[0][1]) / 2,
            (cubeB_pos_ranges[1][0] + cubeB_pos_ranges[1][1]) / 2,
            (cubeB_pos_ranges[2][0] + cubeB_pos_ranges[2][1]) / 2 - 0.02,
        ]
        place_pos = np.array(cubeB_pos)
    elif ENV_NAME == "Stack":
        # pick cubeA and place it on cubeB? or
        # pick cubeB and place it on cubeA?
        pick_pos = env.sim.data.body_xpos[env.sim.model.body_name2id(env.cubeA.root_body)]
        place_pos = env.sim.data.body_xpos[env.sim.model.body_name2id(env.cubeB.root_body)]
    elif ENV_NAME == "Lift":
        pick_pos = env.sim.data.body_xpos[env.sim.model.body_name2id(env.cube.root_body)]
        # pdb.set_trace()
        # raise NotImplementedError
    else:
        raise NotImplementedError

    def rotation_matrix(rot_angle, axis='z'):
        if axis == "x":
            return TU.quat2mat(np.array([np.cos(rot_angle / 2), np.sin(rot_angle / 2), 0, 0]))
        elif axis == "y":
            return TU.quat2mat(np.array([np.cos(rot_angle / 2), 0, np.sin(rot_angle / 2), 0]))
        elif axis == "z":
            return TU.quat2mat(np.array([np.cos(rot_angle / 2), 0, 0, np.sin(rot_angle / 2)]))

    # final_angle = TU.quat2axisangle(TU.mat2quat(rotation_matrix(0.5*np.pi, axis="x")@rotation_matrix(0, axis="y")@rotation_matrix(0, axis='z')))
    final_angle = TU.quat2axisangle(TU.mat2quat(env.robots[0].controller.ee_ori_mat))
    # 1. reaching
    action = np.zeros(7)
    action[:3] = pick_pos
    action[2] -= 0.018
    action[3:6] = final_angle
    # try:
    osc_r = osc_control_to(env, obs, ref_state=action, info="1. reaching")
    step_itr_total += osc_r[0]
    demo_list.extend(deepcopy(osc_r[2]))
    # print(GREEN, f"1. reaching, step_itr_total: {step_itr_total}", RESET)
    # 2. picking
    # action[:3] = pick_pos
    # action[3:6] = final_angle
    # osc_r = osc_control_to(env, ref_state=action, info="2. picking")
    # step_itr_total += osc_r[0]
    # print(GREEN, f"2. picking, step_itr_total: {step_itr_total}", RESET)
    # 2. picking gripper (1 closes the gripper)
    action[6] = 1
    osc_r = osc_control_to(env, osc_r[3], ref_state=action, info="2. picking gripper")
    step_itr_total += osc_r[0]
    demo_list.extend(deepcopy(osc_r[2]))
    # print(GREEN, f"2. picking gripper, step_itr_total: {step_itr_total}", RESET)
    # 3. lifting
    action[2] += 0.1
    osc_r = osc_control_to(env, osc_r[3], ref_state=action, info="3. lifting", ignore_break=(ENV_NAME == "Lift"))
    step_itr_total += osc_r[0]
    demo_list.extend(deepcopy(osc_r[2]))
    # print(GREEN, f"3. lifting, step_itr_total: {step_itr_total}", RESET)
    if ENV_NAME == "Stack":
        # 4. moving
        action[:3] = place_pos
        action[2] += 0.1
        osc_r = osc_control_to(env, osc_r[3], ref_state=action, info="4. moving")
        step_itr_total += osc_r[0]
        demo_list.extend(deepcopy(osc_r[2]))
        # 5. placing
        # action[2] -= 0.12
        # osc_r = osc_control_to(env, osc_r[3], ref_state=action, info="5. placing")
        # step_itr_total += osc_r[0]
        # demo_list.extend(deepcopy(osc_r[2]))
        # 6. placing gripper (-1 opens the gripper)
        action[6] = -1
        osc_r = osc_control_to(env, osc_r[3], ref_state=action, threshold=0.01, info="6. placing gripper", ignore_break=True)
        step_itr_total += osc_r[0]
        demo_list.extend(deepcopy(osc_r[2]))
    total_eps += 1
    if osc_r[1] is not None and osc_r[1] >= 0.99:
        # successful episode
        cur_eps += 1
        success_flag = True
        demo_lists.append(demo_list)
    else:
        success_flag = False
    print(GREEN, f"ep: {cur_eps}, step_itr_total: {step_itr_total}, success: {success_flag}", RESET)
    # save only right after a new success reaches the interval, so that a later failed
    # episode at the same cur_eps does not overwrite the file with an empty list
    if success_flag and (cur_eps % number_interval == 0) and (cur_eps != 0):
        print("+" * 20, "save demos", "+" * 20)
        # save demos
        demo_idx = cur_eps // number_interval
        pkl_dir = os.path.join(save_dir, f'demo_cFreq{CONTROL_FREQ}_eps{eps}_idx{demo_idx}.pkl')
        # joblib.dump(demo_lists, pkl_dir)
        # while True:
        with open(pkl_dir, mode="wb") as f:
            pkl.dump(demo_lists, f)
        # with open(pkl_dir, mode='rb') as f:
        #     data = pkl.load(f)
        # if data != []:
        #     break
        # else:
        #     assert demo_lists != []
        demo_lists = []
# except KeyboardInterrupt as ke:
#     video_dir = "./demo_gen/osc_control_video"
#     os.makedirs(video_dir, exist_ok=True)
#     video_dir = os.path.join(video_dir, "demo.mp4")
#     imgs2video(
#         imgs=imgs,
#         video_dir=video_dir,
#         fps=CONTROL_FREQ
#     )
#     sys.exit(0)
# video_dir = "./demo_gen/osc_control_video"
# os.makedirs(video_dir, exist_ok=True)
# video_dir = os.path.join(video_dir, "demo.mp4")
# imgs2video(
#     imgs=imgs,
#     video_dir=video_dir,
#     fps=CONTROL_FREQ
# )
print(f"Eps {eps}, total_eps: {total_eps}, Eps success rate: {eps / total_eps}")
I collected a sufficient number of successful demonstrations (at least 1 million transitions) and then trained offline with Tianshou's TD3+BC (the algorithm performs very well on D4RL's HalfCheetah), but the results look poor. The visualizations are as follows:
https://github.com/user-attachments/assets/1f50f3a1-5a27-4ac0-9fd8-b81959093403
https://github.com/user-attachments/assets/6952b9dc-3de0-46a5-8b14-b0e7b54552a1
I don't know where the problem lies; my guess is that the actions cannot be accurately reproduced under control_delta=True?
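For context, this is roughly how I would expect a delta action relative to the current end effector pose to be formed under control_delta=True (a sketch only; delta_action_towards is a hypothetical helper, and the 0.05 m / 0.5 rad per-step scaling is my reading of the default osc_pose.json):

def delta_action_towards(env, target_pos, target_axisangle, grip, pos_scale=0.05, rot_scale=0.5):
    controller = env.robots[0].controller
    # position error relative to the CURRENT end effector pose
    dpos = target_pos - controller.ee_pos
    # orientation error as the axis-angle of the relative rotation
    cur_quat = TU.mat2quat(controller.ee_ori_mat)
    tgt_quat = TU.axisangle2quat(target_axisangle)
    drot = TU.quat2axisangle(TU.quat_multiply(tgt_quat, TU.quat_inverse(cur_quat)))
    # normalize into the controller's [-1, 1] input range and clip, so the commanded
    # per-step motion never exceeds the controller's output limits
    action = np.concatenate([dpos / pos_scale, drot / rot_scale, [grip]])
    action[:6] = np.clip(action[:6], -1.0, 1.0)
    return action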
Hey @Felixvillas, I believe you should be able to collect demos using (the now-renamed) input_type=delta by running:
python robosuite/scripts/collect_human_demonstrations.py --robots UR5e
Closing for now - feel free to re-open
Hi~ I'd like to collect demonstrations for the "Lift" task using a script I implemented myself. Here is my specific environment config:
Here is the core of the script for collecting demonstrations with "control_delta = False":
For simplicity, I only present the 'reach' section in the code above. Since control_delta = False, passing an action with absolute values works.
Now I'd like to collect demos with "control_delta = True", so I need to pass an action with relative values. However, I am not clear whether the relative value refers to the current state or to the initial state, so I have tried both ways (roughly as in the sketch below).
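A sketch of the two interpretations I tried (hypothetical names: target_pos, initial_ee_pos, delta_rot, grip):

controller = env.robots[0].controller

# Interpretation 1: delta relative to the CURRENT end effector position, recomputed every step
delta_pos = target_pos - controller.ee_pos

# Interpretation 2: delta relative to the INITIAL end effector position, a fixed offset
# delta_pos = target_pos - initial_ee_pos

action = np.concatenate([delta_pos, delta_rot, [grip]])
obs, reward, done, info = env.step(action)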
However, both attempts failed to collect good demonstrations with "control_delta=True": the gripper could not correctly approach the cube, and it either moved very slowly over a short distance or the arm rotated around the gripper as the center of rotation.
I wonder where the problem lies?
Thanks~