lululxvi / deeponet

Learning nonlinear operators via DeepONet

How to improve the result #39

Open yejiwi opened 12 months ago

yejiwi commented 12 months ago

Hi, I am working with the deeponet_pde.py code to learn the operator u -> I for the ODE dI/dt = u * gamma * (1 - I) * I - gamma * I. In deeponet_pde.py I basically only modified ode_system(T), but the result is really bad even if I increase the size of the training and test sets. At this point I suspect the ODE I am working with may not be well suited to the code as written, but I am not sure what to modify.
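
For reference, for a constant input u the right-hand side can be written as gamma * I * (u * (1 - I) - 1), which vanishes at I = 0 and I = 1 - 1/u, so the true solution climbs toward 1 - 1/u = 0.5 for u = 2 and slowly decays toward 0 for u = 1. Below is a minimal sanity check of the target operator for these two constant inputs, independent of the DeepONet code (a sketch, assuming gamma = 0.1, I(0) = 0.1, and T = 29 as in the script further down):

import numpy as np
from scipy.integrate import solve_ivp

gamma, I0, T = 0.1, 0.1, 29
for u in (2.0, 1.0):
    # Reference solution of dI/dt = u*gamma*(1-I)*I - gamma*I for constant u
    sol = solve_ivp(lambda t, I: u * gamma * (1 - I) * I - gamma * I,
                    [0, T], [I0], dense_output=True)
    print("u =", u, " I(T) =", float(sol.sol(T)[0]), " equilibrium 1 - 1/u =", 1 - 1 / u)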

Here is my code; any suggestions would be appreciated!


from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import itertools

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

import deepxde as dde
from spaces import FinitePowerSeries, FiniteChebyshev, GRF
from system import LTSystem, ODESystem, DRSystem, CVCSystem, ADVDSystem
from utils import merge_values, trim_to_65535, mean_squared_error_outlier, safe_test

import scipy
from scipy.integrate import solve_ivp

# Length of the time interval [0, T]
T = 29

def test_u_ode(nn, system, T, m, model, data, u, fname, num=100):
    """Test ODE"""

    sensors = np.linspace(0, T, num=m)[:, None]
    sensor_values = u(sensors)

    x = np.linspace(0, T, num=num)[:, None]
    X_test = [np.tile(sensor_values.T, (num, 1)), x]
    y_test = system.eval_s_func(u, x)

    if nn != "opnn":
        X_test = merge_values(X_test)

    y_pred = model.predict(data.transform_inputs(X_test))
    np.savetxt(fname, np.hstack((x, y_test, y_pred)))
    print("L2relative error:", dde.metrics.l2_relative_error(y_test, y_pred))

    return X_test[1], y_pred

def ode_system(T):

    def g(s, u, x):

        gamma = 0.1

        return u * gamma * (1-s) * s - gamma * s

    s0 = [0.1] 

    return ODESystem(g, s0, T)

def solve_ode(u, t_range):
    """
    Solve the ordinary differential equation (ODE)
    dI/dt = u * gamma * (1 - I) * I - gamma * I
    over the range of `t` values specified by `t_range`. The solution is used
    to compare with the prediction of DeepONet.
    """
    gamma = 0.1

    # Initial condition of I
    I_0 = 0.1

    def ode_func(t, I):
        return u * gamma * (1 - I) * I - gamma * I

    sol = solve_ivp(ode_func, t_range, [I_0], dense_output=True)

    t = sol.t
    I = sol.y[0]

    return t, I
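
# Optional helper (a hedged addition, not in the original script): because
# solve_ivp is called with dense_output=True above, the returned object
# exposes a callable sol.sol, so the reference solution can also be
# evaluated on the same uniform grid that test_u_ode() uses for the
# DeepONet prediction, making "Predicted" and "True" comparable pointwise.
def solve_ode_on_grid(u, t_range, num=100):
    gamma = 0.1
    I_0 = 0.1

    def ode_func(t, I):
        return u * gamma * (1 - I) * I - gamma * I

    sol = solve_ivp(ode_func, t_range, [I_0], dense_output=True)
    t = np.linspace(t_range[0], t_range[1], num)
    return t, sol.sol(t)[0]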

t_range = [0, T]  # Start and end time

def run(problem, system, space, T, m, nn, net, lr, epochs, num_train, num_test):
    # space_test = GRF(1, length_scale=0.1, N=1000, interp="cubic")

    X_train, y_train = system.gen_operator_data(space, m, num_train)

    X_test, y_test = system.gen_operator_data(space, m, num_test)
    if nn != "opnn":
        X_train = merge_values(X_train)
        X_test = merge_values(X_test)

    X_test_trim = trim_to_65535(X_test)[0]
    y_test_trim = trim_to_65535(y_test)[0]

    if nn == "opnn":
        data = dde.data.OpDataSet(
            X_train=X_train, y_train=y_train, X_test=X_test_trim, y_test=y_test_trim
        )
    else:
        data = dde.data.DataSet(
            X_train=X_train, y_train=y_train, X_test=X_test_trim, y_test=y_test_trim
        )

    model = dde.Model(data, net)
    model.compile("adam", lr=lr, metrics=[mean_squared_error_outlier])
    checker = dde.callbacks.ModelCheckpoint(
        "model/model.ckpt", save_better_only=True, period=10
    )
    losshistory, train_state = model.train(epochs=epochs, callbacks=[checker])
    print("# Parameters:", np.sum([np.prod(v.get_shape().as_list()) for v in tf.compat.v1.trainable_variables()]))
    dde.saveplot(losshistory, train_state, issave=True, isplot=True)

    model.restore("model/model.ckpt-" + str(train_state.best_step), verbose=1)
    safe_test(model, data, X_test, y_test)

    # Constant input functions u(x) used to test the trained model

    u1 = 2.0
    u2 = 1.0

    t1,I1 = solve_ode(u1, t_range)
    t2,I2 = solve_ode(u2, t_range)

    tests = [
        (lambda x: u1*np.ones_like(x), "x1.dat", t1, I1),
        (lambda x: u2*np.ones_like(x), "x2.dat", t2, I2),
    ]

    for u, fname, x, sol_s in tests:

        if problem == "ode":

            # Predicted solution of trained model
            x_pred, y_pred = test_u_ode(nn, system, T, m, model, data, u, fname)

            plt.figure()
            plt.title(fname)
            plt.plot(x_pred, y_pred, label="Predicted")
            plt.plot(x, sol_s, label="True")
            plt.xlabel("x")
            plt.ylabel("s(x)")
            plt.legend()

    # Display the comparison plots for the two constant test inputs
    plt.show()

def main():

    problem = "ode"

    if problem == "ode":
        system = ode_system(T)

    # Function space

    space = GRF(T, length_scale=0.1, N=3000, interp="cubic")

    # Hyperparameters

    # Number of sensors
    m = 15

    # Number of training samples
    num_train = 500

    # Number of test samples
    num_test = 500

    # Learning rate 
    lr = 0.01

    # Number of epochs
    epochs = 100

    # Network
    nn = "opnn"
    activation = "relu"
    initializer = "Glorot normal"  # "He normal" or "Glorot normal"
    dim_x = 1 
    if nn == "opnn":
        net = dde.maps.OpNN(
            [m, 40, 40],
            [dim_x, 40, 40],
            activation,
            initializer,
            use_bias=True,
            stacked=False,
        )
    elif nn == "fnn":
        net = dde.maps.FNN([m + dim_x] + [100] * 2 + [1], activation, initializer)
    elif nn == "resnet":
        net = dde.maps.ResNet(m + dim_x, 1, 128, 2, activation, initializer)

    run(problem, system, space, T, m, nn, net, lr, epochs, num_train, num_test)

if __name__ == "__main__":
    main()

And this is the prediction.

[image: prediction plot]