csu-hmc / opty

A library for using direct collocation in the optimization of dynamic systems.
http://opty.readthedocs.io
Other
92 stars 20 forks source link

opty does not 'respect' the bounds #145

Closed Peter230655 closed 2 months ago

Peter230655 commented 5 months ago

I tried to simulate a simple 2D rocket. opty converged to a local optimum, and all terminal constraints and other errors, as found in the info['g'] dictionary seem to be within numerical accuracy. However, it does not seem to respect the bounds on the control variables, here called strength and q0.

No idea what I did wrong.

Below is the code.

Any help is greatly appreciated!

import sympy.physics.mechanics as me
from collections import OrderedDict
import time
import numpy as np
import sympy as sm
from scipy.optimize import root
from scipy.interpolate import CubicSpline

from opty.direct_collocation import Problem
from opty.utils import parse_free
import matplotlib.pyplot as plt
import matplotlib
from IPython.display import HTML
matplotlib.rcParams['animation.embed_limit'] = 2**128
from matplotlib.animation import FuncAnimation
#=========================================================================================
start = time.time()

def create_objective_function(
    objective, state_symbols, input_symbols, unknown_symbols,
    N, node_time_interval=1.0, integration_method="backward euler"):
    """Creates function to evaluate the objective and objective gradient.

    Parameters
    ----------
    objective : sympy.Expr
        The objective function to be minimized, which is a function of the
        states and inputs. The objective function can contain non-nested
        indefinite integrals of time, e.g. ``Integral(f(t)**2, t)``.
    state_symbols : iterable of symbols
        The state variables.
    input_symbols : iterable of symbols
        The input variables.
    unknown_symbols : iterable of symbols
        The unknown parameters.
    N : int
        Number of collocation nodes, i.e. the number of time steps.
    node_time_interval : float
        The value of the time interval. The default is 1.0, as this term only
        appears in the objective function as a scaling factor.
    integration_method : str, optional
        The method used to integrate the system. The default is "backward
        euler".

    """
    def lambdify_function(expr, multiplication_array, take_sum):
        if take_sum:
            def integration_function(x):
                return node_time_interval * np.sum(x * multiplication_array)
        else:
            def integration_function(x):
                return node_time_interval * x * multiplication_array
        return sm.lambdify(
            (states, inputs, params), expr,
            modules=[{int_placeholder.name: integration_function}, "numpy"],
            cse=True)

    def parse_expr(expr, in_integral=False):
        if not expr.args:
            return expr
        if isinstance(expr, sm.Integral):
            if in_integral:
                raise NotImplementedError("Nested integrals are not supported.")
            if expr.limits != ((me.dynamicsymbols._t,),):
                raise NotImplementedError(
                    "Only indefinite integrals of time are supported.")
            return int_placeholder(parse_expr(expr.function, True))
        return expr.func(*(parse_expr(arg) for arg in expr.args))

    # Parse function arguments
    states = sm.ImmutableMatrix(state_symbols)
    inputs = sm.ImmutableMatrix(sort_sympy(input_symbols))
    params = sm.ImmutableMatrix(sort_sympy(unknown_symbols))
    if states.shape[1] > 1 or inputs.shape[1] > 1 or params.shape[1] > 1:
        raise ValueError(
            'The state, input, and unknown symbols must be column matrices.')
    n, q = states.shape[0], inputs.shape[0]
    i_idx, r_idx = n * N, (n + q) * N

    # Compute analytical gradient of the objective function
    objective_grad = sm.ImmutableMatrix([objective]).jacobian(
        states[:] + inputs[:] + params[:])

    # Replace the integral with a custom function
    int_placeholder = sm.Function("_IntegralFunction")
    objective = parse_expr(objective)
    objective_grad = tuple(parse_expr(objective_grad))

    # Replace zeros with an array of zeros, otherwise lambdify will return a
    # scalar zero instead of an array of zeros.
    objective_grad = tuple(
        np.zeros(N) if grad == 0 else grad for grad in objective_grad[:n + q]
        ) + tuple(objective_grad[n + q:])

    # Define evaluation functions based on the integration method
    if integration_method == "backward euler":
        obj_expr_eval = lambdify_function(
            objective, np.hstack((0, np.ones(N - 1))), True)
        obj_grad_time_expr_eval = lambdify_function(
            objective_grad[:n + q], np.hstack((0, np.ones(N - 1))), False)
        obj_grad_param_expr_eval = lambdify_function(
            objective_grad[n + q:], np.hstack((0, np.ones(N - 1))), True)
        def obj(free):
            states = free[:i_idx].reshape((n, N))
            inputs = free[i_idx:r_idx].reshape((q, N))
            return obj_expr_eval(states, inputs, free[r_idx:])

        def obj_grad(free):
            states = free[:i_idx].reshape((n, N))
            inputs = free[i_idx:r_idx].reshape((q, N))
            return np.hstack((
                *obj_grad_time_expr_eval(states, inputs, free[r_idx:]),
                obj_grad_param_expr_eval(states, inputs, free[r_idx:])
             ))

    elif integration_method == "midpoint":
        obj_expr_eval = lambdify_function(
            objective, np.ones(N - 1), True)
        obj_grad_time_expr_eval = lambdify_function(
            objective_grad[:n + q], np.hstack((0.5, np.ones(N - 2), 0.5)),
            False)
        obj_grad_param_expr_eval = lambdify_function(
            objective_grad[n + q:], np.ones(N - 1), True)
        def obj(free):
            states = free[:i_idx].reshape((n, N))
            states_mid = 0.5 * (states[:, :-1] + states[:, 1:])
            inputs = free[i_idx:r_idx].reshape((q, N))
            inputs_mid = 0.5 * (inputs[:, :-1] + inputs[:, 1:])
            return obj_expr_eval(states_mid, inputs_mid, free[r_idx:])

        def obj_grad(free):
            states = free[:i_idx].reshape((n, N))
            states_mid = 0.5 * (states[:, :-1] + states[:, 1:])
            inputs = free[i_idx:r_idx].reshape((q, N))
            inputs_mid = 0.5 * (inputs[:, :-1] + inputs[:, 1:])
            return np.hstack((
                *obj_grad_time_expr_eval(states, inputs, free[r_idx:]),
                obj_grad_param_expr_eval(states_mid, inputs_mid, free[r_idx:])
            ))

    else:
        raise NotImplementedError(
            f"Integration method '{integration_method}' is not implemented.")

    return obj, obj_grad

