python / cpython

The Python programming language
https://www.python.org

Sharing instance of proxy object across processes results in pickle errors #101320

Open vincent-grosbois opened 1 year ago

vincent-grosbois commented 1 year ago

Hello

I'm trying to understand whether SyncManager proxy object instances are supposed to be shareable across processes, and if so, how.

Bug report

See https://github.com/python/cpython/issues/101320#issuecomment-1404187868 below

What I'm doing is the following:

Is this a bug, or something that should be possible to do in Python? I assume that the objects held by a SyncManager can be accessed from several processes, is that true? If so, what is the proper way to do it? (I suspect that simply reusing the same proxy object reference across forked processes is not supposed to work.)

Your environment

Python 3.9

vincent-grosbois commented 1 year ago

Actually, I am wondering the following:

In the manager class, the method handle_request is spawned for each new request, and it calls whatever function was received in the request (https://github.com/python/cpython/blob/main/Lib/multiprocessing/managers.py#L202). As I understand it, not all of the methods exposed by DictProxy are thread-safe: for instance, two processes could call values() and __delitem__ at the same time. In that case it seems that nothing synchronizes accesses to a given instance inside the Manager. Am I correct?

vincent-grosbois commented 1 year ago

Here is a repro case:

import os
from multiprocessing.managers import SyncManager

if __name__ == '__main__':

    manager = SyncManager(authkey=b'test')
    manager.start()
    address = manager.address
    d = manager.dict()
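    # __reduce__ returns (RebuildProxy, (callable, token, serializer, kwds));
    # kwds is the last element of the args tuple, so injecting the authkey
    # there lets the rebuilt proxy authenticate with the manager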
    pickled_dict = d.__reduce__()
    pickled_dict[1][-1]["authkey"] = b"test"
    print(pickled_dict)
    for i in range(1000):
        d[i] = i

    child_id = os.fork()

    if child_id != 0:
        # in parent, do work on the proxy object forever
        i = 0
        while True:
            d[i%1000] = i%3434
            i += 1
    else:
        # in children

        # connect to manager process
        child_manager = SyncManager(address=address, authkey=b'test')
        child_manager.connect()

        # rebuild the dictionary proxy
        proxy_obj = pickled_dict[0](*pickled_dict[1])

        # read on the proxy object forever
        while True:
            print(list(proxy_obj.values())[:10])

Doing this raises _pickle.UnpicklingError: invalid load key, '\x0a'. in one of the processes, seemingly at random.

vincent-grosbois commented 1 year ago

And here is an example where I have no problem at all. This is the same example as before except that I'm using a custom class that I registered, where every access is done through a lock:

import os
from multiprocessing.managers import SyncManager
from threading import RLock

class SyncedDictionary:
    def __init__(self):
        # store the data in the instance
        self.data = {}
        self.lock = RLock()
        print(f"init from {os.getpid()}")

    def add(self, k, v):
        with self.lock:
            self.data[k] = v

    def get_values(self):
        with self.lock:
            return list(self.data.values())

if __name__ == '__main__':

    # custom class

    SyncManager.register("custom", SyncedDictionary)
    manager = SyncManager(authkey=b'test')
    manager.start()
    address = manager.address

    print(f"from main pid {os.getpid()}")
    custom_dict = manager.custom()
    pickled_dict = custom_dict.__reduce__()
    pickled_dict[1][-1]["authkey"] = b"test"
    print(pickled_dict)
    child_id = os.fork()

    if child_id != 0:
        # in parent, do work on the proxy object forever
        i = 0
        while True:
            custom_dict.add(i % 1000, i % 3434)
            i += 1
    else:

        for i in range(3):
            os.fork() # even more child processes...

        print(os.getpid())
        # in children
        # connect to manager process
        child_manager = SyncManager(address=address, authkey=b'test')
        child_manager.connect()

        # rebuild the dictionary proxy
        proxy_obj = pickled_dict[0](*pickled_dict[1])
        # read on the proxy object forever
        while True:
            list(proxy_obj.get_values())[:10]

This will run forever without issues

vincent-grosbois commented 1 year ago

Actually, I managed to create a more complex example where, even though the custom class is fully synced, I still get corrupted data. I've also just seen that pickle itself apparently isn't thread-safe, so I wonder how this Manager class can work in the general case, given that it processes incoming requests in threads without locking and that pickle (the default serializer) is not thread-safe.

relent95 commented 1 year ago

I'm not a core developer, but IMHO it's not guaranteed that the multiprocessing API will work correctly with an os.fork() call. Try multiprocessing.Process() and pass proxy objects in the args argument.
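
A minimal sketch of that suggestion, reusing the managed dict from the snippets above (the worker name and loop bounds are just illustrative):

from multiprocessing import Process
from multiprocessing.managers import SyncManager

def worker(d):
    # the proxy is handed to the child by Process, which also runs
    # multiprocessing's after-fork hooks, so the child ends up with its own
    # connection to the manager instead of reusing the parent's socket
    print(list(d.values())[:10])

if __name__ == '__main__':
    manager = SyncManager(authkey=b'test')
    manager.start()
    d = manager.dict()
    for i in range(1000):
        d[i] = i

    p = Process(target=worker, args=(d,))
    p.start()
    p.join()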

vincent-grosbois commented 1 year ago

In both examples I tried both reusing the proxy object instance directly (inherited from the main process via fork) and rebuilding it from scratch from what the call to __reduce__ returns (which, I think, is exactly what would happen if I passed proxy objects in args as you suggested). In any case, the implementation of the proxy object isn't very complex: it's basically an id and type info used to identify objects on the server. On first connection there is a validity check, and it passes (otherwise an exception would be thrown). I don't think there is some hidden way to use the proxy object that would be more valid than what I tried.

knwng commented 1 year ago

Hi @vincent-grosbois, I believe the superficial reason for this error is that the DictProxy objects in the parent and child processes share the same socket, which leads to a conflict when both processes send commands through it concurrently. You can fix that by deliberately creating a new connection with proxy_obj._connect().

The complete code is:

import os
from multiprocessing.managers import SyncManager

if __name__ == '__main__':
    manager = SyncManager(authkey=b'test')
    manager.start()
    address = manager.address
    d = manager.dict()
    pickled_dict = d.__reduce__()
    pickled_dict[1][-1]["authkey"] = b"test"
    print(pickled_dict)
    for i in range(1000):
        d[i] = i

    child_id = os.fork()

    if child_id != 0:
        # in parent, do work on the proxy object forever
        i = 0
        while True:
            d[i%1000] = i%3434
            i += 1
    else:
        # in children

        # connect to manager process
        child_manager = SyncManager(address=address, authkey=b'test')
        child_manager.connect()

        # rebuild the dictionary proxy
        proxy_obj = pickled_dict[0](*pickled_dict[1])

        # create a new connection here
        proxy_obj._connect()

        # read on the proxy object forever
        while True:
            print(list(proxy_obj.values())[:10])

knwng commented 1 year ago

And there's an important difference between the two snippets: you didn't assign any data to the custom dict before forking in the second one. If you do, you should get the same error:

import os
from multiprocessing.managers import SyncManager
from threading import RLock

class SyncedDictionary:
    def __init__(self):
        # store the data in the instance
        self.data = {}
        self.lock = RLock()
        print(f"init from {os.getpid()}")

    def add(self, k, v):
        with self.lock:
            self.data[k] = v

    def get_values(self):
        with self.lock:
            return list(self.data.values())

if __name__ == '__main__':

    # custom class

    SyncManager.register("custom", SyncedDictionary)
    manager = SyncManager(authkey=b'test')
    manager.start()
    address = manager.address

    print(f"from main pid {os.getpid()}")
    custom_dict = manager.custom()
    pickled_dict = custom_dict.__reduce__()
    pickled_dict[1][-1]["authkey"] = b"test"
    print(pickled_dict)

    # the assignment that was missing in the previous snippet
    for i in range(1000):
        custom_dict.add(i, i)

    child_id = os.fork()

    if child_id != 0:
        # in parent, do work on the proxy object forever
        i = 0
        while True:
            custom_dict.add(i % 1000, i % 3434)
            i += 1
    else:

        for i in range(3):
            os.fork() # even more child processes...

        print(os.getpid())
        # in children
        # connect to manager process
        child_manager = SyncManager(address=address, authkey=b'test')
        child_manager.connect()

        # rebuild the dictionary proxy
        proxy_obj = pickled_dict[0](*pickled_dict[1])
        # read on the proxy object forever
        while True:
            list(proxy_obj.get_values())[:10]

In this snippet, the proxy object in the subprocess also reuses the same connection as the one in the parent process. The solution is still proxy_obj._connect().

knwng commented 1 year ago

So what's the underlying reason? Let's go through some code snippets first.

When a proxy is created, it tries to find an existing connection in thread-local storage (TLS). https://github.com/python/cpython/blob/main/Lib/multiprocessing/managers.py#L760

Then, when we invoke methods through this proxy via _callmethod, it checks whether self._tls has a connection attribute. If so, that connection is reused; otherwise a new one is created. https://github.com/python/cpython/blob/main/Lib/multiprocessing/managers.py#L813

The TLS data is stored in a ForkAwareLocal, which registers a hook to clear its attributes after a fork. https://github.com/python/cpython/blob/main/Lib/multiprocessing/util.py#L387

And the registered hooks are called by the Process class. https://github.com/python/cpython/blob/main/Lib/multiprocessing/process.py#L342
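
To make that concrete, here is a heavily simplified paraphrase of that logic (not the actual CPython source; ProxySketch and its attributes are illustrative only):

from multiprocessing import connection, util

class ProxySketch:
    """Illustrative stand-in for BaseProxy's connection handling."""

    def __init__(self, token, authkey):
        self._token = token
        self._authkey = authkey
        # ForkAwareLocal registers an after-fork hook that clears its __dict__,
        # but that hook only runs when multiprocessing itself creates the child
        self._tls = util.ForkAwareLocal()

    def _connect(self):
        # open a dedicated connection to the manager and cache it per thread
        self._tls.connection = connection.Client(self._token.address,
                                                 authkey=self._authkey)

    def _callmethod(self, methodname, args=(), kwds={}):
        try:
            conn = self._tls.connection   # reuse whatever this thread cached
        except AttributeError:
            self._connect()               # first use in this thread: connect lazily
            conn = self._tls.connection
        conn.send((self._token.id, methodname, args, kwds))
        kind, result = conn.recv()
        return result

After a raw os.fork(), the child inherits self._tls.connection untouched, so parent and child end up interleaving reads and writes on the same socket. A child created by Process runs the registered after-fork hooks instead, the cached connection is dropped, and the next _callmethod reconnects.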

So the underlying solution is, as @relent95 said, to use Process to create your subprocess.

import os
import logging
import multiprocessing as mp
from multiprocessing import log_to_stderr
from multiprocessing.managers import SyncManager

def worker(pickled_dict, address):
    child_manager = SyncManager(address=address, authkey=b'test')
    child_manager.connect()

    # rebuild the dictionary proxy
    proxy_obj = pickled_dict[0](*pickled_dict[1])

    # read on the proxy object forever
    while True:
        print(list(proxy_obj.values())[:10])

if __name__ == '__main__':
    mp.set_start_method('fork')
    log_to_stderr(logging.DEBUG)

    manager = SyncManager(authkey=b'test')
    manager.start()
    address = manager.address
    d = manager.dict()
    pickled_dict = d.__reduce__()
    pickled_dict[1][-1]["authkey"] = b"test"
    print(pickled_dict)
    for i in range(1000):
        d[i] = i

    p = mp.Process(target=worker, args=(pickled_dict, address))
    p.start()

    i = 0
    while True:
        d[i%1000] = i%3434
        i += 1

    p.join()

ed2050 commented 1 year ago

Thanks all, this thread is incredibly helpful. Just have a few questions on things I don't understand / aren't covered above.

I know forking and managers may not be an ideal combination. But sometimes you don't have a choice. For instance, see my example below about using HTTPServer with ForkingMixIn. I can't control how new processes are created, but I need to share some objects between them.

Questions

Referring to the examples from @vincent-grosbois and @knwng above:

  1. What is the point of child_manager? It's created and connected, then never used again, which seems rather useless. The proxy object should know how to connect to the parent manager (that's the whole point of a proxy object). Once you have the proxy object in the child (either inherited across the fork or passed to Process), there's no point in creating another manager object, from what I can tell.
  2. What is the point of pickled_dict? It looks like you're just manually pickling the proxy object instead of passing / inheriting it directly. Isn't that what proxy objects are for? To pass between processes. The proxy object knows how to pass calls back to the manager that created it. I don't see what pickled_dict accomplishes. E.g. in @knwng's last example, why not p = mp.Process(target=worker, args=(d, address)) instead of p = mp.Process(target=worker, args=(pickled_dict, address))?
  3. If you need to do a manual fork (e.g. a fork in a lib you don't control), it seems the issue is just resetting the TLS connection on the inherited proxy object. Instead of mp.Process invoking the hooks for you, you just have to invoke them yourself. Is that accurate? Proxy objects appear to have an _after_fork method, so perhaps calling that is what's needed? (See the sketch just after this list for what I mean.)
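
For concreteness, here's roughly what I mean in question 3, sketched against a plain managed dict. The use of the private _run_after_forkers helper is my guess at "invoking the hooks yourself", not something I've seen documented:

import os
from multiprocessing import util
from multiprocessing.managers import SyncManager

if __name__ == '__main__':
    manager = SyncManager(authkey=b'test')
    manager.start()
    d = manager.dict()
    d[0] = 0                         # this first call caches a connection

    if os.fork() == 0:
        # raw fork: multiprocessing never ran its after-fork hooks here, so the
        # proxy would silently reuse the parent's socket. Either run the hooks
        # ourselves (private API)...
        util._run_after_forkers()
        # ...or, for a single proxy, just force a fresh connection:
        # d._connect()
        print(list(d.values()))
        os._exit(0)
    else:
        os.wait()
        manager.shutdown()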

Revised example

My current understanding is that the last example above from @knwng could be rewritten as follows for the same effect. Am I missing something?

import os
import logging
import multiprocessing as mp
from multiprocessing import log_to_stderr
from multiprocessing.managers import SyncManager

def worker (proxy_obj) :
    # if we did a manual fork, need to reset TLS connection in proxy_obj before using it
    # something like : proxy_obj._after_fork () or proxy_obj._connect () ?
    # since this example uses Process.start, manual reset is not needed

    while True :
        print (list (proxy_obj.values ()) [ :10])

if __name__ == '__main__' :
    mp.set_start_method ('fork')
    log_to_stderr (logging.DEBUG)

    manager = SyncManager (authkey = b'test')
    manager.start ()

    d = manager.dict ()
    for i in range (1000) :
        d [i] = i 

    # no need for manual pickling and unpickling of d.  Process handles that for us.
    p = mp.Process (target = worker, args = (d,))
    p.start ()

    i = 0 
    while True :
        d [i % 1000] = i % 3434
        i += 1

    p.join ()

My Setup

I have a similar issue as the OP with pickle errors on managed objects. In my case, I'm running a local webserver with HTTPServer and ForkingMixIn. Every connection forks a new process to handle the request.

A single-process server won't work because I'm serving several clients at once. A threading server won't work because of the GIL (many requests are compute-intensive, so threads would blow up response times). Plus, threads are harder to get right, trampling on global state in libs and such.

My server has a cache object for quick return of already-computed responses. When a new response is needed, it's slow to compute, so cache updates are expensive. Cache updates made by one forked handler should be added to the cache for future requests in other handlers. This requires a shared cache object with a manager to handle updates from multiple processes.

I have this setup working with a SyncManager in the parent that creates the cache proxy. Forked handlers inherit the cache proxy object. All works fine for a while. But eventually there's a pickle error on a cache access from one of the children. After that, the manager stops responding and the server needs to be restarted. I think it's the TLS connection issue @knwng mentioned above: children are reusing the same proxy object without resetting the connection.

If I'm right, then I just have to figure out how to reset the TLS connection in the inherited proxy object. It looks like proxy objects have a method _after_fork; that sounds promising.

Sample code

Here's simplified code to illustrate my setup (nonstandard conventions, I know).

import urllib.parse
import multiprocessing as mp
import multiprocessing.managers as managers ; mp.managers = managers

from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ForkingMixIn

# ----- setup manager 
if __name__ == '__main__' :

    mp.managers.SyncManager.register ('mycache', MyCache)
    manager = mp.managers.SyncManager ()
    manager.start ()
    cache = manager.mycache ()

# ----------
class MyReqHandler (BaseHTTPRequestHandler) :

    def do_GET (me) :
        # first entry point in forked child created by ForkingHTTPServer
        query = urllib.parse.urlparse (me.path).query
        data = urllib.parse.parse_qs (query, keep_blank_values = True)

        # need to reset TLS connection here before using cache proxy object
        # maybe cache._after_fork () ?
        # update: _after_fork just increments ref counters.  use cache._connect to reset connection
        # see next post for more info

        resp = cache.locate (data)  # <-- expensive, computes and adds to cache if doesnt exist

        me.send_response (200, 'ok')
        me.wfile.write (resp)

class ForkingHTTPServer (ForkingMixIn, HTTPServer) :
    pass

# ----------
def main (args) :
    httpd = ForkingHTTPServer (listen_address, MyReqHandler)
    while True :
        httpd.handle_request ()   # handle one request in forked child
        httpd.collect_children ()  # reap forked children, since ForkingMixIn doesn't forking do it

ed2050 commented 1 year ago

Update

Well, calling _after_fork isn't enough. It seems to help a bit, the server lasts a little longer, but it still eventually hits a pickle error and the cache stops responding.

I finally managed to fix my problem. Here's the takeaway lesson: CALL _connect ON PROXY OBJECTS AFTER FORK !!! This should really be documented in the stdlib docs somewhere.

Here's how I got there, hope it helps someone else:

Data Collection

I built a test client to make my server fail reproducibly. The test client makes a process pool to send requests to the server, with a random 0-50 ms delay before each call. Here's what I observed:

Taken together, the above suggests the issue may be a race condition triggering a deadlock. I dug into my code to investigate further.

Deadlock

This is where things get strange.

Let's start with my expectation. Based on the Python docs, I expected a multiprocessing manager to be a single-threaded, single-process server. Or rather, it may use concurrency to handle multiple concurrent requests on a managed object, but actual operations on that object will be performed sequentially.

E.g. if two processes call dictproxy.update (other) at the same time, the manager will complete one update call on the underlying object before performing the second. The calls may happen out of order, but only one will be done at a time.

Based on my tests, it seems that is correct, but not the whole story.

First I examined debug logs from my server. When it hangs, there are several calls to cache.locate () that have started but never finished. So the hang is either inside the cache proxy object function, or in the manager process that services the call.

Next I turned on debug logs for the underlying cache object. By logging the pid and thread id on every call, I confirmed that the manager is single-process and single-threaded when accessing the cache. Every log record for calls on the cache object shows the same pid and thread id, regardless of the test client pool size and the number of forked server connections.

This is the part I don't understand. My cache object runs fine, up to the point locate () has the return value. Then it just gets stuck somewhere during return. I'll illustrate with code:

# --- in cache object run by manager (see my previous post)

def locate (data) :
    log (f'{ os.getpid () } ; { threading.get_ident () }')
    ret = ...  # do some stuff to compute the response
    log (f'locate : { ret = }')     # <-- got here, return value ok
    return ret

# --- in webserver

if __name__ == '__main__' :
    SyncManager.register ('cache', MyCache)
    manager = SyncManager ()
    manager.start ()
    cache = manager.cache ()

def get_data (data) :
    # this gets called in child after fork

    log ('calling cache.locate')
    ret = cache.locate (data)
    log (f'got cache result : { ret }')     # <-- never reaches here

    # flush logs
    print ('flushing logger', file = sys.__stdout__, flush = True)
    for handler in logger.handlers :
        handler.flush ()

# --- output in logfile

calling cache.locate
12345 ; 129483726582658000
locate : ret = foobar
# hangs here, no more log entries and server stops responding.
# "flushing logger" never hits console either.

No exception happens. The server just never sees the return value. I reached 'return ret' in the cache object inside the manager, but never got the return value in the calling code.

I thought it might be a logging issue, but the logs were flushed and still nothing. The return value is handed to the manager, but never sent back to the server. It seems the manager is stuck on something.

I looked at the code for BaseProxy and SyncManager. Couldn't find an obvious place where they get stuck. Didn't want to spend a long time tracing through every line though.

I also looked at the proxy object to see how it connects to the manager. For a proxy object d, my system shows that d._token.address has the value /var/folders/25/_rx0b0ps3bs3kx889jmp7pz00000gp/T/pymp-mu4manf5/listener-00qwjqqx. It's a unix socket. Not sure if you can do tls over a unix socket, but that's what my system uses.

At this point I suspect the problem is a data race condition. Two processes are reading or writing to the socket at the same time, trampling on each other's data. Somehow this reaches a state where the manager has the return value, but is unable to write it to the socket. So my program gets stuck waiting for the return value, which never appears to the caller.

The Fix

Trying cache._after_fork () had little effect. Then I tried cache._connect () in each handler after fork. Wow, did that fix it! Suddenly my server can respond to 50 requests from 5 concurrent clients almost instantly! Ok, a bit of an exaggeration, but within a couple of seconds.

Takeaway Lesson: CALL _connect ON PROXY OBJECTS AFTER FORK !!!
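
For completeness, here's a self-contained toy version of the fix in a ForkingMixIn server. A plain managed dict stands in for my real cache object (so this is a sketch, not my server code), and the one important line is the cache._connect() at the top of do_GET:

import urllib.parse
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ForkingMixIn
from multiprocessing.managers import SyncManager

class Handler(BaseHTTPRequestHandler):
    def do_GET(self):
        # we are in a freshly forked child: reconnect the inherited proxy
        # instead of reusing the parent's socket
        cache._connect()
        key = urllib.parse.urlparse(self.path).path
        cache[key] = cache.get(key, 0) + 1
        body = f'{key} seen {cache[key]} times\n'.encode()
        self.send_response(200)
        self.end_headers()
        self.wfile.write(body)

class ForkingHTTPServer(ForkingMixIn, HTTPServer):
    pass

if __name__ == '__main__':
    manager = SyncManager()
    manager.start()
    cache = manager.dict()           # stand-in for the real cache proxy
    ForkingHTTPServer(('127.0.0.1', 8000), Handler).serve_forever()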

Footnotes

[1] Not sure why _after_fork helps at all. Checking the stdlib code for ForkingMixIn, it calls os._exit () when done handling a request, so cleanup on proxy objects and decrementing ref counts shouldn't be triggered when a forked handler exits. Not sure why _after_fork makes any difference to server lifetime. Maybe it triggers another side effect. Or maybe it's just a coincidence that would disappear with a larger sample size.