CATIA-Systems / FMPy

Simulate Functional Mockup Units (FMUs) in Python
Other
424 stars 117 forks source link

Run FMU in parallel and avoid initialization for unscented Kalman Filter? #565

Open brazilbruno opened 1 year ago

brazilbruno commented 1 year ago

I have tried looking for an answer, but haven't found anything, so if this question has already been asked, I apologize.

I am trying to run the UKF on a Co-Simulation FMU exported from Dymola. The algorithm is starting to become a little slow, since there are many states "n", and therefore "2n+1" simulations need to be done for every time-step. I want to run the simulations in parallel, but this is my first time trying parallelization, so I apologize. I saw an example parameter_variation.py, but in this example, for every run the FMU is initialized. Is it possible to perform this with an already initialized FMU? Maybe by initializing "2n+1" FMUs beforehand and passing it to the function?

Thanks for the help.

t-sommer commented 1 year ago

If the FMU supports it, you can get the FMU state after the initialization and then set it before every iteration (see also https://github.com/CATIA-Systems/FMPy/blob/main/tests/test_fmu_state.py).

brazilbruno commented 1 year ago

Sure, that works in the example I stated above as well. The problem is that for every function call of "simulate_fmu()", I instantiate the fmu every time. I haven't found a way to give an already instantiated FMU as an input to the function, because, if I understand correctly, the FMU can't be pickled.

Is there a workaround?

m-kormann commented 1 year ago

It sounds like you want to create your A matrix in each operating point by generating difference quotients for all states evaluating the FMU.

Why don't you use the Dymola Python interface, set the start values of all states as initial equations with the values of the current operating point and linearize the system with the linearizeModel or Modelica_LinearSystems2.ModelAnalysis.Linearize command? Then you would get the analytical result.

brazilbruno commented 1 year ago

Dear @m-kormann, I am not sure I understand your suggestion. For the extended Kalman filter, this makes sense. But for the UKF, where you need to simulate the FMU at every time step for multiple times, it should be advantageous to parallelize the simulation at each time point. I don't understand your suggestion exactly, could you elaborate a little?

t-sommer commented 1 year ago

I haven't found a way to give an already instantiated FMU as an input to the function, because, if I understand correctly, the FMU can't be pickled.

You can pass an FMU instance to simulate_fmu() (see examples/continue_simulation.py for an example).

brazilbruno commented 1 year ago

@t-sommer I see that, but when creating a list of tuples with the necessary inputs for the function and trying to use the starmap function of multiprocessing.Pool(), it still says that the function can't be pickled.

t-sommer commented 1 year ago

Can you share some code and an FMU to reproduce the problem?

brazilbruno commented 1 year ago

Here a modified version of this example that I was hopeful would work

""" Parallel example modified
"""

import dask
from dask import bag, array
from dask.diagnostics import ProgressBar
import numpy as np
import fmpy
from fmpy.fmi2 import FMU2Slave
from fmpy import read_model_description, platform
from fmpy.util import download_test_file
import shutil

fmu_filename = 'Rectifier.fmu'

# define the parameter space for the variation
v_ac = [100, 150, 200, 250, 300, 350, 400, 450, 500]  # AC voltage [V]
i_dc = [20, 30, 40, 50, 60, 70, 80, 90, 100]          # DC current [A]

sync = False  # synchronized execution (for debugging)

stop_time = 0.1

dll_handle = None

n_chunks = 10  # number of chunks to divide the workload

# create the parameters
V_AC, I_DC = np.meshgrid(v_ac, i_dc, indexing='ij')

def simulate_fmu(*args):
    """ Worker function that simulates the FMU

    Parameters:
        args = [indices, fmu_args, start_vrs, result_vrs]

        indices     a list of indices into the parameter arrays
        fmu_args    FMU constructor arguments

    Returns:
        a list of tuples (i, [u_dc, losses]) that contain the index 'i' and the averaged results of the
        'uDC' and 'Losses' variables
    """

    indices, fmu, state_before, start_vrs, result_vrs = args

    zipped = []

    # global fmu_args, start_vrs, result_vrs, dll_handle

    # iterate over the all indices in this batch
    for i in indices:
        fmu.setFMUstate(state_before)
        # get the start values for the current index
        start_values = [V_AC[i], I_DC[i]]

        # set the start values
        fmu.setReal(vr=start_vrs, value=start_values)

        time = 0.0
        step_size = 1e-4

        results = []

        # simulation loop
        while time < stop_time:
            fmu.doStep(currentCommunicationPoint=time, communicationStepSize=step_size)
            time += step_size

            if time > 0.02:
                result = fmu.getReal(result_vrs)
                results.append(result)

        u_dc, losses = zip(*results)

        # store the index and the averaged signals
        zipped.append((i, [np.average(u_dc), np.average(losses)]))
    fmu.freeFMUstate(state_before)
    fmu.terminate()

    # call the FMI API directly to avoid unloading the share library
    fmu.fmi2FreeInstance(fmu.component)

    if sync:
        # remember the shared library handle so we can unload it later
        global dll_handle
        dll_handle = fmu.dll._handle
    else:
        # unload the shared library directly
        fmpy.freeLibrary(fmu.dll._handle)

    return zipped

def run_experiment(show_plot=True):

    if platform not in ['win32', 'win64']:
        raise Exception("Rectifier.fmu is only available for Windows")

    print("Parameter variation on %s:" % fmu_filename)
    print("  VAC", v_ac)
    print("  IDC", i_dc)

    if sync:
        dask.config.set(scheduler='synchronous')  # synchronized scheduler

    # download the FMU
    download_test_file('2.0', 'CoSimulation', 'Dymola', '2017', 'Rectifier', fmu_filename)

    # read the model description
    model_description = read_model_description(fmu_filename)

    # collect the value references for the variables to read / write
    vrs = {}
    for variable in model_description.modelVariables:
        vrs[variable.name] = variable.valueReference

    # extract the FMU
    unzipdir = fmpy.extract(fmu_filename)

    fmu_args = {'guid': model_description.guid,
                'modelIdentifier': model_description.coSimulation.modelIdentifier,
                'unzipDirectory': unzipdir}

    # get the value references for the start and output values
    start_vrs = [vrs['VAC'], vrs['IDC']]
    result_vrs = [vrs['uDC'], vrs['Losses']]

    indices = list(np.ndindex(I_DC.shape))

    fmu = FMU2Slave(**fmu_args)

    fmu.instantiate()

    fmu.setupExperiment()
    fmu.enterInitializationMode()
    fmu.exitInitializationMode()
    state_before = fmu.getFMUstate()

    print("Running %d simulations (%d chunks)..." % (V_AC.size, n_chunks))
    with ProgressBar():
        # calculate the losses for every chunk
        b = bag.from_sequence(indices, npartitions=n_chunks)
        results = b.map_partitions(simulate_fmu, fmu, state_before, start_vrs, result_vrs).compute()

    print(results)
    LOSSES = np.zeros_like(V_AC)

    # put the results together
    for i, res in results:
        LOSSES[i] = res[1]
    print(LOSSES)
    # unload the shared library
    if sync:
        while True:
            try:
                fmpy.freeLibrary(dll_handle)
            except:
                break

    # clean up
    shutil.rmtree(unzipdir, ignore_errors=True)

    if show_plot:
        print("Plotting results...")

        import matplotlib.pyplot as plt

        figure = plt.figure()
        figure.patch.set_facecolor('white')
        ax = figure.add_subplot(1, 1, 1)

        CS = plt.contourf(V_AC, I_DC, LOSSES, 10)
        plt.colorbar(CS, aspect=30)

        CS = ax.contour(V_AC, I_DC, LOSSES, 10, colors='k', linewidths=0.8)
        ax.clabel(CS=CS, fmt='%.0f', fontsize=9, inline=1)

        ax.set_title('Losses / W')
        ax.set_xlabel('AC Voltage / V')
        ax.set_ylabel('DC Current / A')

        plt.show()
    else:
        print("Plotting disabled")

    print("Done.")

    return LOSSES

if __name__ == '__main__':

    run_experiment()
brazilbruno commented 1 year ago

And here is another example, this time trying to use the recommended simulate_fmu() function:

import numpy as np
import fmpy
from fmpy import read_model_description, extract, simulate_fmu, instantiate_fmu
from fmpy.fmi2 import FMU2Slave
from fmpy.util import plot_result, download_test_file
import os
from numba import njit, jit
import multiprocessing as mp

n_fmu = 2
fmu_filename = 'Rectifier.fmu'
x0 = [100,20]
input_names = ['VAC','IDC']
download_test_file('2.0', 'CoSimulation', 'Dymola', '2017', 'Rectifier', fmu_filename)
model_description = read_model_description(fmu_filename)

# Creating input dict
input_dict = {}

for i in range(len(input_names)):
    input_dict[input_names[i]] = x0[i]

# extract the FMU
unzipdir = fmpy.extract(fmu_filename)

"""fmu_args = {'guid': model_description.guid,
                'modelIdentifier': model_description.coSimulation.modelIdentifier,
                'unzipDirectory': unzipdir}"""

fmu_instances = []
state_before = []

for i in range(n_fmu):
    name = f"instance{i}"
    """fmu_instances.append(FMU2Slave(guid=model_description.guid, \
                        unzipDirectory=unzipdir, \
                        modelIdentifier=model_description.coSimulation.modelIdentifier, \
                        instanceName=name))"""
    fmu_instances.append(instantiate_fmu(unzipdir=unzipdir, model_description=model_description))

    fmu_instances[i].setupExperiment()
    fmu_instances[i].enterInitializationMode()
    fmu_instances[i].exitInitializationMode()
    state_before.append(fmu_instances[i].getFMUstate())

inputs1 = (unzipdir, False, None, 1.0, 'CVode', None, None, \
        None, \
        True, \
        None, \
        input_dict, \
       False, \
        None, \
       None, \
       None, \
        False, \
        False, \
        None, \
        None, \
        None, \
        model_description, \
        fmu_instances[0], \
        False, \
        'auto', \
        False, \
        False, \
        False, \
        True, \
        state_before[0])

inputs2 = (unzipdir, False, None, 1.0, 'CVode', None, None, \
        None, \
        True, \
        None, \
        input_dict, \
       False, \
        None, \
       None, \
       None, \
        False, \
        False, \
        None, \
        None, \
        None, \
        model_description, \
        fmu_instances[1], \
        False, \
        'auto', \
        False, \
        False, \
        False, \
        True, \
        state_before[1])

input_tuple_list = [inputs1, inputs2]

if __name__ == '__main__':
    #mp.freeze_support()
    pool = mp.Pool()
    results = pool.starmap(simulate_fmu, input_tuple_list)
t-sommer commented 1 year ago

You should be able to adapt this example for your use case. It stores the FMU instance as an attribute of the function to be executed, so there is exactly one instance per process that does not need to be pickled.

jonathanwvd commented 4 months ago

Interesting. I have a very similar application where I need to linearize a model for a Kalman Filter. Using the library pyfmi, I can do this using the function get_state_space_representation. I am transcribing my code into fmpy. Is there an easy way to do the same?

@brazilbruno, could you tell me more about your application?