def sort_sympy(seq):
    """Returns a sorted list of the symbols."""
    seq = list(seq)
    try:  # symbols
        seq.sort(key=lambda x: x.name)
    except AttributeError:  # functions
        seq.sort(key=lambda x: x.__class__.__name__)
    return seq
#=========================================================================================
tart = time.time()

N, A0 = sm.symbols('N A0', cls= me.ReferenceFrame)
t = me.dynamicsymbols._t
O, Dmc = sm.symbols('O Dmc', cls= me.Point)
O.set_vel(N, 0)

x, y   = me.dynamicsymbols('x y')                                           # position of tip of the rocket
ux, uy = me.dynamicsymbols('u_x u_y')                                       # speed of the tip of the rocket
q0     = me.dynamicsymbols('q0')                                            # direction of the thrust
u0     = me.dynamicsymbols('u0')                                            
m, um  = me.dynamicsymbols('m, um')                                         # mass of fuel of the rocket

strength = me.dynamicsymbols('strength')                                    # thrust strength, control parameter

m0, g, v0 = sm.symbols('m0 g v0')                                           # mass of the rocket, gravity, speedof the exhaust gas
reibung   = sm.symbols('reibung')                                           # friction coefficient

A0.orient_axis(N, -q0, N.z)                                                  # direction of the thrust  
A0.set_ang_vel(N, -u0 * N.z)

Dmc.set_pos(O, x*N.x + y*N.y)
Dmc.set_vel(N, ux*N.x + uy*N.y)

# Create the rocket
body0 = me.Particle('body0', Dmc, (m0 + m))
BODY = [body0]

# Forces
# The friction force arbitrarily depends on the height, to kinf of model lower air friction at higher elevantions.
# as dm/dt <= 0, the driving force points in positiy A0.y direction.
FL = [(Dmc, -(m0 + m) * g * N.y), (Dmc, -v0*um*A0.y), (Dmc, -reibung/(1 + y)*Dmc.vel(N))]                                                    

kd = sm.Matrix([ux - x.diff(t), uy - y.diff(t), um - m.diff(t)])       # kinematic differential equations    
speed_constr1 = sm.Matrix([um + strength * m])                         # speed constraint, determines the driving force
q_ind = [x, y, m]
u_ind = [ux, uy]
u_dep = [um]

# Create the KanesMethod object
KM = me.KanesMethod(
                    N, q_ind=q_ind, u_ind=u_ind, 
                    kd_eqs=kd, 
                    u_dependent=u_dep, 
                    velocity_constraints=speed_constr1,
                    )
(fr, frstar) = KM.kanes_equations(BODY, FL)

EOM = kd.col_join(fr + frstar) 
EOM = EOM.col_join(speed_constr1)

