Closed kwonmha closed 1 year ago
The fact that the policy outputs a1 at every time step is not itself a problem. What do you expect? What is the definition of your POMDP? Have you implemented it correctly? (Transition probability doesn't imply policy.)
The goal of this POMDP is to find the fastest way to final state(s4) where the reward is 100. So I give -1 to almost every transitions. And I give +1 when getting closer to final state.(s1->s2, s2->s3)
Thanks. Can you share your code of the problem definition and the POMDP control loop?
Thank you for your help. I made my problem simpler. It has 3 states(s1, s2, s3) and s3 is the final state. The goal is to find fastest way to arrive s3. You cannot go back(like prob of s2 -> s1 : 0) There are 2 actions(a1, a2), 2 observations(correct and wrong). a1 has higher probability to move s1 to s2 which encourages to select a1 on s1. a2 has higher probability to move s2 to s3.
This is my code. Thanks.
import numpy as np
import random
import sys
import time
import pomdp_py
from pomdp_py import sarsop
from pomdp_py import vi_pruning
from scipy.stats import norm
sarspop_path = "/home/mhkwon/projects/sarsop/src/pomdpsol"
vi_pruning_path = "/home/mhkwon/projects/vi_pruning.bin"
EPSILON = 1e-5
STD = 0.65
class LPRState(pomdp_py.State):
def __init__(self, name):
self.name = name
def __hash__(self):
return hash(self.name)
def __eq__(self, other):
if isinstance(other, LPRState):
return self.name == other.name
return False
def __str__(self):
return self.name
def __repr__(self):
return "LPRState(%s)" % self.name
class LPRAction(pomdp_py.Action):
def __init__(self, name):
self.name = name
def __hash__(self):
return hash(self.name)
def __eq__(self, other):
if isinstance(other, LPRAction):
return self.name == other.name
return False
def __str__(self):
return self.name
def __repr__(self):
return "LPRAction(%s)" % self.name
class LPRObservation(pomdp_py.Observation):
def __init__(self, name):
self.name = name
def __hash__(self):
return hash(self.name)
def __eq__(self, other):
if isinstance(other, LPRObservation):
return self.name == other.name
return False
def __str__(self):
return self.name
def __repr__(self):
return "LPRObservation(%s)" % self.name
def correct_probability(state, action_level):
prob = 1 - (1 / (1 + np.exp(state - action_level)))
noise = np.random.uniform(-EPSILON, EPSILON, 1)[0]
# print(state, action_level, prob, noise, prob + noise)
return max(prob + noise, 0)
# Observation model
class LPRObservationModel(pomdp_py.ObservationModel):
def __init__(self, n_state, n_action, action_levels, noise=0.0):
self.n_state = n_state
self.last_state_name = "s" + str(n_state + 1)
# state x action x 2(O, X) 만큼의 prob을 정의
self.ob_correct_prob = np.zeros((n_state + 1, n_action + 1))
for state in range(1, n_state + 1):
for action in range(1, n_action + 1):
self.ob_correct_prob[state][action] = correct_probability(state, action_levels[action])
# sum = np.sum(self.ob_correct_prob, axis=1)
# self.ob_correct_prob /= sum.reshape(-1, 1)
self.ob_wrong_prob = 1 - self.ob_correct_prob
def probability(self, observation, next_state, action):
"""
probability(self, observation, next_state, action)
Returns the probability of :math:`\Pr(o|s',a)`.
Args:
observation (~pomdp_py.framework.basics.Observation): the observation :math:`o`
next_state (~pomdp_py.framework.basics.State): the next state :math:`s'`
action (~pomdp_py.framework.basics.Action): the action :math:`a`
Returns:
float: the probability :math:`\Pr(o|s',a)`
"""
# return prob[next_state][action][observation]
# print(next_state.name, observation.name, action.name)
# Always give correct observation on last state.
if next_state.name == self.last_state_name:
if observation.name == "correct":
return 1.0
else:
return 0.0
state_idx = int(next_state.name[1:])
action_idx = int(action.name[1:])
if observation.name == "correct":
prob = self.ob_correct_prob[state_idx][action_idx]
return prob
else:
prob = self.ob_wrong_prob[state_idx][action_idx]
return prob
def sample(self, next_state, action):
# master level 도달
if next_state.name == self.last_state_name:
return LPRObservation("correct")
state_idx = int(next_state.name[1:])
action_idx = int(action.name[1:])
print(state_idx, action_idx)
if random.uniform(0, 1) < self.ob_correct_prob[state_idx][action_idx]:
return LPRObservation("correct")
else:
return LPRObservation("wrong")
def get_all_observations(self):
"""Only need to implement this if you're using
a solver that needs to enumerate over the observation space
(e.g. value iteration)"""
return [LPRObservation(s) for s in {"correct", "wrong"}]
def proceed_probability(state, action_level):
prob = norm.pdf(state, action_level, STD)
noise = np.random.uniform(-EPSILON, EPSILON, 1)[0]
# print(state, action_level, prob, noise, prob + noise)
return max(prob + noise, 0)
# Transition Model
class LPRTransitionModel(pomdp_py.TransitionModel):
def __init__(self, n_state, n_action, action_levels):
self.n_state = n_state
self.action_levels = action_levels
print("transition model n_state", self.n_state)
# level 1 -> level 1 or level 2
self.prob_to_next_level = np.zeros((n_state + 2, n_action + 1))
self.prob_to_cur_level = np.zeros((n_state + 2, n_action + 1))
# self.prob_to_next_level[1][1] = 1.0
# self.prob_to_cur_level[2][1] = 1.0
# self.prob_to_cur_level[3][1] = 1
# self.prob_to_cur_level[1][2] = 1.0
# self.prob_to_next_level[2][2] = 1.0
# self.prob_to_cur_level[3][2] = 1
self.prob_to_next_level[1][1] = 0.9
self.prob_to_cur_level[1][1] = 0.1
self.prob_to_cur_level[2][1] = 1.0
self.prob_to_cur_level[3][1] = 1.0
self.prob_to_next_level[1][2] = 0.2
self.prob_to_cur_level[1][2] = 0.8
self.prob_to_next_level[2][2] = 0.8
self.prob_to_cur_level[2][2] = 0.2
self.prob_to_cur_level[3][2] = 1.0
print(self.prob_to_next_level)
print("===============================")
print(self.prob_to_cur_level)
# exit()
def probability(self, next_state, state, action):
state_idx = int(state.name[1:])
action_idx = int(action.name[1:])
next_state_idx = int(next_state.name[1:])
# last state stays.
if state_idx == self.n_state + 1:
if next_state_idx == state_idx:
return 1.0
else:
return 0.0
# state goes only self -> self, self + 1
if not(state_idx == next_state_idx or state_idx + 1 == next_state_idx):
return 0.0
if next_state.name == state.name:
prob = self.prob_to_cur_level[state_idx][action_idx]
return prob
elif state_idx + 1 == next_state_idx:
prob = self.prob_to_next_level[state_idx][action_idx]
return prob
else:
raise ValueError
def sample(self, state, action):
state_idx = int(state.name[1:])
action_idx = int(action.name[1:])
if random.uniform(0, 1) < self.prob_to_cur_level[state_idx][action_idx]:
return LPRState(str(state.name))
else:
return LPRState("s" + str(int(state.name[1:]) + 1))
def get_all_states(self):
"""Only need to implement this if you're using
a solver that needs to enumerate over the observation space (e.g. value iteration)"""
# print([LPRState("s" + str(s)) for s in list(range(1, self.n_state + 2))])
return [LPRState("s" + str(s)) for s in list(range(1, self.n_state + 2))]
# Reward Model
class LPRRewardModel(pomdp_py.RewardModel):
def __init__(self, n_state):
self.n_state = n_state
self.last_state_name = str(n_state + 1)
print("last_state_name:", self.last_state_name)
def sample(self, state, action, next_state):
# deterministic
if state.name[1:] == str(self.n_state) and next_state.name[1:] == self.last_state_name: # final state
return 100.
elif int(state.name[1:]) + 1 == int(next_state.name[1:]):
return 1.0
else:
return -1.0
# Policy Model
class LPRPolicyModel(pomdp_py.RolloutPolicy):
"""A simple policy model with uniform prior over a
small, finite action space"""
def __init__(self, n_actions):
super(LPRPolicyModel, self).__init__()
self.actions = {LPRAction("a" + str(s)) for s in list(range(1, n_actions + 1))}
def sample(self, state):
all_actions = self.get_all_actions()
sampled_action = random.sample(all_actions, 1)[0]
print("sampled action:", sampled_action)
return sampled_action
def rollout(self, state, history=None):
"""Treating this PolicyModel as a rollout policy"""
return self.sample(state)
def get_all_actions(self, state=None, history=None):
return self.actions
class LPRProblem(pomdp_py.POMDP):
def __init__(self, n_state, n_action, action_levels, init_state, init_belief):
self.n_state = n_state
transition_model = LPRTransitionModel(n_state, n_action, action_levels)
reward_model = LPRRewardModel(n_state)
agent = pomdp_py.Agent(init_belief,
LPRPolicyModel(n_action),
transition_model,
LPRObservationModel(n_state, n_action, action_levels),
reward_model)
env = pomdp_py.Environment(init_state,
transition_model,
reward_model)
super(LPRProblem, self).__init__(agent, env, name="LearnPathRecProblem")
def mastered(self, state):
return state.name[1:] == str(self.n_state + 1)
def test_planner(lpr_problem, planner, action_levels, nsteps=3, discount=0.9,
debug_tree=False):
"""
Runs the action-feedback loop of Tiger problem POMDP
Args:
lpr_problem (TigerProblem): a problem instance
planner (Planner): a planner
nsteps (int): Maximum number of steps to run this loop.
debug_tree (bool): True if get into the pdb with a
TreeDebugger created as 'dd' variable.
"""
gamma = 1.0
total_discounted_reward = 0
for i in range(nsteps):
start = time.time()
action = planner.plan(lpr_problem.agent)
if debug_tree:
from pomdp_py.utils import TreeDebugger
dd = TreeDebugger(lpr_problem.agent.tree)
import pdb; pdb.set_trace()
print("==== Step %d ====" % (i+1))
print("True state:", lpr_problem.env.state)
# print("Belief:", lpr_problem.agent.cur_belief)
print("Action:", action)
print("Action level:", action_levels[int(action.name[1:])])
state_index = int(lpr_problem.env.state.name[1:])
action_index = int(action.name[1:])
print("Probability to go to next level:", lpr_problem.agent.transition_model.prob_to_next_level[state_index][action_index])
reward = lpr_problem.env.state_transition(action, execute=True)
print("Reward:", reward)
print("True next state: %s" % lpr_problem.env.state)
real_observation = lpr_problem.agent.observation_model.sample(lpr_problem.env.state, action)
print(">> Observation:", real_observation)
lpr_problem.agent.update_history(action, real_observation)
# Update the belief. If the planner is POMCP, planner.update
# also automatically updates agent belief.
planner.update(lpr_problem.agent, action, real_observation)
total_discounted_reward += reward * gamma
gamma *= discount
if isinstance(planner, pomdp_py.POUCT):
print("Num sims:", planner.last_num_sims)
print("Plan time: %.5f" % planner.last_planning_time)
if isinstance(lpr_problem.agent.cur_belief,
pomdp_py.Histogram):
new_belief = pomdp_py.update_histogram_belief(
lpr_problem.agent.cur_belief,
action, real_observation,
lpr_problem.agent.observation_model,
lpr_problem.agent.transition_model)
lpr_problem.agent.set_belief(new_belief)
print("{} seconds".format(time.time() - start))
if lpr_problem.mastered(lpr_problem.env.state):
break
print("Final Discounted reward", total_discounted_reward)
def main():
n_state = 2
n_action = 2
np.random.seed(100)
action_levels = np.array([0, 1, 2])
init_true_state = LPRState("s1")
init_belief = pomdp_py.Histogram({LPRState("s1"): 1})
lpr_problem = LPRProblem(n_state, n_action, action_levels, init_true_state, init_belief)
# print("** Testing value iteration **")
# vi = pomdp_py.ValueIteration(horizon=3, discount_factor=0.95)
# test_planner(lpr_problem, vi, action_levels, nsteps=10)
print("\n** Testing sarsop **")
ss = sarsop(lpr_problem.agent, sarspop_path, discount_factor=0.95,
timeout=1, memory=50, precision=0.000001,
remove_generated_files=False)
test_planner(lpr_problem, ss, action_levels, nsteps=20)
if __name__ == '__main__':
main()
The problem is with your belief initialization.
init_belief = pomdp_py.Histogram({LPRState("s1"): 1})
should be
init_belief = pomdp_py.Histogram({LPRState("s1"): 1, LPRState("s2"): 0.0, LPRState("s3"): 0.0})
since your state space contains s1, s2, s3.
With this change, I get the following output:
==== Step 1 ====
True state: s1
Action: a1
Action level: 1
Probability to go to next level: 0.9
Reward: 1.0
True next state: s2
2 1
>> Observation: correct
0.0034754276275634766 seconds
==== Step 2 ====
True state: s2
Action: a2
Action level: 2
Probability to go to next level: 0.8
Reward: -1.0
True next state: s2
2 2
>> Observation: correct
0.0029265880584716797 seconds
==== Step 3 ====
True state: s2
Action: a2
Action level: 2
Probability to go to next level: 0.8
Reward: 100.0
True next state: s3
>> Observation: correct
0.0029344558715820312 seconds
Final Discounted reward 81.1
Thank you so much!
I'm trying to solve my own problem but ValueIteration and SARSOP gives me same action everytime.
Can you check my model setting? This is the
temp-pomdp.pomdp
that contains every info about my problem.It has 4 sates, 3 actions. S1 can go to S1, S2, S2 can go to S2, S3, S3 can go to S3, S4. S4 is the goal state.
On, S1, a1 gives the highest probability to go to S2. On, S2, a1 gives the highest probability to go to S3. On, S3, a1 gives the highest probability to go to S4. But the final policy always selects a1 to proceed.