LibraryOfCongress / bagit-python

Work with BagIt packages from Python.
http://libraryofcongress.github.io/bagit-python

bag.validate(processes=4) creates processes that escape their context #173

Open urzbs opened 6 months ago

urzbs commented 6 months ago

Hey, we are building a large-scale data preservation service and use bagit for data validation.

We currently run into the issue that a multiprocessed bagit.validate sometimes fails with an UnboundLocalError: the name pool is never assigned when creating the multiprocessing pool raises. This can have several causes, for example at the OS level.

The problem is hard to reproduce and does not always occur; it depends heavily on CPU and OS.

The current handling is problematic, and there are two major issues with the implementation: https://github.com/LibraryOfCongress/bagit-python/blob/master/bagit.py#L886

        try:
            if processes == 1:
                hash_results = [_calc_hashes(i) for i in args]
            else:
                try:
                    pool = multiprocessing.Pool(
                        processes if processes else None, initializer=worker_init
                    )
                    hash_results = pool.map(_calc_hashes, args)
                finally:
                    pool.terminate()

CRITICAL Context Issue:

Because pool.join() is never called, the processes created for map do not wait for their co-processes to complete and run completely separately. pool.join() is what synchronizes the main program with the completion of the tasks in the pool and ensures proper cleanup of the resources associated with the pool. Without it, the worker processes can interfere with the main program and cause unexpected behaviour.

Issue 2:

pool.terminate() is called in the finally block even though pool may not exist at that point.
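A minimal sketch of how Issue 2 arises, assuming the pool constructor itself raises. FailingPool is a hypothetical stand-in used only for illustration; it is not part of bagit or multiprocessing.

```python
class FailingPool:
    """Hypothetical stand-in for multiprocessing.Pool whose constructor fails."""

    def __init__(self, *args, **kwargs):
        raise OSError("could not start worker processes")


def validate_entries_like():
    # Same try/finally shape as the snippet quoted above.
    try:
        pool = FailingPool(4)  # raises before the name `pool` is bound
        return pool.map(len, [])
    finally:
        pool.terminate()  # UnboundLocalError: `pool` was never assigned


def demo():
    try:
        validate_entries_like()
    except UnboundLocalError as exc:
        # The original OSError is only reachable through __context__.
        return type(exc).__name__, type(exc.__context__).__name__


if __name__ == "__main__":
    print(demo())
```

The caller sees the UnboundLocalError from the finally block, and the real cause only shows up as the chained exception, which matches the shape of the traceback in this report.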

suggested solution:

try:
    if processes == 1:
        hash_results = [_calc_hashes(i) for i in args]
    else:
        try:
            with multiprocessing.Pool(processes if processes else None, initializer=worker_init) as pool:
                hash_results = pool.map(_calc_hashes, args)
        except Exception as e:
            # Handle exceptions if needed
            print("An error occurred:", e)

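Using with ensures that cleanup only runs if the pool was actually created, so the UnboundLocalError cannot occur and the processes created by the pool stay contained. On leaving the block the context manager calls pool.terminate(); if a join is also wanted, pool.close() and pool.join() can be called inside the block after map returns.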
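A sketch of that idea, with close() and join() called explicitly inside the block so that waiting for the workers does not depend on what the context manager does on exit. The calc_all function and its pool_factory parameter are hypothetical names for this example; bagit has no such hook. ThreadPool is used in the demo only because it shares the Pool interface and avoids pickling concerns.

```python
import multiprocessing
from multiprocessing.pool import ThreadPool


def calc_all(func, args, processes=None, pool_factory=multiprocessing.Pool):
    """Map func over args, cleaning up the pool only if it was created.

    pool_factory is a hypothetical hook for this sketch so that a
    different pool type can be swapped in.
    """
    if processes == 1:
        return [func(a) for a in args]
    # If pool_factory raises, the with-block is never entered, so no
    # cleanup is attempted on a name that was never bound.
    with pool_factory(processes) as pool:
        results = pool.map(func, args)  # blocks until every result is in
        pool.close()  # no further tasks will be submitted
        pool.join()   # wait for the workers to exit
    # Leaving the with-block calls pool.terminate().
    return results


if __name__ == "__main__":
    print(calc_all(len, ["a", "bb", "ccc"], processes=2, pool_factory=ThreadPool))
```

If the factory fails, the caller gets the original exception instead of an UnboundLocalError, so the real cause is not hidden.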

how we currently call bag.validate:

try:
    bag.validate(processes=4)
except Exception as e:
    logger.error(e)
    raise Exception(e)

logger.info(f"successfully validated bag") # this message is triggered multiple times when UnboundLocalError happens. This should not happen
#...

our traceback:

Traceback (most recent call last):
  File "/home/user/venv/lib/python3.9/site-packages/bagit.py", line 881, in _validate_entries
    pool = multiprocessing.Pool(
  File "/usr/lib/python3.9/multiprocessing/context.py", line 119, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,
  File "/usr/lib/python3.9/multiprocessing/pool.py", line 212, in __init__
    self._repopulate_pool()
  File "/usr/lib/python3.9/multiprocessing/pool.py", line 303, in _repopulate_pool
    return self._repopulate_pool_static(self._ctx, self.Process,
  File "/usr/lib/python3.9/multiprocessing/pool.py", line 326, in _repopulate_pool_static
    w.start()
  File "/usr/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/lib/python3.9/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
  File "/usr/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/lib/python3.9/multiprocessing/popen_fork.py", line 73, in _launch
    os._exit(code)
  File "/home/user/repo/myproject/src/lib/generic_service.py", line 97, in signal_handler
    sys.exit()
SystemExit

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/repo/myproject/src/lib/utils/helpers.py", line 112, in validate_bag
    bag.validate(processes=processes)
  File "/home/user/venv/lib/python3.9/site-packages/bagit.py", line 602, in validate
    self._validate_contents(
  File "/home/user/venv/lib/python3.9/site-packages/bagit.py", line 790, in _validate_contents
    self._validate_entries(processes)
  File "/home/user/venv/lib/python3.9/site-packages/bagit.py", line 886, in _validate_entries
    pool.terminate()
UnboundLocalError: local variable 'pool' referenced before assignment