CATIA-Systems / FMPy

Simulate Functional Mockup Units (FMUs) in Python
Other
427 stars 117 forks source link

Problems with multiprocessing caused by the zipfile module #26

Open wulmer opened 6 years ago

wulmer commented 6 years ago

I was investigating on a really strange issue today: I was using fmpy in our project together with a larger multiprocessing setup (doing parallel simulations) and I encountered a strange error (which was hard to debug due to multiprocess stuff) when running on Linux (funnily, Windows worked). The result was a BrokenProcessPool error or a queue.Full error. After several hours of slicing away code line by code line, I ended up with the following script:

def use_fmpy(*args):
    import fmpy
    return fmpy.read_model_description('bouncingBall_me.fmu') 

if __name__ == "__main__":
    import tables  ##### TRIGGER HERE

    import concurrent.futures as cf
    with cf.ProcessPoolExecutor(max_workers=2) as pool:
        futures = []
        for i in range(10):
            future = pool.submit(use_fmpy, None)
            futures.append(future)
        cf.wait(futures)

What made it even harder was the fact that the import tables line triggered the error in the multiprocessing subprocesses. With the pytables import I got the error, without pytables, the error wasn't appearing.

So I first suspected pytables for being the source of the problem, but after a while I took a closer look at the read_model_description function. (BTW I tried out all available fmpy versions) The first lines of this function look like this:

https://github.com/CATIA-Systems/FMPy/blob/35245dd8bf2b8be2a8e750b04bd29df4557d3032/fmpy/model_description.py#L210-L212

It uses the zipfile module to read the modelDescription.xml file from the FMU file. Putting a return just before this code block made my example above work in all cases, so it must have had to do with the zipfile use.

Googling brought immediately a possible issue: It seems that the zipfile module is not threadsafe. So my idea was to replace the zipfile use by a call to unzip:

import subprocess
root = etree.fromstring(subprocess.check_output(
    ['unzip', '-p', filename, 'modelDescription.xml'])) 

And surprise: it worked! No more problems with BrokenProcessPool or queue.Full errors.

So my question here: would it be possible that you replace all the zipfile usages by something more robust? I would really love to use fmpy instead of pyfmi, but this issue cost me already 5 hours of debugging and it is currently a blocker for us.

t-sommer commented 6 years ago

Thanks a lot for digging into this and the detailed description! Can you please try if e32601c solves your problem? It adds a lock to synchronize the section that causes the problem so only one thread can access it at a time.

wulmer commented 6 years ago

Thanks. I tried your version with the thread lock, however with no change. The process still crashes. I then tried different Python versions, went from 3.6.2, 3.6.1, 3.6.0 down to 3.5.1. At least the 3.5 version gave me a more helpful C stacktrace. Here is the stacktrace of the crash in 3.5.1 (no difference with or without the thread lock):

*** glibc detected *** python: free(): invalid pointer: 0x00002aab8a4316a0 ***
======= Backtrace: =========
/lib64/libc.so.6(+0x75dee)[0x2aab8afaadee]
/lib64/libc.so.6(+0x78c3d)[0x2aab8afadc3d]
/home/.../lib/python3.5/site-packages/tables/utilsextension.cpython-35m-x86_64-linux-gnu.so(inflateReset2+0x80)[0x2aab91608f50]
/home/.../lib/python3.5/site-packages/tables/../../.././libz.so.1(inflateInit2_+0xb1)[0x2aab922b3421]
/home/.../lib/python3.5/lib-dynload/zlib.cpython-35m-x86_64-linux-gnu.so(+0x42a1)[0x2aab964192a1]
/home/.../lib/libpython3.5m.so.1.0(PyCFunction_Call+0x131)[0x2aab8a02e1e1]
...

I can now see the connection to the tables package -- somehow the tables package gets mixed up with the libz.so.1 library. I'll try to find out more on the tables internals and how they affect the zipfile behaviour.

wulmer commented 6 years ago

