tomerfiliba-org / rpyc

RPyC (Remote Python Call) - A transparent and symmetric RPC library for python
http://rpyc.readthedocs.org
Other
1.57k stars 243 forks source link

deque mutated during iteration #383

Open msidana opened 4 years ago

msidana commented 4 years ago

I am using rpyc 4.1.4 to implement RPC in my components. The server being used is ThreadedServer. The RPC calls run smoothly most of the times, but sometimes I get the below issue. Error: deque mutated during iteration

  File "/opt/touchstone/touchstone_engine_venv/lib/python3.7/site-packages/rpyc/core/netref.py", line 253, in __call__
    return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
  File "/opt/touchstone/touchstone_engine_venv/lib/python3.7/site-packages/rpyc/core/netref.py", line 76, in syncreq
    return conn.sync_request(handler, proxy, *args) 
  File "/opt/touchstone/touchstone_engine_venv/lib/python3.7/site-packages/rpyc/core/protocol.py", line 469, in sync_request
    return self.async_request(handler, *args, timeout=timeout).value
  File "/opt/touchstone/touchstone_engine_venv/lib/python3.7/site-packages/rpyc/core/async_.py", line 102, in value
    raise self._obj
_get_exception_class.<locals>.Derived: deque mutated during iteration
Environment

Server Side:

import rpyc
from rpyc import ThreadedServer

class MyRPCManager(Singleton):
    """ Mongo RPC Manager class."""

    def __init__(self):
        self.rpc_handler = MyRPCService()
        self.rpc_server = None
        self.rpc_port = 'localhost'
        self.rpc_host = 4242
        self._client = None
        self.server = None
        self.server_thread = None
        self._root = None
        self.config = {'allow_all_attrs': True, 'allow_pickle': True,
                       'sync_request_timeout': 300}

    def start_server(self):
        """Run RPC server."""
        if self.server:
            LOG.debug('RPC server is already started.')
            return
        self.server = ThreadedServer(MongoRPCService, port=self.rpc_port,
                                     protocol_config=self.config,
                                     listener_timeout=None)
        self.server_thread = Thread(target=self.server.start)

        self.server_thread.daemon = True
        self.server_thread.start()

    @property
    def client(self):
        if not self._client:
            self._client = rpyc.connect(self.rpc_host, self.rpc_port,
                                        config=self.config)
            self._root = self._client.root
        return self._client

    def run_client(self):
        self._root = self.client.root
        return self._root

    def my_func(self):
        """ RPC Wrapper to Mark unfinished executions as force
        aborted.
        """
        self._root.call_my_function()

 class MyRPCService(rpyc.Service):

    ALIASES = ['MYRPC']

    @staticmethod
    def exposed_call_my_function():
        #My code runs here to read a record from MySQL

if __name__ == '__main__':
    rpc_manager = MyRPCManager()
    rpc_manager.start_server()

Client Side:

rpc_manager = MongoRPCManager()
rpc_manager.run_client()
rpc_manager.call_my_function()
msidana commented 4 years ago

Any update on this ?