giotto-ai / pyflagser

Python bindings and API for the flagser C++ library (https://github.com/luetge/flagser).

Warnings and crashes for FlagserPersistence with n_jobs>1 #49

Closed wreise closed 3 years ago

wreise commented 3 years ago

Description

With FlagserPersistence (from giotto-tda), for n_jobs>1, I get many notifications of the form

"Error deleting flagser output file: No such file or directory"

and eventually a

TerminatedWorkerError: A worker process managed by the executor was unexpectedly terminated. This could be caused by a segmentation fault while calling the function or by an excessive memory usage causing the Operating System to kill the worker.
The exit codes of the workers are {UNKNOWN(255)}

It seems to be related to a memory issue, but I could not observe any increase in memory usage (to my surprise, actually).
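For what it's worth, the deletion warnings themselves look like several worker processes racing on a single output path. A minimal standalone sketch of that failure mode, independent of pyflagser (the path, messages, and worker function here are illustrative only):

import multiprocessing
import os
import tempfile

# Illustrative only: every worker writes to, then deletes, the SAME
# hard-coded path, mimicking what the warnings above suggest.
SHARED_PATH = os.path.join(tempfile.gettempdir(), "flagser_output_demo")

def worker(_):
    with open(SHARED_PATH, "w") as f:
        f.write("result")
    try:
        os.remove(SHARED_PATH)
    except FileNotFoundError:
        # Another worker removed the file first: the same
        # "No such file or directory" symptom as above.
        print("Error deleting flagser output file: No such file or directory")

if __name__ == "__main__":
    with multiprocessing.Pool(9) as pool:
        pool.map(worker, range(100))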

Steps/Code to Reproduce

import numpy as np

from gtda.point_clouds import ConsecutiveRescaling
from gtda.homology import FlagserPersistence

# 1000 random point clouds of 60 points each in 2D
pc = np.random.randn(1000, 60, 2)
# One distance matrix per point cloud
CR = ConsecutiveRescaling(factor=1.)
distance_matrices = CR.fit_transform(pc)
# Crashes when n_jobs > 1
FlagserPersistence(n_jobs=9).fit_transform(distance_matrices)
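For reference, the intermediate array shapes under these settings (assuming ConsecutiveRescaling returns one square distance matrix per point cloud):

print(pc.shape)                 # (1000, 60, 2)
print(distance_matrices.shape)  # (1000, 60, 60)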

Expected Results

An array of persistence diagrams.

Actual Results

---------------------------------------------------------------------------
TerminatedWorkerError                     Traceback (most recent call last)
<ipython-input-4-ee0d8999551c> in <module>
      2 CR = ConsecutiveRescaling(factor=1.)
      3 distance_matrices = CR.fit_transform(pc)
----> 4 FlagserPersistence(n_jobs=9).fit_transform(distance_matrices)

~/Libs/giotto-tda/gtda/utils/_docs.py in fit_transform_wrapper(*args, **kwargs)
    104         @wraps(original_fit_transform)
    105         def fit_transform_wrapper(*args, **kwargs):
--> 106             return original_fit_transform(*args, **kwargs)
    107         fit_transform_wrapper.__doc__ = \
    108             make_fit_transform_docs(fit_docs, transform_docs)

~/snap/miniconda3/envs/ts_rp/lib/python3.8/site-packages/sklearn/base.py in fit_transform(self, X, y, **fit_params)
    688         if y is None:
    689             # fit method of arity 1 (unsupervised transformation)
--> 690             return self.fit(X, **fit_params).transform(X)
    691         else:
    692             # fit method of arity 2 (supervised transformation)

~/Libs/giotto-tda/gtda/homology/simplicial.py in transform(self, X, y)
   1306         X = check_point_clouds(X, accept_sparse=True, distance_matrices=True)
   1307 
-> 1308         Xt = Parallel(n_jobs=self.n_jobs)(
   1309             delayed(self._flagser_diagram)(x) for x in X)
   1310 

~/snap/miniconda3/envs/ts_rp/lib/python3.8/site-packages/joblib/parallel.py in __call__(self, iterable)
   1059 
   1060             with self._backend.retrieval_context():
-> 1061                 self.retrieve()
   1062             # Make sure that we get a last message telling us we are done
   1063             elapsed_time = time.time() - self._start_time

~/snap/miniconda3/envs/ts_rp/lib/python3.8/site-packages/joblib/parallel.py in retrieve(self)
    938             try:
    939                 if getattr(self._backend, 'supports_timeout', False):
--> 940                     self._output.extend(job.get(timeout=self.timeout))
    941                 else:
    942                     self._output.extend(job.get())

~/snap/miniconda3/envs/ts_rp/lib/python3.8/site-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
    540         AsyncResults.get from multiprocessing."""
    541         try:
--> 542             return future.result(timeout=timeout)
    543         except CfTimeoutError as e:
    544             raise TimeoutError from e

~/snap/miniconda3/envs/ts_rp/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    437                 raise CancelledError()
    438             elif self._state == FINISHED:
--> 439                 return self.__get_result()
    440             else:
    441                 raise TimeoutError()

~/snap/miniconda3/envs/ts_rp/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/snap/miniconda3/envs/ts_rp/lib/python3.8/site-packages/joblib/externals/loky/_base.py in _invoke_callbacks(self)
    623         for callback in self._done_callbacks:
    624             try:
--> 625                 callback(self)
    626             except BaseException:
    627                 LOGGER.exception('exception calling callback for %r', self)

~/snap/miniconda3/envs/ts_rp/lib/python3.8/site-packages/joblib/parallel.py in __call__(self, out)
    364         with self.parallel._lock:
    365             if self.parallel._original_iterator is not None:
--> 366                 self.parallel.dispatch_next()
    367 
    368 

~/snap/miniconda3/envs/ts_rp/lib/python3.8/site-packages/joblib/parallel.py in dispatch_next(self)
    797 
    798         """
--> 799         if not self.dispatch_one_batch(self._original_iterator):
    800             self._iterating = False
    801             self._original_iterator = None

~/snap/miniconda3/envs/ts_rp/lib/python3.8/site-packages/joblib/parallel.py in dispatch_one_batch(self, iterator)
    864                 return False
    865             else:
--> 866                 self._dispatch(tasks)
    867                 return True
    868 

~/snap/miniconda3/envs/ts_rp/lib/python3.8/site-packages/joblib/parallel.py in _dispatch(self, batch)
    782         with self._lock:
    783             job_idx = len(self._jobs)
--> 784             job = self._backend.apply_async(batch, callback=cb)
    785             # A job can complete so quickly than its callback is
    786             # called before we get here, causing self._jobs to

~/snap/miniconda3/envs/ts_rp/lib/python3.8/site-packages/joblib/_parallel_backends.py in apply_async(self, func, callback)
    529     def apply_async(self, func, callback=None):
    530         """Schedule a func to be run"""
--> 531         future = self._workers.submit(SafeFunction(func))
    532         future.get = functools.partial(self.wrap_future_result, future)
    533         if callback is not None:

~/snap/miniconda3/envs/ts_rp/lib/python3.8/site-packages/joblib/externals/loky/reusable_executor.py in submit(self, fn, *args, **kwargs)
    175     def submit(self, fn, *args, **kwargs):
    176         with self._submit_resize_lock:
--> 177             return super(_ReusablePoolExecutor, self).submit(
    178                 fn, *args, **kwargs)
    179 

~/snap/miniconda3/envs/ts_rp/lib/python3.8/site-packages/joblib/externals/loky/process_executor.py in submit(self, fn, *args, **kwargs)
   1100         with self._flags.shutdown_lock:
   1101             if self._flags.broken is not None:
-> 1102                 raise self._flags.broken
   1103             if self._flags.shutdown:
   1104                 raise ShutdownExecutorError(

TerminatedWorkerError: A worker process managed by the executor was unexpectedly terminated. This could be caused by a segmentation fault while calling the function or by an excessive memory usage causing the Operating System to kill the worker.

The exit codes of the workers are {UNKNOWN(255), UNKNOWN(255)}

Versions

Linux-5.0.0-1070-oem-osp1-x86_64-with-glibc2.10
Python 3.8.5 (default, Sep 4 2020, 07:30:14) [GCC 7.3.0]
pyflagser 0.4.1

MonkeyBreaker commented 3 years ago

Hi everyone,

Thank you @ulupo for assigning me. Looking at the issue, the problem was quite evident. I had never started multiple instances of flagser at the same time, and thanks to giotto-tda, this is quite easy to do ... :D

As the message reports, we always create the same output file on every run, so concurrent instances clobber and delete each other's files; this produces all the errors. I'll prepare a PR fixing this.
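Not the actual PR, but a minimal sketch of the kind of fix, assuming the binding currently writes flagser's output to one fixed filename; _unique_output_filename is a hypothetical helper name:

import os
import tempfile

def _unique_output_filename(prefix="flagser_output_"):
    # mkstemp creates a fresh, uniquely named file on every call,
    # even across concurrent processes, so parallel runs no longer
    # write to (and delete) the same path.
    fd, path = tempfile.mkstemp(prefix=prefix, suffix=".out")
    os.close(fd)  # hand only the name to flagser, which reopens it itself
    return path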

wreise commented 3 years ago

Thanks, @MonkeyBreaker !