tomerfiliba-org / rpyc

RPyC (Remote Python Call) - A transparent and symmetric RPC library for python
http://rpyc.readthedocs.org

result expired: deadlock with multiple threads(?) #381

Open sergeyyatmellanox opened 4 years ago

sergeyyatmellanox commented 4 years ago

My code uses the forking server. During execution, my program creates many threads that all use the same connection, and from time to time some sort of deadlock appears.

I've added a bunch of prints to rpyc/packages/core/protocol.py:

def serve(self, timeout=1, wait_for_lock=True):  # serving
    protocol_logger.info('entered: serve({})'.format(locals()))
    timeout = Timeout(timeout)
    with self._recv_event:
        protocol_logger.info('entered: with self._recv_event')
        if not self._recvlock.acquire(False):
            protocol_logger.info('entered: if not self._recvlock.acquire(False)')
            return wait_for_lock and self._recv_event.wait(timeout.timeleft())
    try:
        protocol_logger.info('calling: self._channel.poll(timeout) and self._channel.recv()')
        data = self._channel.poll(timeout) and self._channel.recv()
        if not data:
            protocol_logger.info('entered: if not data')
            return False
    except EOFError:
        protocol_logger.info('entered: except EOFError')
        self.close()
        raise
    finally:
        protocol_logger.info('entered: finally')
        self._recvlock.release()
        protocol_logger.info('released: self._recvlock.release()')
        with self._recv_event:
            protocol_logger.info('entered: with self._recv_event:')
            self._recv_event.notify_all()
            protocol_logger.info('called: self._recv_event.notify_all()')
    protocol_logger.info('calling: self._dispatch(data)')
    self._dispatch(data)
    protocol_logger.info('done: self._dispatch(data)')
    return True

and to rpyc/packages/core/async_.py:

def wait(self):
    async_logger.info('entered: wait({})'.format(locals()))
    while not self._is_ready and not self._ttl.expired():
        async_logger.info('calling: self._conn.serve(self._ttl)')
        self._conn.serve(self._ttl)
        async_logger.info('done: self._conn.serve(self._ttl)')
    if not self._is_ready:
        async_logger.info('raising: AsyncResultTimeout("result expired")')
        raise AsyncResultTimeout("result expired")
    async_logger.info('done: wait({})'.format(locals()))
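The loggers themselves are plain stdlib loggers; a minimal setup that produces the output format below would look roughly like this (the configuration is mine, not part of rpyc, and only the logger names are taken from my log lines):

import logging

logging.basicConfig(
    format='%(threadName)s - %(asctime)s %(levelname)s:%(name)s:%(message)s',
    level=logging.INFO,
)
protocol_logger = logging.getLogger('ProtocolLogger')
async_logger = logging.getLogger('AsyncResult')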

and what I got is:

78039 Thread-3 - 2020-03-29 20:25:28,352 INFO:AsyncResult:entered: wait({'self': <AsyncResult object (pending) at 0x7f477ee764c0>})
78040 Thread-3 - 2020-03-29 20:25:28,352 INFO:AsyncResult:calling: self._conn.serve(self._ttl)
78041 Thread-3 - 2020-03-29 20:25:28,353 INFO:ProtocolLogger:entered: serve({'wait_for_lock': True, 'timeout': <rpyc.packages.lib.Timeout object at 0x7f477c3e2cf8>, 'self': <rpyc.packages.core.protocol.Connection 'conn1' object at 0x7f478f1fdba8>})
78042 Thread-3 - 2020-03-29 20:25:28,353 INFO:ProtocolLogger:entered: with self._recv_event
78043 Thread-3 - 2020-03-29 20:25:28,353 INFO:ProtocolLogger:entered: if not self._recvlock.acquire(False)

78044 Thread-2 - 2020-03-29 20:25:28,353 INFO:ProtocolLogger:done: self._dispatch(data)
78045 Thread-2 - 2020-03-29 20:25:28,353 INFO:AsyncResult:done: self._conn.serve(self._ttl)
78046 Thread-2 - 2020-03-29 20:25:28,353 INFO:AsyncResult:done: wait({'self': <AsyncResult object (ready) at 0x7f477ee76468>})

78047 Thread-2 - 2020-03-29 20:25:28,353 INFO:AsyncResult:entered: wait({'self': <AsyncResult object (pending) at 0x7f477ee76468>})
78048 Thread-2 - 2020-03-29 20:25:28,354 INFO:AsyncResult:calling: self._conn.serve(self._ttl)
78049 Thread-2 - 2020-03-29 20:25:28,354 INFO:ProtocolLogger:entered: serve({'wait_for_lock': True, 'timeout': <rpyc.packages.lib.Timeout object at 0x7f477dc34940>, 'self': <rpyc.packages.core.protocol.Connection 'conn1' object at 0x7f478f1fdba8>})
78050 Thread-2 - 2020-03-29 20:25:28,354 INFO:ProtocolLogger:entered: with self._recv_event
78051 Thread-2 - 2020-03-29 20:25:28,354 INFO:ProtocolLogger:entered: if not self._recvlock.acquire(False)

78052 Thread-3 - 2020-03-29 20:30:28,353 INFO:AsyncResult:done: self._conn.serve(self._ttl)
78053 Thread-3 - 2020-03-29 20:30:28,354 INFO:AsyncResult:raising: AsyncResultTimeout("result expired")

78054 Thread-2 - 2020-03-29 20:30:28,355 INFO:AsyncResult:done: self._conn.serve(self._ttl)
78055 Thread-2 - 2020-03-29 20:30:28,355 INFO:AsyncResult:raising: AsyncResultTimeout("result expired")

78057 Traceback (most recent call last):
78058   File "/usr/local/lib64/python3.6/site-packages/rpyc/packages/core/protocol.py", line 324, in _dispatch_request
78059     res = self._HANDLERS[handler](self, *args)
78060   File "/usr/local/lib64/python3.6/site-packages/rpyc/packages/core/protocol.py", line 609, in _handle_call
78061     return obj(*args, **dict(kwargs))
78062   File "/usr/local/lib64/python3.6/site-packages/my_project.py", line 46, in my_foo
78063     if self.sysfs_agent.exists(dev_port_path):
78064   File "/usr/local/lib64/python3.6/site-packages/my_project.py", line 16, in my_other_foo
78065     self.logger.debug('my print')
78066   File "/usr/lib64/python3.6/logging/__init__.py", line 1295, in debug
78067     self._log(DEBUG, msg, args, **kwargs)
78068   File "/usr/lib64/python3.6/logging/__init__.py", line 1443, in _log
78069     self.handle(record)
78070   File "/usr/lib64/python3.6/logging/__init__.py", line 1453, in handle
78071     self.callHandlers(record)
78072   File "/usr/lib64/python3.6/logging/__init__.py", line 1515, in callHandlers
78073     hdlr.handle(record)
78074   File "/usr/local/lib64/python3.6/site-packages/rpyc/packages/core/netref.py", line 253, in __call__
78075     return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
78076   File "/usr/local/lib64/python3.6/site-packages/rpyc/packages/core/netref.py", line 76, in syncreq
78077     return conn.sync_request(handler, proxy, *args)
78078   File "/usr/local/lib64/python3.6/site-packages/rpyc/packages/core/protocol.py", line 485, in sync_request
78079     return self.async_request(handler, *args, timeout=timeout).value
78080   File "/usr/local/lib64/python3.6/site-packages/rpyc/packages/core/async.py", line 109, in value
78081     self.wait()
78082   File "/usr/local/lib64/python3.6/site-packages/rpyc/packages/core/async.py", line 57, in wait
78083     raise AsyncResultTimeout("result expired")
78084 TimeoutError: result expired

As I understand it:

  1. thread 2 got the lock and started the serve first
  2. thread 3 started to wait on the event to get the lock
  3. thread 2 finished its serve
  4. thread 2 started another serve and ended up waiting on the event as well
  5. both threads raised AsyncResultTimeout after the timeout expired (300 seconds in my config); a sketch of where that value comes from is below.
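For context, the 300 seconds is the connection's sync_request_timeout (rpyc's default is 30), which is what sync_request passes down to async_request in the traceback above. A minimal sketch of how such a value gets configured; illustrative only, not my exact code:

import rpyc

conn = rpyc.classic.connect('10.141.32.7', 18812)
# raise the synchronous-request timeout from the default 30 s to 300 s;
# this is the timeout that ends in "result expired"
conn._config['sync_request_timeout'] = 300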
Environment
Minimal example

Unfortunately my program is too complex to publish here and I wasn't able to reproduce it otherwise.

comrumino commented 4 years ago

Thank you for taking the time to provide details and for opening an issue :100:

I'm hoping you can help me isolate the issue a bit more. There are a few nice-to-have items I would be forever grateful for, since they would help me resolve this issue quicker and reduce ambiguity.

If you would feel more comfortable using PGP and back-channeling, my email is comrumino@archstrike.org and my public key and contact info are under my user profile as well.

Maintainer Notes: Threading issues

Of course, concurrency is difficult to debug. It seems likely that the root cause(s) overlap for this issue, #354, and #360. Even if we are unable to reproduce deadlocks or timeouts, an executable script to emulate program behavior would be more precise and insightful than bullet point outlines.

Abstracting away program specifics, things we should consider while resolving the bug:

Improving the documentation on threading with RPyC would be beneficial for all.
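For instance, one thing worth spelling out is the bundled BgServingThread helper, which dedicates a background thread to serving a connection so incoming requests and events are handled while application code does its own work. A rough sketch (the host is a placeholder, and this is not a claim about the root cause of this issue):

import rpyc

conn = rpyc.classic.connect('server-host', 18812)  # placeholder address
bgsrv = rpyc.BgServingThread(conn)  # one thread dedicated to serving this connection
try:
    conn.modules.os.getpid()  # application code issues its requests as usual
finally:
    bgsrv.stop()
    conn.close()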

sergeyyatmellanox commented 4 years ago

Hi,

Thank you for taking the time to help me. I strongly agree with you about the documentation. I've been using this package for 3-4 years now and have always felt that the documentation and examples are too basic.

I forgot an important detail! The issue started once I updated RPyC to 4.1.4; it did not happen on 3.4.3.

I'm adding a small code snippet that simulates what my actual program does. Basically:

  1. create multiple rpyc servers on 2 different machines.
  2. connect to these servers from one of the machines, which we will call the master from now on.
  3. create a dictionary on the master.
  4. pass the dictionary to a function running on the servers and have that function access this dictionary.
  5. each function runs in its own thread, so we have multiple concurrent accesses to the shared dictionary over rpyc from different rpyc connections.

It's important to note that the shared object is not always a dictionary; sometimes it's a custom class. I'm using plain threading.Thread to create and run the threads. The synchronization is done by waiting for a value in the dictionary to change: one thread sleeps while it waits for some other thread to update that value. For example, a counter that should hit zero, where each thread decreases the counter by 1 (using an RLock to sync the writes) and sleeps until the counter reaches zero; or the synchronization is between only 2 threads, which simply wait until each one sets a designated boolean to True. The number of threads is less than 10. The main thread creates 2 threads, and each of those creates another thread that runs some function, but the sync is done by the "first-level" threads (the ones created by the main thread).
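For clarity, a minimal sketch of the counter-style synchronization described above (names and numbers are illustrative, not taken from my real program):

import threading
import time

counter_lock = threading.RLock()
state = {'counter': 2}  # one slot per first-level thread

def worker():
    # the RLock only guards the write; the wait is a plain sleep-poll loop
    with counter_lock:
        state['counter'] -= 1
    while state['counter'] > 0:
        time.sleep(0.1)

threads = [threading.Thread(target=worker, daemon=True) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

And here is the snippet itself: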

import threading
import random
import time
import logging

import rpyc

logger = logging.getLogger()

def init():
    shared_dict = {}
    for i in range(2):
        shared_dict[i] = {}
        for j in range(10):
            shared_dict[i][j] = {'done': False}
    return shared_dict

def fib(n):
    if n == 1:
        return 0
    elif n == 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def foo(i, j, shared_dict):
    logger.info('start foo: {}'.format((i,j, shared_dict[i])))
    logger.info('foo {} fib = {}'.format((i, j), fib(random.randint(10,20))))
    shared_dict[i][j]['done'] = True
    logger.info('done foo: {}'.format((i,j, shared_dict[i])))

def baz(i, shared_dict):
    logger.info('start baz: {}'.format(i))
    threads = []
    for j in range(10):
        t = threading.Thread(target=foo, args=(i,j,shared_dict))
        t.setDaemon(True)
        t.start()
        threads.append(t)

        for _i in shared_dict:
            while not shared_dict[_i][j]['done']:
                time.sleep(0.1)

    for t in threads:
        t.join()

    logger.info('done baz: {}'.format(i))

def bar(conn, i, shared_dict):
    logger.info('start bar: {}'.format((conn, i)))
    conn.modules.rpyc_debug_shared_dict.baz(i, shared_dict)
    logger.info('done bar: {}'.format((conn, i)))

def start():
    connections = []
    connections.append(rpyc.classic.connect('10.141.32.7', 18812))
    connections.append(rpyc.classic.connect('10.141.32.8', 18812))
    for conn in connections:
        conn.modules.sys.path.append('/workspace/rpyc_debug')

    for k in range(1000):
        threads = []
        shared_dict = init()
        for i, conn in enumerate(connections):
            thread = threading.Thread(target=bar, args=(conn, i, shared_dict))
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()

        print('try NO.', k)

if __name__ == '__main__':
    start()
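For completeness: the snippet above expects this file to be importable as rpyc_debug_shared_dict under /workspace/rpyc_debug on both server machines, and each machine to run a classic rpyc server on port 18812. A sketch of the server side using the forking server mentioned earlier (not my exact launcher):

from rpyc.core.service import SlaveService
from rpyc.utils.server import ForkingServer

# classic (slave) service on the port the snippet above connects to
ForkingServer(SlaveService, hostname='0.0.0.0', port=18812).start()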
msidana commented 4 years ago

Is there any update on this issue? Will it be fixed in the next release?

msidana commented 3 years ago

@comrumino Are you working on this issue ?

comrumino commented 7 months ago

Slowly :). I think there was a PR some time ago on the threading fun.