Ok, the problem doesn't appear if I import zipfile before tables (at least not in my minimal example script). It seems the tables package is preloading a few compression libraries and somehow this interferes with the use of the zipfile module.

I tried to use this solution in our full project setup and tried to import zipfile right at the beginning. However this somehow fails. I can't make fmpy work with our library where we make use of tables and a lot of other packages. I'll try to find another Linux machine where I can run the same tests. Let's see if the problem lies in the Linux system I am using.

t-sommer commented 6 years ago

While it absolutely makes sens to synchronize calls to zipfile, is it possible that PyTables itself is not thread save?

t-sommer commented 6 years ago

Although it's not a direct solution for your problem you might want to try Dask to distribute the work (like in parameter_variation.py). It allows you to switch between multi-processesing, multi-threading and synchronized execution for debugging.

wulmer commented 6 years ago

I will temporarily disable Linux multiprocessing support for FMUs in our application and hope that the issue will resolve by itself soon. Thanks for the support.

Sonyoyo commented 1 year ago

I think I am getting a similar issue with parallelization and fmpy. My scénario is quite complicated since I am running "nested parallelization" via joblib on a Linux cluster.

In fact, I run in parallel a list of main tasks. Then, each tasks, at some point, start a new set of parallelized sub-tasks. Here, multiple fmpy runs are performed in series.

And I then get a FileNotFoundError stating that the .fmu file is not there. This error does not appear if I use just 1 jobs for each parallelization (so that no parallel process is run) : the .fmu is at the right address. That's why I am assuming that some conflicts may appears because of the parallelization.

Can you see why?

I can try to generate a simple code to generate the error if needed.

t-sommer commented 1 year ago

I can try to generate a simple code to generate the error if needed.

Yes, please.

t-sommer commented 1 year ago

Have you tried to extract the FMU before passing it to FMPy with simulate_fmu(filename='/path/to/extracted/fmu') and moving the import zipfile into the else clause? This way you could avoid the import of zipfile altogether.

Sonyoyo commented 1 year ago

Here follow my simple code to reproduce it

import numpy as np
import pandas as pd

from fmpy import simulate_fmu
from joblib import Parallel, delayed

fmu_filename = 'path/to/fmu'

def fmu_model(data,tstep) :
    data.index = np.arange(data.shape[0])*tstep
    rec = data.to_records()

    res = simulate_fmu(
        filename = fmu_filename,
        start_time = 0,
        stop_time = tstep*(data.shape[0]-1),
        input = rec,
        output_interval=tstep,
    )

    res = pd.DataFrame(res,index=data.index)

    return res

def nested_parallel(n_jobs) :
    Parallel(n_jobs=n_jobs, verbose=11)(delayed(inner_loop)() for i in range(100))

def inner_loop() :
    #Generating some fake input data for the fmu
    data = np.stack([
        np.linspace(1.e-3, 1., 1000),
        -0.01 * np.ones(1000)
    ])
    data = pd.DataFrame(data, index=['Wrel','DI']).T
    for i in range(100) :
        fmu_model(data,100)

if __name__ == "__main__":

    n_jobs = 16
    inner_jobs = 16

    Parallel(n_jobs=n_jobs, verbose=11)(delayed(nested_parallel)(inner_jobs) for i in range(100))

The error I get is the following:

