zarr-developers / numcodecs

A Python package providing buffer compression and transformation codecs for use in data storage and communication applications.
http://numcodecs.readthedocs.io
MIT License
127 stars 88 forks source link

numcodecs.blosc mutex leaked semaphore warning #230

Open rc-conway opened 4 years ago

rc-conway commented 4 years ago

Minimal, reproducible code sample, a copy-pastable example if possible

import zarr

Problem description

Using multiprocessing in a cython context, if zarr is in the import space I get a leaked semaphore warning.

I suspect the issue is the global mutex variable (the lock) in numcodecs.blosc is possibly being garbage collected prior to the multiprocessing finalizer getting called, so the weakref is dropped and the lock is never unregistered. This is an issue when used in a multiprocessing environment. This results in a leaked semaphore warning for me. I don't think there's a functional issue but I would like the warning to go away.

https://github.com/zarr-developers/numcodecs/blob/master/numcodecs/blosc.pyx#L78

Version and installation information

Please provide the following:

alimanfoo commented 4 years ago

Thanks @rc-conway. Do you have any suggestions for how to avoid this?

jakirkham commented 4 years ago

Could we move it to init? Would that help?

rc-conway commented 4 years ago

@jakirkham That would the prevent issues caused by having zarr on the import path.

As a just in case Maybe blosc.destroy might need to call mutex = None to clean it up?

alimanfoo commented 4 years ago

For reference, we call init() here which will be called during zarr (numcodecs) import. We also register a call to destroy at exit (here).

jakirkham commented 4 years ago

I'm guessing there is also something weird going on when Cython cleans up this object (or doesn't...).

Let's trying moving to init 🙂

jakirkham commented 4 years ago

Added PR ( https://github.com/zarr-developers/numcodecs/pull/234 ) to move initialization of the mutex to init. Also mutex is overwritten in destroy, which should trigger reference counting to cleanup the Lock (there doesn't appear to be a method to cleanup the Lock directly). Please take a look and give it a try 🙂

jstriebel commented 2 years ago

Since PR #234 was close, I had another look on this. The following shows a minimal reproduction of the problem, together with with potential fixes (coming with caveats, discussed below):

import multiprocessing as mp
from contextlib import nullcontext

if __name__ == '__main__':
    # This must happen before importing blosc,
    # and must be guarded by the if.
    mp.set_start_method("spawn")

from numcodecs import blosc

### The following line doesn't show anymore leaked semaphores:
# blosc.mutex = mp.get_context("fork").Lock

### Or this, removing the mutex completely, only using
### the _get_use_threads() check
# from contextlib import nullcontext
# blosc.mutex = nullcontext()

def f():
    print("inner")

if __name__ == '__main__':
    print("outer")
    p = mp.Process(target=f)
    p.start()
    p.join()

This outputs There appear to be 1 leaked semaphores to clean up at shutdown, commenting in any of the two fixes removes this warning. Using the first solution with mp.get_context("fork").Lock has two caveats:

  1. When using fork, leaked resources are simply not shown, so they might still leak, just the warning is gone. From the python docs

    On Unix using the spawn or forkserver start methods will also start a resource tracker process which tracks the unlinked named system resources (such as named semaphores or SharedMemory objects) created by processes of the program. When all processes have exited the resource tracker unlinks any remaining tracked object. Usually there should be none, but if a process was killed by a signal there may be some “leaked” resources. (Neither leaked semaphores nor shared memory segments will be automatically unlinked until the next reboot. This is problematic for both objects because the system allows only a limited number of named semaphores, and shared memory segments occupy some space in the main memory.)

  2. Using a lock from a fork context with spawned processes does not work. Also from the python docs :

    Alternatively, you can use get_context() to obtain a context object. Context objects have the same API as the multiprocessing module, and allow one to use multiple start methods in the same program. … Note that objects related to one context may not be compatible with processes for a different context. In particular, locks created using the fork context cannot be passed to processes started using the spawn or forkserver start methods.

    However, it seems that this might not be a problem, since the blosc calls with the global context are guarded by _get_use_threads, which ensures that the global context is only accessed

    from within a single-threaded, single-process program.

    (from this code comment), and not from any subprocesses or -threads. Those only use the non-threaded ctx-version of the calls, so the fork-only lock should actually never be used in spawned subprocesses.

Because of the latter point, I'm wondering if the mutex is actually needed at all. It would be great if someone with more knowledge about the blosc internals could comment on that. In this case simply removing the mutex would be fine, which can be simulated by setting it to contextlib.nullcontext() as shown in the second commented fix, which essentially then only uses the rest of the _get_use_threads logic to guard the global context access.

Here is some more code I used for further testing: ```python import os import multiprocessing as mp if __name__ == '__main__': # This must happen before importing blosc, # and must be guarded by the if. mp.set_start_method("spawn") from numcodecs import blosc ### The following line doesn't show anymore leaked semaphores: # blosc.mutex = mp.get_context(method="fork").Lock() ### Or this, removing the mutex completely, only using ### the _get_use_threads() check from contextlib import nullcontext blosc.mutex = nullcontext() def get_random_bytes(): LENGTH = 10**9 return b"\x00" + os.urandom(LENGTH) + b"\x00" def f(): print("inner") codec = blosc.Blosc("zstd") msg = get_random_bytes() print("use threads (inner)", blosc._get_use_threads(), flush=True) print("before inner encode", flush=True) encoded = codec.encode(msg) print("after inner encode", flush=True) decoded = codec.decode(encoded) assert decoded == msg if __name__ == '__main__': print("outer") # When using fork in the global scope, # one can still use spawn locally, # without leaked semaphore warnings: # mp = mp.get_context("spawn") p = mp.Process(target=f) p.start() blosc.set_nthreads(1) # limit threads to allow subprocess to start codec = blosc.Blosc("lz4") msg = get_random_bytes() print("use threads (outer)", blosc._get_use_threads(), flush=True) print("before outer encode", flush=True) encoded = codec.encode(msg) print("after outer encode", flush=True) decoded = codec.decode(encoded) assert decoded == msg p.join() ```
jakirkham commented 2 years ago

FWIW the PR was closed more because of trying to address issue ( https://github.com/zarr-developers/zarr-python/issues/777 ). So a new PR with those changes or other changes could be opened.

joshmoore commented 2 years ago

Do you have a preference as to which change, @jakirkham?

danielsf commented 2 years ago

I apologize if I appear to be piling on, but running python 3.9, zarr-py version 2.13.3, I cannot even get this to run without emitting the warning about leaked semaphore objects and hanging (commenting out the zarr import resolves the problem).

import zarr
import multiprocessing

def fn(x):
    return x**2

def main():
    p_list = []
    for ii in range(4):
        p = multiprocessing.Process(target=fn, args=(ii,))
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()

if __name__ == "__main__":
    main()

While I agree with the original poster that results do not actually seem to be affected by this problem, a fix would be greatly appreciated.

joshmoore commented 2 years ago

No piling on perceived, @danielsf. Extra data points welcome. Ping, @jakirkham.