python / cpython

The Python programming language
https://www.python.org

Creating `multiprocessing.Queue`s behaves strangely with `Process`es created in classes #123208

Open Moosems opened 3 weeks ago

Moosems commented 3 weeks ago

Bug report

Bug description:

The following from the docs works:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

The following also works:

from multiprocessing import Process, Queue
from time import sleep

def f(q):
    pass

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    sleep(1)

This works just fine:

from multiprocessing import Process, Queue
from time import sleep

class Foo:
    def __init__(self, q) -> None:
        pass

if __name__ == '__main__':
    q = Queue()
    p = Process(target=Foo, args=(q,))
    p.start()
    sleep(1)

But the following does NOT work:

from multiprocessing import Process, Queue
from time import sleep

class Foo:
    def __init__(self, q) -> None:
        pass

class Bar:
    def __init__(self) -> None:
        q = Queue()
        p = Process(target=Foo, args=(q,))
        p.start()

if __name__ == '__main__':
    Bar()
    sleep(1)

Which now gives this traceback:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/synchronize.py", line 115, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
FileNotFoundError: [Errno 2] No such file or directory

The following does work, though:

from multiprocessing import Process, Queue
from time import sleep

class Foo:
    def __init__(self, q) -> None:
        q.put("INIT COMPLETE")

class Bar:
    def __init__(self) -> None:
        q = Queue()
        p = Process(target=Foo, args=(q,))
        p.start()
        q.get()

if __name__ == '__main__':
    Bar()
    sleep(1)

This should either be documented or fixed, preferably the latter. This behavior is strange and, quite frankly, unexpected. It took me quite a while to figure this out and fix my code, and adding an init queue item is not ideal. Note that the following also breaks with the same error:

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

class Bar:
    def __init__(self) -> None:
        q = Queue()
        p = Process(target=foo, args=(q,))
        p.start()

if __name__ == '__main__':
    Bar()
    sleep(1)

Moving the Queue outside of the class (creating it inline at the call site) also doesn't fix the issue:

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

class Bar:
    def __init__(self, q) -> None:
        p = Process(target=foo, args=(q,))
        p.start()

if __name__ == '__main__':
    Bar(Queue())
    sleep(1)

But the following works just fine:

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

class Bar:
    def __init__(self, q) -> None:
        p = Process(target=foo, args=(q,))
        p.start()

if __name__ == '__main__':
    q = Queue()
    Bar(q)
    sleep(1)
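The difference between the failing and working variants comes down to how long the parent keeps a reference to the queue. A minimal sketch of the keep-a-reference pattern (the names foo/Bar and the message string are illustrative, not from the original examples):

```python
import multiprocessing as mp

def foo(q):
    # Child process: prove it received a working queue.
    q.put("child saw the queue")

class Bar:
    def __init__(self, q):
        self.p = mp.Process(target=foo, args=(q,))
        self.p.start()

if __name__ == "__main__":
    q = mp.Queue()  # bound to a name in the parent, so it stays alive
    bar = Bar(q)
    msg = q.get()   # blocks until the child has actually used the queue
    bar.p.join()
```

Reading from the queue before letting it go out of scope doubles as a synchronization point, which is why the put/get "handshake" variants above happen to work.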

This may be related to #116526 but I don't personally think this should be classified as the same issue. May be related to #23376 as well, but this may still be a separate issue. Neither addresses this kind of behavior, however.

CPython versions tested on:

3.12

Operating systems tested on:

macOS

Moosems commented 3 weeks ago

Note that if I pass two queues and only do the init put/get handshake on one of them, the second queue works just fine.

picnixz commented 3 weeks ago

I cannot reproduce the issue on Linux. Can you try with the latest 3.12, and with 3.13/3.14 as well, please?

Zheaoli commented 3 weeks ago

This problem exists. I can reproduce it on macOS. I think this is a spawn-mode-only problem; maybe you can try multiprocessing.set_start_method('spawn') on Linux to reproduce it.

Would you mind assigning this issue to me? cc @picnixz

picnixz commented 3 weeks ago

I can reproduce it in spawn mode. You can work on it if you want, just ping me if you need a review.
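The spawn-mode reproduction described above can be sketched as follows; this is a hypothetical scaffold, not the reporter's exact code. As written it completes normally, but if the q.get()/p.join() lines are removed so the parent drops its references immediately after p.start(), the child may intermittently hit the FileNotFoundError from the traceback:

```python
import multiprocessing as mp

def foo(q):
    q.put("ok")

if __name__ == "__main__":
    # get_context keeps the start-method choice local to this snippet,
    # unlike the process-wide multiprocessing.set_start_method("spawn").
    ctx = mp.get_context("spawn")
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    msg = q.get()  # also keeps q referenced until the child is up
    p.join()
```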

YvesDup commented 3 weeks ago

I am wondering whether a class's __init__ method is truly a callable object of the kind the target parameter expects?

Moosems commented 3 weeks ago

@YvesDup The last two of the original examples show that it has to do with the Queue being created in a class that creates a process. All three of the following break:

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

class Bar:
    def __init__(self) -> None:
        pass

    def method(self):
        self.q = Queue()
        p = Process(target=foo, args=(self.q,))
        p.start()

if __name__ == '__main__':
    Bar().method()
    sleep(1)

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

class Bar:
    def __init__(self) -> None:
        self.q = Queue()
        self.method()

    def method(self):
        p = Process(target=foo, args=(self.q,))
        p.start()

if __name__ == '__main__':
    Bar()
    sleep(1)

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

class Bar:
    def __init__(self) -> None:
        self.method()

    def method(self):
        self.q = Queue()
        p = Process(target=foo, args=(self.q,))
        p.start()

if __name__ == '__main__':
    Bar()
    sleep(1)

Zheaoli commented 3 weeks ago

After diving into this problem, I think it's not a bug.

The issue is caused by the speed of spawn-mode startup.

TL;DR:

For now, the multiprocessing module has three start methods:

  1. Fork
  2. Spawn
  3. Fork server

The default mode on macOS is spawn, which is much slower than fork. So by the time the subprocess is ready to use the fd we passed in, the parent process has already exited, and the fd has been recycled by the system.

I think maybe we need to update the documentation to add more tips?

Moosems commented 3 weeks ago

@Zheaoli I have found some interesting behavior and I think I understand what is happening: garbage collection. Let's keep Foo constant for all examples as follows:

class Foo:
    def __init__(self, q) -> None:
        pass

Now, let's take a look at Bar:

In this example, the script has no issues:

class Bar:
    def __init__(self) -> None:
        self.q = Queue()
        self.p = Process(target=Foo, args=(self.q,), daemon=True)
        self.p.start()

if __name__ == "__main__":
    x = Bar()
    sleep(1)

But if we make q a local variable (q = Queue()) or pass it inline (Process(target=Foo, args=(Queue(),), daemon=True)), it breaks. In both cases, the variable is thrown away before the new Process has finished setting up: the Queue is garbage collected because it has no remaining references. The same happens if we change the if statement to simply contain

if __name__ == "__main__":
    Bar()
    sleep(1)

As such, I believe that this has to do with the Queue being garbage collected before the second process is created and that is what causes the issue. This conclusion is further evidenced by this working without issue:

class Bar:
    def __init__(self) -> None:
        q = Queue()
        self.p = Process(target=Foo, args=(q,), daemon=True)
        self.p.start()
        sleep(1)

if __name__ == "__main__":
    Bar()

Would you agree this seems to be the source of the issue?
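The refcounting half of this hypothesis can be checked directly: in CPython, a Queue is reclaimed as soon as its last reference disappears, without waiting for a later garbage-collector pass. A small sketch (assumes CPython's immediate reference counting):

```python
import multiprocessing as mp
import weakref

if __name__ == "__main__":
    q = mp.Queue()
    ref = weakref.ref(q)  # observe the queue without keeping it alive
    del q                 # drop the only strong reference
    collected = ref() is None  # CPython reclaims it immediately via refcounting
```

This is consistent with the idea that an unreferenced queue's OS-level resources can vanish while a slowly spawning child is still trying to unpickle it.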

Moosems commented 3 weeks ago

This appears to be a viable workaround at the moment; it works whether or not the user binds the Bar instance to a variable:

from multiprocessing import Process, Queue
from time import sleep

class Foo:
    def __init__(self, q) -> None:
        pass

class Bar:
    def __init__(self) -> None:
        self.create_another()

    def create_another(self):
        if hasattr(self, "q"):
            self.p.terminate()
            del self.q # Just to prove this works
        self.q = Queue()
        self.p = Process(target=Foo, args=(self.q,), daemon=True)
        self.p.start()

    def __del__(self):
        self.p.terminate()

if __name__ == '__main__':
    # Mini tests
    x = Bar()
    sleep(1)
    x.create_another()
    Bar()
    Bar().create_another()

YvesDup commented 2 weeks ago

FYI, the example below fails whatever foo is: a function, a fresh object (with __init__ as the callable), or an existing callable object.

from multiprocessing import Process, Queue
from time import sleep

def main():
    q = Queue()
    p = Process(target=foo, args=(q,))
    p.start()

if __name__ == '__main__':
    main()
    sleep(1)

Moosems commented 2 weeks ago

If you return q and keep it alive while the process is being set up, it works normally @YvesDup
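A sketch of that fix applied to the earlier main() example: returning the queue (and process) hands the references to the caller, so they stay alive while the child starts (foo and the message string here are illustrative):

```python
import multiprocessing as mp

def foo(q):
    q.put("alive")

def main():
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    return q, p  # the caller's references keep both objects alive

if __name__ == "__main__":
    q, p = main()
    msg = q.get()  # wait for the child before letting anything be collected
    p.join()
```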