tomerfiliba-org / rpyc

RPyC (Remote Python Call) - A transparent and symmetric RPC library for python
http://rpyc.readthedocs.org

strange behavior when using threads #360

Open hagaisela opened 4 years ago

hagaisela commented 4 years ago

Hi, I am trying to use rpyc alongside scapy's AsyncSniffer. The AsyncSniffer starts a background thread that listens for packets. It also accepts a filter function for filtering packets.

I have tests running on one machine that use rpyc to connect to another machine, which runs the sniffer. The packet filter runs on the test machine, not on the sniffer machine.

The sequence of events is: the test machine calls start_sniffer via rpyc, which creates the scapy thread on the remote machine; packets then start arriving, and each packet is filtered. Meanwhile, the test machine calls rx(), which waits for the scapy thread on the remote machine to end.

What I am seeing is that on some test runs, calling the packet filter somehow triggers rx, and I can't understand why. See this stack trace:

2019-11-17 09:58:05.207 utils.sniffers.raw_pkts_sniffer: [INFO] @@@ hagai before join, thread name=Thread-3, gevent=False
  File "/usr/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/dist-packages/scapy/sendrecv.py", line 921, in _run
    if lfilter and not lfilter(p):
  File "/tmp/test_services.zip/utils/sniffers/raw_pkts_sniffer.py", line 97, in filter_pkt
    if self.pkt_filter is None or self.pkt_filter(pkt):
  File "/tmp/test_services.zip/services/communicator_service.py", line 22, in _run_filter
    ret = pkt_filter is None or pkt_filter(bytes(pkt))
  File "/usr/local/lib/python3.6/dist-packages/rpyc/core/netref.py", line 247, in __call__
    return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
  File "/usr/local/lib/python3.6/dist-packages/rpyc/core/netref.py", line 76, in syncreq
    return conn.sync_request(handler, proxy, *args)
  File "/usr/local/lib/python3.6/dist-packages/rpyc/core/protocol.py", line 464, in sync_request
    return self.async_request(handler, *args, timeout=timeout).value
  File "/usr/local/lib/python3.6/dist-packages/rpyc/core/async_.py", line 100, in value
    self.wait()
  File "/usr/local/lib/python3.6/dist-packages/rpyc/core/async_.py", line 47, in wait
    self._conn.serve(self._ttl)
  File "/usr/local/lib/python3.6/dist-packages/rpyc/core/protocol.py", line 387, in serve
    self._dispatch(data)
  File "/usr/local/lib/python3.6/dist-packages/rpyc/core/protocol.py", line 353, in _dispatch
    self._dispatch_request(seq, args)
  File "/usr/local/lib/python3.6/dist-packages/rpyc/core/protocol.py", line 323, in _dispatch_request
    res = self._HANDLERS[handler](self, *args)
  File "/usr/local/lib/python3.6/dist-packages/rpyc/core/protocol.py", line 585, in _handle_call
    return obj(*args, **dict(kwargs))
  File "/tmp/test_services.zip/services/communicator_service.py", line 38, in exposed_rx
    pkts = self._communicator.rx(timeout)
  File "/tmp/test_services.zip/utils/communicators/scapy_communicator.py", line 21, in rx
    return self._sniffer.getres(timeout=timeout)
  File "/tmp/test_services.zip/utils/sniffers/raw_pkts_sniffer.py", line 36, in getres
    traceback.print_stack()

I am using rpyc 4.1.2 and Python 3.6 on Ubuntu 18.04.
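
Read from the filter call downward, the trace shows the sniffer thread entering sync_request, then wait, then serve, and serve dispatching the incoming rx request in that same thread. The toy model below is a sketch of that pattern only. It is not rpyc code: ToyConnection, its message tuples, and demo are hypothetical names made up for illustration, and the real library's locking and framing are ignored.

```python
import threading
from collections import deque


class ToyConnection:
    """Toy model (NOT rpyc code) of a symmetric connection shared by threads."""

    def __init__(self, handlers):
        self.handlers = handlers  # request name -> local callable
        self.inbox = deque()      # messages arriving from the peer
        self.replies = {}         # sequence number -> reply value

    def serve(self):
        # Handle the next incoming message, whatever it happens to be.
        kind, seq, payload = self.inbox.popleft()
        if kind == "request":
            # The handler runs in whichever thread called serve().
            self.handlers[payload]()
        else:
            self.replies[seq] = payload

    def sync_request(self, seq):
        # Keep serving until the reply to *our* request has arrived.
        while seq not in self.replies:
            self.serve()
        return self.replies.pop(seq)


def demo():
    ran_in = []
    conn = ToyConnection(
        {"rx": lambda: ran_in.append(threading.current_thread().name)}
    )
    # The peer's rx request arrives before the reply to the filter call.
    conn.inbox.append(("request", 7, "rx"))
    conn.inbox.append(("reply", 1, True))

    result = []
    sniffer = threading.Thread(
        target=lambda: result.append(conn.sync_request(1)), name="sniffer"
    )
    sniffer.start()
    sniffer.join()
    return result[0], ran_in
```

In this model the rx handler ends up running in the thread named "sniffer", because that thread was the one serving the connection while it waited for the filter's answer.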

comrumino commented 4 years ago

If it works for your use case, I would recommend having the sniffer write to a pcap and send the pcap back to the test machine periodically. This would effectively batch-process packets. It seems to me that you currently filter packet by packet, which means a round trip for each packet. To provide a more exact answer, I would need an example that reproduces the issue, since it might be implementation dependent (e.g. round-trip latency, threading, default timeouts).

Is processing a batch of packets by sending a PCAP possible, or do you have a minimal test case?
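
A minimal sketch of the batching idea, under assumptions: packets are modeled as plain bytes, and batches and filter_locally are hypothetical helper names, not part of rpyc or scapy. In a real setup the batch could be serialized as a pcap (scapy provides wrpcap and rdpcap for writing and reading pcap files) and transferred in a single call. The point is only that the filter runs on the test machine once per batch, instead of the sniffer thread making one remote call per packet.

```python
def batches(packets, size):
    """Group an iterable of packets into lists of at most `size` items."""
    batch = []
    for pkt in packets:
        batch.append(pkt)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch  # final, possibly shorter, batch


def filter_locally(packet_batches, pkt_filter):
    """Apply pkt_filter on the receiving side, one transfer per batch."""
    kept = []
    round_trips = 0
    for batch in packet_batches:
        round_trips += 1  # one transfer per batch rather than per packet
        kept.extend(pkt for pkt in batch if pkt_filter(pkt))
    return kept, round_trips


# Example: 10 one-byte "packets", keep those whose first byte is even.
packets = [bytes([i]) for i in range(10)]
kept, round_trips = filter_locally(batches(packets, 4), lambda p: p[0] % 2 == 0)
```

With a batch size of 4, the ten packets travel in three transfers rather than ten, and the sniffer thread never has to call back into the test machine while it is capturing.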

comrumino commented 4 years ago

As an update, I added some debugging documentation. A PDF of the TCP stream in which the exception occurs would be helpful. If there is any concern about confidentiality, you could always encrypt it with my PGP key and send it to the email address on my profile.