print('EOM shape', EOM.shape, '\n')
print('EOM DS', me.find_dynamicsymbols(EOM), '\n')
print('EOM FS', EOM.free_symbols)
print(f'EOMs contain {sm.count_ops(EOM)} operations, {sm.count_ops(sm.cse(EOM))} after cse')
EOM = sm.simplify(EOM)
EOM
#=========================================================================================
state_symbols     = tuple((x, y, ux, uy, m, um))
laenge            = len(state_symbols)
constant_symbols  = (m0, g, v0, reibung)
specified_symbols = (strength, q0)
unknown_symbols   = []

#======================================================================================================
methode = 'backward euler'  # The integration method to use. backward euler or midpoint are the choices
opty_info = True            # If True, the optimizer will print some information about the optimization process
#======================================================================================================

duration  = 4.0                                             # The duration of the simulation.
num_nodes = 300                                             # The number of nodes to use in the direct collocation problem.
t0, tf    = 0., duration                                    # The initial and final times

initial_guess = np.ones((len(state_symbols) + len(specified_symbols)) * num_nodes) * 0.01
print('initial guess shape', initial_guess.shape)

interval_value = duration / (num_nodes - 1)

# Specify the known system parameters.
par_map = OrderedDict()
par_map[g]        = 9.81                                    # Acceleration due to gravity
par_map[v0]       = 100.                                    # Speed of the exhaust gas
par_map[m0]       = 1.0                                     # Mass of the rocket
par_map[reibung]  = 0.5                                     # Friction coefficient

objective = sm.Integral(um**2, t)                           # It should use minimum amount of fuel, where m is the mass of the fuel.

# Specify the objective function and it's gradient.
obj, obj_grad = create_objective_function(
    objective, 
    state_symbols, 
    specified_symbols, 
    unknown_symbols,
    num_nodes, 
    interval_value, 
    methode)

#====================================================================================
initial_state_constraints = {
                            x: 0,
                            y: 0,
                            ux: 0.,
                            uy: 0.,
                            m: 10.
                            }

final_state_constraints    = {
                             x: 100,
                             y: 100.,
                             ux: 0.,
                             uy: 0.,
                             }    

bounds = {
          strength: (0., np.inf),
          x: (initial_state_constraints[x], final_state_constraints[x]),
          y: (initial_state_constraints[y], np.inf),
          m: (0, initial_state_constraints[m]),
          um: (-10, 0.),
          q0: (-np.pi, np.pi),
          }

instance_constraints = tuple(xi.subs({t: t0}) - xi_val for xi, xi_val in initial_state_constraints.items()) + tuple(xi.subs({t: tf}) - xi_val for xi, xi_val in final_state_constraints.items())

# Here I can iterate. This helps sometimes.
solution_list = []                        # list to store the solutions of each iteration
g_list        = []                        # list to store the values of the errors

for i in range(2): 

    instance_constraints = tuple(xi.subs({t: t0}) - xi_val for xi, xi_val in initial_state_constraints.items()) + tuple(xi.subs({t: tf}) - xi_val for xi, xi_val in final_state_constraints.items()) 

    prob = Problem(obj, obj_grad, EOM, state_symbols, num_nodes, interval_value,
        known_parameter_map=par_map,
        instance_constraints=instance_constraints,
        bounds=bounds,
        integration_method=methode)

    prob.add_option('max_iter', 30000)           # default is 3000

# Find the optimal solution.
    solution, info = prob.solve(initial_guess)
    print(f'{i+1} - th iteration')
    if opty_info:
        print('message from optimizer:', info['status_msg'])
        print('Iterations needed',len(prob.obj_value))
        print(f"objective value {info['obj_val']:.3e} \n")
#    prob.plot_objective_value()
    initial_guess = solution
    solution_list.append(solution)
    g_list.append(info['g'])
#=========================================================================================
welche = 0
fehler = g_list[welche]
anzahl = len(state_symbols)
fig, ax = plt.subplots(anzahl, 1, figsize=(10, 2.25*anzahl), sharex=True)
times = np.linspace(0.0, duration, num=num_nodes)

for i, j in enumerate(state_symbols):
    if j in final_state_constraints.keys():
        farbe1 = 'red'
    else:
        farbe1 = 'black'

    if i < laenge:
        farbe = 'b'
    else:
        farbe = 'r'
    ax[i].plot(times, fehler[i * num_nodes:(i + 1) * num_nodes], color = farbe, lw=0.5)
    ax[i].grid()
    ax[i].set_ylabel(f'${str(j)}$', color=farbe1)
ax[-1].set_xlabel('time')
ax[0]. set_title(f'Errors in the generalized coordinates and speeds \n coordinates/speeds in the final_state_constraints are red', color = 'b');
print(f'error in the final state constraints x:   {fehler[-4]:.3e}')
print(f'error in the final state constrains  y:   {fehler[-3]:.3e}')
print(f'error in the final state constraint ux:   {fehler[-2]:.3e}')
print(f'error in the final state constraint uy:   {fehler[-1]:.3e}')
#=========================================================================================
solution = solution_list[welche]
anzahl = len(state_symbols) + len(specified_symbols)
fig, ax = plt.subplots(anzahl, 1, figsize=(10, 2.5*anzahl), sharex=True)
times = np.linspace(0.0, duration, num=num_nodes)