Traceback (most recent call last):
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 428, in _process_worker
    r = call_item()
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 275, in __call__
    return self.fn(*self.args, **self.kwargs)
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/_parallel_backends.py", line 620, in __call__
    return self.func(*args, **kwargs)
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/parallel.py", line 288, in __call__
    return [func(*args, **kwargs)
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/parallel.py", line 288, in <listcomp>
    return [func(*args, **kwargs)
  File "/fscronos/home/d24676/WORK/PYB/coreprediction/scripts/test_double_parallel.py", line 40, in nested_parallel
    Parallel(n_jobs=n_jobs, verbose=11)(delayed(inner_loop)() for i in range(100))
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/parallel.py", line 1098, in __call__
    self.retrieve()
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/parallel.py", line 975, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/_parallel_backends.py", line 620, in __call__
    return self.func(*args, **kwargs)
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/parallel.py", line 288, in __call__
    return [func(*args, **kwargs)
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/parallel.py", line 288, in <listcomp>
    return [func(*args, **kwargs)
  File "/fscronos/home/d24676/WORK/PYB/coreprediction/scripts/test_double_parallel.py", line 49, in inner_loop
    fmu_model(data,100)
  File "/fscronos/home/d24676/WORK/PYB/coreprediction/scripts/test_double_parallel.py", line 26, in fmu_model
    res = simulate_fmu(
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/fmpy/simulation.py", line 731, in simulate_fmu
    tempdir = extract(filename, include=None if remote_platform else lambda n: n.startswith(tuple(required_paths)))
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/fmpy/__init__.py", line 193, in extract
    with zipfile.ZipFile(filename, 'r') as zf:
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/zipfile.py", line 1249, in __init__
    self.fp = io.open(file, filemode)
FileNotFoundError: [Errno 2] No such file or directory: '../annexes/Xenon_2P_linux.fmu'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/fscronos/home/d24676/WORK/PYB/coreprediction/scripts/test_double_parallel.py", line 57, in <module>
    Parallel(n_jobs=n_jobs, verbose=11)(delayed(nested_parallel)(inner_jobs) for i in range(100))
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/parallel.py", line 1098, in __call__
    self.retrieve()
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/parallel.py", line 975, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/_parallel_backends.py", line 567, in wrap_future_result
    return future.result(timeout=timeout)
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
FileNotFoundError: [Errno 2] No such file or directory: '../annexes/Xenon_2P_linux.fmu'
/home/d24676/anaconda3/envs/ml_cronos/lib/python3.10/site-packages/joblib/externals/loky/backend/resource_tracker.py:310: UserWarning: resource_tracker: There appear 
to be 124 leaked semlock objects to clean up at shutdown
  warnings.warn(

The main cause looks to be:

FileNotFoundError: [Errno 2] No such file or directory: '../annexes/Xenon_2P_linux.fmu'

...so my fmu is not found. The error does not appear if only one process is run. And even with one single set of parallelized jobs it looks like everything is fine...

Sonyoyo commented 1 year ago

Have you tried to extract the FMU before passing it to FMPy with simulate_fmu(filename='/path/to/extracted/fmu') and moving the import zipfile into the else clause? This way you could avoid the import of zipfile altogether.

I do not understand what you mean. How can I extract the FMU in advance? I could do that once so that I can pass the extracted-fmu to all the processes? I do not know of what 'else' clause are you talking about.

In any case, thanks for your help.

t-sommer commented 1 year ago

Have a look at https://github.com/CATIA-Systems/FMPy/blob/main/fmpy/examples/efficient_loops.py where the FMU (it's just a ZIP file) is extracted and the unzipdir is passed to simulate_fmu(). In this case zipfile should not be used (see https://github.com/CATIA-Systems/FMPy/blob/main/fmpy/model_description.py#L432).

t-sommer commented 1 year ago

@Sonyoyo, can you also share an FMU and some data to run the above example?

Sonyoyo commented 1 year ago

(sorry for my late answer, I was unavailable)

Have a look at https://github.com/CATIA-Systems/FMPy/blob/main/fmpy/examples/efficient_loops.py where the FMU (it's just a ZIP file) is extracted and the unzipdir is passed to simulate_fmu(). In this case zipfile should not be used (see https://github.com/CATIA-Systems/FMPy/blob/main/fmpy/model_description.py#L432)

Thanks for pointing out this example, at least it provides a performance gain. :) However, in this case I run into a different error :

ValueError: ctypes objects containing pointers cannot be pickled joblib

I will try to share a fmu here (even if I think that any fmu would behave in the same way). In any case, using nested jolib parallelization looks like it leads to some conflicts by the way. For the moment, I switched off one of them and everything works fine...