tomerfiliba-org / rpyc

RPyC (Remote Python Call) - A transparent and symmetric RPC library for python
http://rpyc.readthedocs.org

multiprocess on remote machine fails #453

Open cr-omermazig opened 3 years ago

cr-omermazig commented 3 years ago

I need to run something on the remote from a different process.

When I try this simple example from https://docs.python.org/3/library/multiprocessing.html:

import multiprocessing

def foo(q):
    q.put('hello')

def test_multiprocess():
    multiprocessing.set_start_method('spawn')
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=foo, args=(q,))
    p.start()
    p.join()
    assert not q.empty()
    assert q.get() == 'hello'

Everything works (the test passes). Then I try the same example, but with remote multiprocessing, so that the code runs in a separate process on the remote machine:

import multiprocessing

def foo(q):
    q.put('hello')

def test_multiprocess(rpyc_connection):
    multiprocessing.set_start_method('spawn')
    q = rpyc_connection.modules.multiprocessing.Queue()
    p = rpyc_connection.modules.multiprocessing.Process(target=foo, args=(q,))
    p.start()
    p.join()
    assert not q.empty()
    assert q.get() == 'hello'

And it fails because q is empty (meaning that foo never actually ran).

Why does that happen? Is there another way to do this?

Jongy commented 3 years ago

You might need to teleport your function (foo) so that the multiprocessing module on the other side can handle it correctly. You're using the spawn start method, which AFAIK means multiprocessing has to serialize all relevant data for the newly invoked process. The new process doesn't have the rpyc connection, so perhaps it fails to serialize/handle the netref function?
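Roughly what I mean (untested sketch, using your rpyc_connection fixture):

def foo(q):
    q.put('hello')

q = rpyc_connection.modules.multiprocessing.Queue()
# teleport sends foo's code object over the connection; the returned
# callable is a plain function defined in the remote interpreter
remote_foo = rpyc_connection.teleport(foo)
# target is now a real function on the remote side, not a netref back to you
p = rpyc_connection.modules.multiprocessing.Process(target=remote_foo, args=(q,))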

cr-omermazig commented 3 years ago

Teleporting works. Initially, this:

import multiprocessing

def test_multiprocess(rpyc_connection):
    multiprocessing.set_start_method('spawn')
    remote_foo = rpyc_connection.teleport(foo)
    q = rpyc_connection.modules.multiprocessing.Queue()
    p = rpyc_connection.modules.multiprocessing.Process(target=remote_foo, args=(q,))
    p.start()
    p.join()
    assert not q.empty()
    assert q.get() == 'hello'

raised a pickling error, but when I used pathos.multiprocessing (which serializes with dill) instead:

def foo():
    return 'hello'

def test_multiprocess(rpyc_connection):
    remote_foo = rpyc_connection.teleport(foo)
    p = rpyc_connection.modules.pathos.multiprocessing.ProcessingPool(1)
    a = p.pipe(remote_foo)
    p.clear()
    assert a == 'hello'

It worked.

I'd prefer a solution that doesn't involve teleport though, because AFAIK I can't teleport functions that aren't self-contained (ones that use module-level imports/functions):

import os

def foo():
    return os.getpid()

Is there a way around that?

BTW I remember you from Gossip. Say hey to Asaf Ezra for me :-)

Jongy commented 3 years ago

I'd prefer a solution that doesn't involve teleport though, because AFAIK I can't teleport functions that aren't self-contained (ones that use module-level imports/functions):

You can if you run the imports inside the function. You can't use imports from the module level, because those are global variables on your side of the connection, not on the "teleported" side.
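For example, this variant of your os.getpid function should teleport fine (untested sketch):

def foo():
    # import inside the function body - the teleported code object
    # resolves it against the remote interpreter's modules, not yours
    import os
    return os.getpid()

remote_foo = rpyc_connection.teleport(foo)
print(remote_foo())  # prints a pid on the remote machine, not a local one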

I'm not sure there's a way around it: you use multiprocessing in spawn mode, so it actually starts a new process with a fresh interpreter, and that process needs to have the function object you wish to run. As I said earlier, it doesn't have the rpyc connection.

Perhaps if you use the fork start method, the child process will be able to use the connection? I'm not sure it's legit multithreading-wise for rpyc, but you can try it. (It's not a good design though: if you don't teleport the function, then it actually runs on your side, so what's the reason to use multiprocessing at all? All processes would call the function via rpyc on the "client" side.)
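If what you really want is parallelism on the remote machine, I'd teleport a wrapper that does all of the multiprocessing on that side, so nothing has to cross the connection mid-spawn. Rough sketch (untested; note it relies on the default fork start method on Linux, since with spawn the nested worker wouldn't be picklable):

def run_on_remote():
    # everything in this body executes in the remote interpreter
    import multiprocessing

    def worker(q):
        q.put('hello')

    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()
    return q.get()

remote_run = rpyc_connection.teleport(run_on_remote)
assert remote_run() == 'hello'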

BTW I remember you from Gossip. Say hey to Asaf Ezra for me :-)

Haha, done that :)