for i, j in enumerate(state_symbols + specified_symbols):
    if j in final_state_constraints.keys():
        farbe1 = 'red'
    else:
        farbe1 = 'black'

    if i < laenge:
        farbe = 'b'
    else:
        farbe = 'r'
    ax[i].plot(times, solution[i * num_nodes:(i + 1) * num_nodes], color = farbe, lw = 0.5)
    ax[i].grid()
    ax[i].set_ylabel(f'${str(j)}$', color=farbe1)
ax[-1].set_xlabel('time')
ax[0]. set_title('Generalized coordinates and speeds', color = 'b')
ax[laenge]. set_title('Control force(s)', color = 'r');
#=========================================================================================

fps = 60
print(solution.shape)
def add_point_to_data(line, x, y):
# to trace the path of the point. Copied from Timo.
    old_x, old_y = line.get_data()
    line.set_data(np.append(old_x, x), np.append(old_y, y))

state_vals, input_vals, _ = parse_free(solution, len(state_symbols), len(specified_symbols), num_nodes)
t_arr = np.linspace(t0, tf, num_nodes)
state_sol = CubicSpline(t_arr, state_vals.T)
input_sol = CubicSpline(t_arr, input_vals.T)

fb = sm.symbols('f_b')                                                     # forces at the axles
Fbq = me.Point('Fbq')                                                      # point where the force is applied
Fbq.set_pos(Dmc, fb * A0.y)

pL, pL_vals = zip(*par_map.items())

coordinates = Fbq.pos_from(O).to_matrix(N)
coordinates = coordinates.row_join(Dmc.pos_from(O).to_matrix(N))
coords_lam = sm.lambdify((x, y, ux, uy, m, um, strength, q0, *pL, fb), coordinates, cse=True)

pL, pL_vals = zip(*par_map.items())

# needed to give the picture the right size.
xmin, xmax = initial_state_constraints[x] - 10, final_state_constraints[x] + 10
ymin, ymax = initial_state_constraints[y] - 10, np.max(state_vals[1, :]) * 1.2

fig = plt.figure(figsize=(9, 9))
ax  = fig.add_subplot(111)
ax.set_xlim(xmin-1, xmax + 1.)
ax.set_ylim(ymin-1, ymax + 1.)
ax.set_aspect('equal')
ax.grid()

# Initialize the block
line1, = ax.plot([], [], color='red', marker='o', markersize=10)                                                    # the rocket
line4  = ax.quiver([], [], [], [], color='green', scale=1250, width=0.004, headwidth=8)                             # the force on the rocket
line5, = ax.plot([], [], color='red', lw=0.5)                                                                       # trace the rocket

# Function to update the plot for each animation frame
def update(t):
    message = (f'running time {t:.2f} sec \n The thrust is green')
    ax.set_title(message, fontsize=12)

    fb1 = state_sol(t)[5] * par_map[v0]
    coords = coords_lam(*state_sol(t), *input_sol(t), *pL_vals, fb1)

    line1.set_data([coords[0, 1]], [coords[1, 1]])
    line4.set_offsets([coords[0, 1], coords[1, 1]])
    line4.set_UVC(coords[0, 0] - coords[0, 1] , coords[1, 0] - coords[1, 1])

    add_point_to_data(line5, coords[0, 1], coords[1, 1])

    return line1, line4,

# Set labels and legend
ax.set_xlabel('X-axis')
ax.set_ylabel('Y-axis')

# Create the animation
animation = FuncAnimation(fig, update, frames=np.arange(t0, tf, 1 / fps), interval=fps, blit=False)
plt.close(fig)  # Prevents the final image from being displayed directly below the animation
# Show the plot
display(HTML(animation.to_jshtml()))
print(f'it took {time.time()-start:.1f} seconds to run the code.')    
moorepants commented 2 months ago

I just ran this code and the bounds seem to be respected: image

moorepants commented 2 months ago

image

moorepants commented 2 months ago

Bounds for strength are within 0 to infinity and bounds for q0 are +/- pi.

Peter230655 commented 2 months ago

image

..but here the bounds do not seem to be observed?

Peter230655 commented 2 months ago

I just ran this code and the bounds seem to be respected: image

Did you change anything on the code? (This was my early days of opty, I may have made even more mistakes then)

Peter230655 commented 2 months ago

I must have made a mistake in my plotting routine. When I use prob.plot_trajectories, all works fine. Sorry about this unnecessary issue!