HankerSia closed this issue 3 years ago.
We've got a benchmark repository that should allow you to reproduce the RL results described in our whitepaper.
To add to what @amandlek said:
Robosuite in general isn't meant to do any RL on its own -- it's merely an implementation of an "environment" for an RL agent (algorithm) to interact with. We use a gym-style format, so any RL algorithm that works with a Gym environment can be used with our repo (just use the GymWrapper class). We also have our own full set of documentation which might be very helpful.
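For instance, here is a minimal sketch of what that wrapping looks like (assuming a robosuite 1.x-style API; the task name, robot, and keyword arguments below are illustrative placeholders, not the only valid choices):

```python
import robosuite as suite
from robosuite.wrappers import GymWrapper

# Build a robosuite environment and expose it through the gym-style interface.
env = GymWrapper(
    suite.make(
        "Lift",                # example task; any robosuite task name works
        robots="Panda",        # example robot model
        has_renderer=False,    # no on-screen rendering during training
        use_camera_obs=False,  # low-dimensional observations instead of camera images
        reward_shaping=True,   # dense reward, usually easier for RL
    )
)
```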
I would recommend starting with OpenAI's Gym environments / tutorials just to get familiar with the code style -- again, we utilize a similar interface with our environments here.
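For reference, the gym-style interaction loop that our environments mirror looks roughly like this (using the older `gym` API with a 4-tuple `step()` return, which matches the DDPG code further down in this thread):

```python
import gym

env = gym.make("Pendulum-v0")
obs = env.reset()                               # initial observation
for _ in range(200):
    action = env.action_space.sample()          # random action, just to show the interface
    obs, reward, done, info = env.step(action)  # one simulation step
    if done:
        obs = env.reset()
env.close()
```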
In terms of RL algorithms, I would recommend getting familiar with the stable-baselines repo, which includes many state-of-the-art RL algorithms that you can directly use off-the-shelf with gym-style environments.
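As a rough sketch of what off-the-shelf use can look like -- note this uses stable-baselines3, the maintained successor of the original stable-baselines, and the algorithm choice and step budget are placeholders:

```python
from stable_baselines3 import SAC

# `env` is the GymWrapper-wrapped robosuite environment from the snippet above.
model = SAC("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=1_000_000)  # placeholder budget; manipulation tasks need many steps
model.save("sac_robosuite_lift")
```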
I am not sure about your exact background or your familiarity with RL in general, but I would highly encourage you to become familiar with the underlying theory of RL (this blog post might be a good and informal starting point) -- once you understand that, understanding RL codebases will become much easier!
Thank you firstly! I have learned the basics of RL and Gym, and I built a pendulum control experiment with DDPG myself. I have also read the introduction to robosuite, and I can build the basic environment, which includes the robot, gripper, objects, and so on. However, there is no follow-up reference tutorial on how to use this environment, nor a simple code demo like my pendulum control experiment, which consists of just one Python source file:

```python
""" Created on Sun Nov 29 21:38:39 2020
@author: lenovo """
import os import random import gym from collections import deque
import numpy as np import tensorflow as tf
from keras.layers import Input, Dense, Lambda, concatenate from keras.models import Model from keras.optimizers import Adam import keras.backend as K
class DDPG(): """Deep Deterministic Policy Gradient Algorithms. """ def init(self): super(DDPG, self).init()
self.sess = K.get_session()
self.env = gym.make('Pendulum-v0')
self.bound = self.env.action_space.high[0]
# update rate for target model.
self.TAU = 0.01
# experience replay.
self.memory_buffer = deque(maxlen=4000)
# discount rate for q value.
self.gamma = 0.95
# epsilon of action selection
self.epsilon = 1.0
# discount rate for epsilon.
self.epsilon_decay = 0.995
# min epsilon of ε-greedy.
self.epsilon_min = 0.01
# actor learning rate
self.a_lr = 0.0001
# critic learining rate
self.c_lr = 0.001
# ddpg model
self.actor = self._build_actor()
self.critic = self._build_critic()
# target model
self.target_actor = self._build_actor()
self.target_actor.set_weights(self.actor.get_weights())
self.target_critic = self._build_critic()
self.target_critic.set_weights(self.critic.get_weights())
# gradient function
self.get_critic_grad = self.critic_gradient()
self.actor_optimizer()
if os.path.exists('model/ddpg_actor.h5') and os.path.exists('model/ddpg_critic.h5'):
self.actor.load_weights('model/ddpg_actor.h5')
self.critic.load_weights('model/ddpg_critic.h5')
def _build_actor(self):
"""Actor model.
"""
inputs = Input(shape=(3,), name='state_input')
x = Dense(40, activation='relu')(inputs)
x = Dense(40, activation='relu')(x)
x = Dense(1, activation='tanh')(x)
output = Lambda(lambda x: x * self.bound)(x)
model = Model(inputs=inputs, outputs=output)
model.compile(loss='mse', optimizer=Adam(lr=self.a_lr))
return model
def _build_critic(self):
"""Critic model.
"""
sinput = Input(shape=(3,), name='state_input')
ainput = Input(shape=(1,), name='action_input')
s = Dense(40, activation='relu')(sinput)
a = Dense(40, activation='relu')(ainput)
x = concatenate([s, a])
x = Dense(40, activation='relu')(x)
output = Dense(1, activation='linear')(x)
model = Model(inputs=[sinput, ainput], outputs=output)
model.compile(loss='mse', optimizer=Adam(lr=self.c_lr))
return model
def actor_optimizer(self):
"""actor_optimizer.
Returns:
function, opt function for actor.
"""
self.ainput = self.actor.input
aoutput = self.actor.output
trainable_weights = self.actor.trainable_weights
self.action_gradient = tf.placeholder(tf.float32, shape=(None, 1))
# tf.gradients will calculate dy/dx with a initial gradients for y
# action_gradient is dq / da, so this is dq/da * da/dparams
params_grad = tf.gradients(aoutput, trainable_weights, -self.action_gradient)
grads = zip(params_grad, trainable_weights)
self.opt = tf.train.AdamOptimizer(self.a_lr).apply_gradients(grads)
self.sess.run(tf.global_variables_initializer())
def critic_gradient(self):
"""get critic gradient function.
Returns:
function, gradient function for critic.
"""
cinput = self.critic.input
coutput = self.critic.output
# compute the gradient of the action with q value, dq/da.
action_grads = K.gradients(coutput, cinput[1])
return K.function([cinput[0], cinput[1]], action_grads)
def OU(self, x, mu=0, theta=0.15, sigma=0.2):
"""Ornstein-Uhlenbeck process.
formula:ou = θ * (μ - x) + σ * w
Arguments:
x: action value.
mu: μ, mean fo values.
theta: θ, rate the variable reverts towards to the mean.
sigma:σ, degree of volatility of the process.
Returns:
OU value
"""
return theta * (mu - x) + sigma * np.random.randn(1)
def get_action(self, X):
"""get actor action with ou noise.
Arguments:
X: state value.
"""
action = self.actor.predict(X)[0][0]
# add randomness to action selection for exploration
noise = max(self.epsilon, 0) * self.OU(action)
action = np.clip(action + noise, -self.bound, self.bound)
return action
def remember(self, state, action, reward, next_state, done):
"""add data to experience replay.
Arguments:
state: observation.
action: action.
reward: reward.
next_state: next_observation.
done: if game done.
"""
item = (state, action, reward, next_state, done)
self.memory_buffer.append(item)
def update_epsilon(self):
"""update epsilon.
"""
if self.epsilon >= self.epsilon_min:
self.epsilon *= self.epsilon_decay
def process_batch(self, batch):
"""process batch data.
Arguments:
batch: batch size.
Returns:
states: states.
actions: actions.
y: Q_value.
"""
y = []
# ranchom choice batch data from experience replay.
data = random.sample(self.memory_buffer, batch)
states = np.array([d[0] for d in data])
actions = np.array([d[1] for d in data])
next_states = np.array([d[3] for d in data])
# Q_target。
next_actions = self.target_actor.predict(next_states)
q = self.target_critic.predict([next_states, next_actions])
# update Q value
for i, (_, _, reward, _, done) in enumerate(data):
target = reward
if not done:
target += self.gamma * q[i][0]
y.append(target)
return states, actions, y
def update_model(self, X1, X2, y):
"""update ddpg model.
Arguments:
states: states.
actions: actions.
y: Q_value.
Returns:
loss: critic loss.
"""
loss = self.critic.fit([X1, X2], y, verbose=0)
loss = np.mean(loss.history['loss'])
X3 = self.actor.predict(X1)
a_grads = np.array(self.get_critic_grad([X1, X3]))[0]
self.sess.run(self.opt, feed_dict={
self.ainput: X1,
self.action_gradient: a_grads
})
return loss
def update_target_model(self):
"""soft update target model.
formula:θt ← τ * θ + (1−τ) * θt, τ << 1.
"""
critic_weights = self.critic.get_weights()
actor_weights = self.actor.get_weights()
critic_target_weights = self.target_critic.get_weights()
actor_target_weights = self.target_actor.get_weights()
for i in range(len(critic_weights)):
critic_target_weights[i] = self.TAU * critic_weights[i] + (1 - self.TAU) * critic_target_weights[i]
for i in range(len(actor_weights)):
actor_target_weights[i] = self.TAU * actor_weights[i] + (1 - self.TAU) * actor_target_weights[i]
self.target_critic.set_weights(critic_target_weights)
self.target_actor.set_weights(actor_target_weights)
def train(self, episode, batch):
"""training model.
Arguments:
episode: ganme episode.
batch: batch size of episode.
Returns:
history: training history.
"""
history = {'episode': [], 'Episode_reward': [], 'Loss': []}
for i in range(episode):
observation = self.env.reset()
reward_sum = 0
losses = []
for j in range(200):
# chocie action from ε-greedy.
x = observation.reshape(-1, 3)
# actor action
action = self.get_action(x)
observation, reward, done, _ = self.env.step(action)
# add data to experience replay.
reward_sum += reward
self.remember(x[0], action, reward, observation, done)
if len(self.memory_buffer) > batch:
X1, X2, y = self.process_batch(batch)
# update DDPG model
loss = self.update_model(X1, X2, y)
# update target model
self.update_target_model()
# reduce epsilon pure batch.
self.update_epsilon()
losses.append(loss)
loss = np.mean(losses)
history['episode'].append(i)
history['Episode_reward'].append(reward_sum)
history['Loss'].append(loss)
print('Episode: {}/{} | reward: {} | loss: {:.3f}'.format(i, episode, reward_sum, loss))
self.actor.save_weights('model/ddpg_actor.h5')
self.critic.save_weights('model/ddpg_critic.h5')
return history
def play(self):
"""play game with model.
"""
print('play...')
observation = self.env.reset()
reward_sum = 0
random_episodes = 0
while random_episodes < 10:
self.env.render()
x = observation.reshape(-1, 3)
action = self.actor.predict(x)[0]
observation, reward, done, _ = self.env.step(action)
reward_sum += reward
if done:
print("Reward for this episode was: {}".format(reward_sum))
random_episodes += 1
reward_sum = 0
observation = self.env.reset()
self.env.close()
if name == 'main': model = DDPG()
history = model.train(10, 128)
model.play()
`
So, if I want to train a pick-and-place task, is there any simple project that uses robosuite? The current projects on GitHub pull in all kinds of RL libraries, which makes them hard for me to understand... and those projects in turn depend on yet other RL library projects...
As mentioned by @cremebrule , we have a simple Gym interface that you can see here. If you have indeed used gym with your own DDPG code before, then you should be able to swap the gym environment you were using with a robosuite environment, by using the wrapper that we've linked. Are there any other points of confusion?
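For what it's worth, a rough sketch of that swap (robosuite 1.x assumed; the task name `PickPlaceCan`, the robot choice, and the keyword arguments are illustrative and may need adjusting for your installed version):

```python
import numpy as np
import robosuite as suite
from robosuite.wrappers import GymWrapper

# Replace gym.make('Pendulum-v0') in the DDPG class with something like this:
env = GymWrapper(
    suite.make(
        "PickPlaceCan",        # single-object pick-and-place variant
        robots="Panda",
        has_renderer=False,
        use_camera_obs=False,  # low-dimensional observations instead of camera images
        reward_shaping=True,   # dense reward; much easier for vanilla DDPG than sparse reward
    )
)

# The pendulum code hardcodes a 3-d state and 1-d action; read them from the env instead:
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
bound = env.action_space.high  # per-dimension action limits (arm + gripper commands)

# Same reset/step signature as Pendulum-v0:
obs = env.reset()
obs, reward, done, info = env.step(np.zeros(action_dim))
```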
@amandlek Thanks! Could you give a short introduction to the observations and reward of a simple picking task? Then, what actions (such as torques for the joints of the robotic arm) should be fed into this environment? And how do I define an object in the picking task? In short, could you make a tutorial on a simple picking task using DDPG and robosuite?
Since I am a newcomer, I am confused about how to use robosuite and how to train a grasping agent. Can anyone give me a beginner-level tutorial, or tell me how I can create a simple grasping demo based on robosuite and a typical RL algorithm? Thank you in advance!