EdgePi-Cloud / edgepi-python-sdk

Python SDK to control EdgePi, an industrial PC/PLC/IoT device powered by Raspberry Pi CM4
MIT License

Concurrency bug when calling an SDK function with multiple threads #398

Open josiah-tesfu opened 10 months ago

josiah-tesfu commented 10 months ago

When an SDK function is called from multiple threads that share one SDK instance, one of the following exceptions occurs:

1.

 /home/pi/.local/lib/python3.9/site-packages/_pytest/unraisableexception.py:78: PytestUnraisableExceptionWarning: Exception ignored in: <function SPI.__del__ at 0x7fb30ab8b0>

  Traceback (most recent call last):
    File "/home/pi/.local/lib/python3.9/site-packages/periphery/spi.py", line 209, in close
      os.close(self._fd)
  OSError: [Errno 9] Bad file descriptor

  During handling of the above exception, another exception occurred:

  Traceback (most recent call last):
    File "/home/pi/.local/lib/python3.9/site-packages/periphery/spi.py", line 78, in __del__
      self.close()
    File "/home/pi/.local/lib/python3.9/site-packages/periphery/spi.py", line 211, in close
      raise SPIError(e.errno, "Closing SPI device: " + e.strerror)
  periphery.spi.SPIError: [Errno 9] Closing SPI device: Bad file descriptor

2.

 /home/pi/.local/lib/python3.9/site-packages/_pytest/unraisableexception.py:78: PytestUnraisableExceptionWarning: Exception ignored in: <function I2C.__del__ at 0x7fb30b2670>

  Traceback (most recent call last):
    File "/home/pi/.local/lib/python3.9/site-packages/periphery/i2c.py", line 162, in close
      os.close(self._fd)
  OSError: [Errno 9] Bad file descriptor

  During handling of the above exception, another exception occurred:

  Traceback (most recent call last):
    File "/home/pi/.local/lib/python3.9/site-packages/periphery/i2c.py", line 61, in __del__
      self.close()
    File "/home/pi/.local/lib/python3.9/site-packages/periphery/i2c.py", line 164, in close
      raise I2CError(e.errno, "Closing I2C device: " + e.strerror)
  periphery.i2c.I2CError: [Errno 9] Closing I2C device: Bad file descriptor

    warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))

This specific stack trace comes from calling set_rtd in the ADC service from multiple threads that share one SDK instance, but other SDK functions raise the same exceptions. The bug seems to arise when concurrent callers use a shared hardware resource, in this case the I2C and SPI devices.
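For context on the symptom itself: both tracebacks end in `os.close(self._fd)` failing with errno 9 (EBADF), which is what the OS reports when a descriptor that is no longer open gets closed. The sketch below reproduces that error in isolation with the standard library only. It is not taken from the SDK or from periphery, and it does not show where the descriptor is closed early; it only illustrates what the error means.

```python
import errno
import os

# Open a throwaway pipe so there are real file descriptors to work with.
read_fd, write_fd = os.pipe()

os.close(read_fd)  # first close succeeds

try:
    os.close(read_fd)  # second close of the same, now stale, descriptor
except OSError as exc:
    second_close_errno = exc.errno
    print(f"second close failed: [Errno {exc.errno}] {exc.strerror}")
else:
    second_close_errno = None
finally:
    os.close(write_fd)  # clean up the other end of the pipe
```

If two threads end up opening and closing the same device handle on a shared object, a sequence like this (close, then close again) is one plausible way to reach the error in the traces, though that is an assumption about the cause rather than something confirmed here.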

When testing for this exception, I found that the tests pass when each thread has its own SDK instance. The tests fail when the threads share an SDK instance, and the failure rate increases with the number of threads. The tests also pass when the threads share an SDK instance but the SDK function call is protected with a lock.

Test code to reproduce the issue:

Shared SDK instance (a subset of the tests fails):

import threading
import pytest
from edgepi.adc.edgepi_adc import EdgePiADC
from edgepi.adc.adc_constants import ADCNum

class PropagatingThread(threading.Thread):
    """Propagating thread to raise exceptions in calling function"""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exc = None

    def run(self):
        self.exc = None
        try:
            super().run()
        except Exception as e:
            self.exc = e

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        if self.exc:
            raise self.exc

shared_sdk = EdgePiADC()

def set_rtd_client():
    """Reset the RTD for ADC"""
    shared_sdk.set_rtd(set_rtd=False, adc_num=ADCNum.ADC_1)

@pytest.mark.parametrize("iteration", range(100))
def test_adc_concurrency(iteration):
    """Test for ADC concurrency bug"""

    threads = [PropagatingThread(target=set_rtd_client) for _ in range(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

Shared SDK instance with lock (all tests pass):

import threading
import pytest
from edgepi.adc.edgepi_adc import EdgePiADC
from edgepi.adc.adc_constants import ADCNum

class PropagatingThread(threading.Thread):
    """Propagating thread to raise exceptions in calling function"""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exc = None

    def run(self):
        self.exc = None
        try:
            super().run()
        except Exception as e:
            self.exc = e

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        if self.exc:
            raise self.exc

shared_sdk = EdgePiADC()
lock = threading.Lock()

def set_rtd_client():
    """Reset the RTD for ADC"""
    with lock:
        shared_sdk.set_rtd(set_rtd=False, adc_num=ADCNum.ADC_1)

@pytest.mark.parametrize("iteration", range(100))
def test_adc_concurrency(iteration):
    """Test for ADC concurrency bug"""

    threads = [PropagatingThread(target=set_rtd_client) for _ in range(30)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
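
The test above guards a single call site with a lock. As a rough sketch of the same idea applied to every method of a shared object, a small proxy can take one lock around each call. `SerializedProxy` and `FakeDevice` are hypothetical names made up for this illustration; neither is part of the SDK, and the fake device only stands in for a driver that is not thread safe.

```python
import threading
import time


class SerializedProxy:
    """Hypothetical wrapper: runs every method call on `target` under one lock."""

    def __init__(self, target):
        self._target = target
        self._lock = threading.RLock()

    def __getattr__(self, name):
        # Only reached for names not found on the proxy itself.
        attr = getattr(self._target, name)
        if not callable(attr):
            return attr

        def locked_call(*args, **kwargs):
            with self._lock:
                return attr(*args, **kwargs)

        return locked_call


class FakeDevice:
    """Stand-in for a non-thread-safe driver; counts overlapping calls."""

    def __init__(self):
        self.active = 0
        self.overlaps = 0
        self.calls = 0

    def transfer(self):
        self.active += 1
        if self.active > 1:
            self.overlaps += 1  # another call was already in progress
        time.sleep(0.001)       # widen the window in which calls could collide
        self.calls += 1
        self.active -= 1


device = FakeDevice()
proxy = SerializedProxy(device)

threads = [threading.Thread(target=proxy.transfer) for _ in range(20)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(f"calls={device.calls} overlaps={device.overlaps}")
```

Wrapping the SDK object the same way, for example `SerializedProxy(EdgePiADC())`, is untested here. A per-instance lock like this also would not serialize separate SDK instances that talk to the same bus.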

Non-shared instances (all tests pass):

import threading
import pytest
from edgepi.adc.edgepi_adc import EdgePiADC
from edgepi.adc.adc_constants import ADCNum

class PropagatingThread(threading.Thread):
    """Propagating thread to raise exceptions in calling function"""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exc = None

    def run(self):
        self.exc = None
        try:
            super().run()
        except Exception as e:
            self.exc = e

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        if self.exc:
            raise self.exc

def set_rtd_client():
    """Reset the RTD for ADC"""
    adc = EdgePiADC()
    adc.set_rtd(set_rtd=False, adc_num=ADCNum.ADC_1)

@pytest.mark.parametrize("iteration", range(100))
def test_adc_concurrency(iteration):
    """Test for ADC concurrency bug"""

    threads = [PropagatingThread(target=set_rtd_client) for _ in range(2)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

farzadpanahi commented 10 months ago

@josiah-tesfu please include your test code for both the shared SDK case and the lock-protected case.

farzadpanahi commented 10 months ago

@sj608 this is a reproducible issue now. @josiah-tesfu has test code (which should be added to this ticket soon) that produces the error consistently.

farzadpanahi commented 10 months ago

Non-shared instances (all tests pass)

Regarding this test, I just want to add that I think it passes not because the SDK instances are non-shared, but because SDK init is slow, so the threads do not get a good chance to overlap.
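
One way to check this would be to separate the slow init from the call under test, so that every thread finishes creating its own instance first and only then all of them make the call together. The sketch below does that with `threading.Barrier`. `run_overlapped` is a hypothetical helper written for this comment, not something in the SDK or the existing tests.

```python
import threading


def run_overlapped(make_instance, call, n_threads):
    """Create one instance per thread, then release all threads at once.

    Returns the list of exceptions raised in the worker threads.
    """
    barrier = threading.Barrier(n_threads)
    errors = []

    def worker():
        try:
            instance = make_instance()  # slow setup happens before the barrier
        except Exception as exc:
            errors.append(exc)
            barrier.abort()  # unblock the other threads instead of hanging
            return
        try:
            barrier.wait()   # block until every thread has its instance
            call(instance)   # the calls now start at roughly the same time
        except Exception as exc:
            errors.append(exc)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return errors
```

With the names from the tests in this issue, a call might look like `run_overlapped(EdgePiADC, lambda adc: adc.set_rtd(set_rtd=False, adc_num=ADCNum.ADC_1), 5)`, with the test asserting that the returned list is empty. If the non-shared test still passes with the calls forced to overlap, the slow-init explanation is less likely. If it starts failing, the problem is probably not limited to shared instances.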

farzadpanahi commented 10 months ago

@sjpark608 to make all the tests fail, just increase the number of threads from 5 to 30 here:

    threads = [PropagatingThread(target=set_rtd_client) for _ in range(5)]

sjpark608 commented 10 months ago

This comment is to sum up our brief discussion about the thread safety of the SDK.

reference_1, reference_2, spidev.h source, reference_3, i2c-dev.h source