vutran1710 / PyrateLimiter

⚔️ Python Rate-Limiter using Leaky-Bucket Algorithm Family
https://pyratelimiter.readthedocs.io
MIT License

Flaky failures in test_concurrency #94

Closed: musicinmybrain closed this issue 1 year ago

musicinmybrain commented 1 year ago

On Fedora Linux 37 (x86_64), I see the following failure roughly once in every three to five runs:

$ pytest tests/test_concurrency.py -v
============================================== test session starts ===============================================
platform linux -- Python 3.11.1, pytest-6.2.5, py-1.11.0, pluggy-1.0.0 -- /home/ben/src/forks/PyrateLimiter/_e/bin/python3
cachedir: .pytest_cache
rootdir: /home/ben/src/forks/PyrateLimiter
plugins: forked-1.4.0, cov-3.0.0, xdist-2.5.0, asyncio-0.12.0
collected 3 items                                                                                                

tests/test_concurrency.py::test_concurrency[ThreadPoolExecutor-SQLiteBucket] PASSED                        [ 33%]
tests/test_concurrency.py::test_concurrency[ProcessPoolExecutor-SQLiteBucket] FAILED                       [ 66%]
tests/test_concurrency.py::test_filelock_concurrency PASSED                                                [100%]

==================================================== FAILURES ====================================================
_______________________________ test_concurrency[ProcessPoolExecutor-SQLiteBucket] _______________________________

executor_class = <class 'concurrent.futures.process.ProcessPoolExecutor'>
bucket_class = <class 'pyrate_limiter.sqlite_bucket.SQLiteBucket'>

    @pytest.mark.parametrize("bucket_class", [SQLiteBucket])
    @pytest.mark.parametrize("executor_class", [ThreadPoolExecutor, ProcessPoolExecutor])
    def test_concurrency(executor_class, bucket_class):
        """Make a fixed number of concurrent requests using a shared Limiter, and check the total time
        they take to run
        """
        logger.info(f"Testing {bucket_class.__name__} with {executor_class.__name__}")

        # Set up limiter
        bucket_kwargs = {
            "path": join(gettempdir(), f"test_{executor_class.__name__}.sqlite"),
        }
        limiter = Limiter(
            RequestRate(LIMIT_REQUESTS_PER_SECOND, Duration.SECOND),
            bucket_class=bucket_class,
            bucket_kwargs=bucket_kwargs,
        )

        # Set up request function
        bucket_ids = [f"{executor_class.__name__}_bucket_{i}" for i in range(N_BUCKETS)]
        start_time = perf_counter()
        request_func = partial(_send_request, limiter, bucket_ids, start_time)

        # Distribute requests across workers
        with executor_class(max_workers=N_WORKERS) as executor:
            list(executor.map(request_func, range(N_REQUESTS), timeout=300))

        # Check total time, with debug logging
        elapsed = perf_counter() - start_time
        expected_min_time = (N_REQUESTS - 1) / LIMIT_REQUESTS_PER_SECOND
        worker_type = "threads" if executor_class is ThreadPoolExecutor else "processes"
        logger.info(
            f"Ran {N_REQUESTS} requests with {N_WORKERS} {worker_type} in {elapsed:.2f} seconds\n"
            f"With a rate limit of {LIMIT_REQUESTS_PER_SECOND}/second, expected at least "
            f"{expected_min_time} seconds"
        )
>       assert elapsed >= expected_min_time
E       assert 9.634221867017914 >= 10.0

tests/test_concurrency.py:68: AssertionError
---------------------------------------------- Captured stderr call ----------------------------------------------
INFO:pyrate_limiter.tests:Testing SQLiteBucket with ProcessPoolExecutor
INFO:pyrate_limiter.tests:Ran 101 requests with 7 processes in 9.63 seconds
With a rate limit of 10/second, expected at least 10.0 seconds
----------------------------------------------- Captured log call ------------------------------------------------
INFO     pyrate_limiter.tests:test_concurrency.py:38 Testing SQLiteBucket with ProcessPoolExecutor
INFO     pyrate_limiter.tests:test_concurrency.py:63 Ran 101 requests with 7 processes in 9.63 seconds
With a rate limit of 10/second, expected at least 10.0 seconds
============================================ short test summary info =============================================
FAILED tests/test_concurrency.py::test_concurrency[ProcessPoolExecutor-SQLiteBucket] - assert 9.634221867017914...
========================================== 1 failed, 2 passed in 50.16s ==========================================
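
For reference, the expected minimum comes from the assertion's own arithmetic: with 101 requests and a limit of 10 requests/second, the test expects at least (101 - 1) / 10 = 10.0 seconds, so finishing in 9.63 seconds means a few requests were admitted faster than the configured rate. Below is a minimal sketch of how that can happen, assuming nothing about PyrateLimiter beyond the size()-then-insert pattern visible in the second traceback further down; the database path, table name, and LIMIT are made up for illustration.

# Minimal sketch (not PyrateLimiter code) of a check-then-insert race between
# two processes sharing an SQLite file: both read the count before either
# inserts, so both pass a "count < LIMIT" check and the limit is exceeded.
import multiprocessing
import sqlite3
import time

DB = "/tmp/race_demo.sqlite"   # hypothetical path, for illustration only
LIMIT = 1                      # pretend the bucket allows a single item

def worker(barrier):
    conn = sqlite3.connect(DB, isolation_level=None)  # autocommit mode
    barrier.wait()                                    # start both workers together
    (count,) = conn.execute("SELECT COUNT(*) FROM bucket").fetchone()
    if count < LIMIT:                                 # both see count == 0 ...
        time.sleep(0.1)                               # widen the race window
        conn.execute("INSERT INTO bucket VALUES (1)")  # ... and both insert

if __name__ == "__main__":
    init = sqlite3.connect(DB, isolation_level=None)
    init.execute("CREATE TABLE IF NOT EXISTS bucket (x)")
    init.execute("DELETE FROM bucket")
    barrier = multiprocessing.Barrier(2)
    procs = [multiprocessing.Process(target=worker, args=(barrier,)) for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # Usually prints (2,): both inserts landed, so the "limit" of 1 was exceeded.
    print(init.execute("SELECT COUNT(*) FROM bucket").fetchone())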

While packaging PyrateLimiter as an RPM for Fedora Linux Rawhide (Fedora's development version), we have also seen the following failure:

=================================== FAILURES ===================================
______________ test_concurrency[ProcessPoolExecutor-SQLiteBucket] ______________
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib64/python3.11/concurrent/futures/process.py", line 256, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.11/concurrent/futures/process.py", line 205, in _process_chunk
    return [fn(*args) for args in chunk]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.11/concurrent/futures/process.py", line 205, in <listcomp>
    return [fn(*args) for args in chunk]
            ^^^^^^^^^
  File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/tests/test_concurrency.py", line 75, in _send_request
    with limiter.ratelimit(*bucket_ids, delay=True):
  File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/pyrate_limiter/limit_context_decorator.py", line 66, in __enter__
    self.delayed_acquire()
  File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/pyrate_limiter/limit_context_decorator.py", line 82, in delayed_acquire
    self.try_acquire()
  File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/pyrate_limiter/limiter.py", line 92, in try_acquire
    volume = bucket.size()
             ^^^^^^^^^^^^^
  File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/pyrate_limiter/sqlite_bucket.py", line 95, in size
    self._size = self._query_size()
                 ^^^^^^^^^^^^^^^^^^
  File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/pyrate_limiter/sqlite_bucket.py", line 100, in _query_size
    return self.connection.execute(f"SELECT COUNT(*) FROM {self.table}").fetchone()[0]
           ^^^^^^^^^^^^^^^
  File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/pyrate_limiter/sqlite_bucket.py", line 73, in connection
    self._connection.execute(
sqlite3.OperationalError: database is locked
"""
The above exception was the direct cause of the following exception:
executor_class = <class 'concurrent.futures.process.ProcessPoolExecutor'>
bucket_class = <class 'pyrate_limiter.sqlite_bucket.SQLiteBucket'>
    @pytest.mark.parametrize("bucket_class", [SQLiteBucket])
    @pytest.mark.parametrize("executor_class", [ThreadPoolExecutor, ProcessPoolExecutor])
    def test_concurrency(executor_class, bucket_class):
        """Make a fixed number of concurrent requests using a shared Limiter, and check the total time
        they take to run
        """
        logger.info(f"Testing {bucket_class.__name__} with {executor_class.__name__}")

        # Set up limiter
        bucket_kwargs = {
            "path": join(gettempdir(), f"test_{executor_class.__name__}.sqlite"),
        }
        limiter = Limiter(
            RequestRate(LIMIT_REQUESTS_PER_SECOND, Duration.SECOND),
            bucket_class=bucket_class,
            bucket_kwargs=bucket_kwargs,
        )

        # Set up request function
        bucket_ids = [f"{executor_class.__name__}_bucket_{i}" for i in range(N_BUCKETS)]
        start_time = perf_counter()
        request_func = partial(_send_request, limiter, bucket_ids, start_time)

        # Distribute requests across workers
        with executor_class(max_workers=N_WORKERS) as executor:
>           list(executor.map(request_func, range(N_REQUESTS), timeout=300))
tests/test_concurrency.py:57: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/lib64/python3.11/concurrent/futures/process.py:597: in _chain_from_iterable_of_lists
    for element in iterable:
/usr/lib64/python3.11/concurrent/futures/_base.py:621: in result_iterator
    yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
/usr/lib64/python3.11/concurrent/futures/_base.py:317: in _result_or_cancel
    return fut.result(timeout)
/usr/lib64/python3.11/concurrent/futures/_base.py:449: in result
    return self.__get_result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
self = None
    def __get_result(self):
        if self._exception:
            try:
>               raise self._exception
E               sqlite3.OperationalError: database is locked
/usr/lib64/python3.11/concurrent/futures/_base.py:401: OperationalError
----------------------------- Captured stderr call -----------------------------
INFO:pyrate_limiter.tests:Testing SQLiteBucket with ProcessPoolExecutor
------------------------------ Captured log call -------------------------------
INFO     pyrate_limiter.tests:test_concurrency.py:38 Testing SQLiteBucket with ProcessPoolExecutor
=========================== short test summary info ============================
FAILED tests/test_concurrency.py::test_concurrency[ProcessPoolExecutor-SQLiteBucket]
=================== 1 failed, 54 passed in 209.79s (0:03:29) ===================

This failure is harder to reproduce reliably.
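
The "database is locked" error itself is stock sqlite3 behavior: a connection raises OperationalError when it cannot obtain the file lock within its busy timeout (5 seconds by default). Lengthening that timeout or enabling WAL journaling makes the error less likely, though it only reduces contention rather than fixing the underlying race. A minimal sketch using plain sqlite3 follows; whether SQLiteBucket exposes these knobs is an assumption to verify against its constructor.

import sqlite3

# Hypothetical mitigation sketch, standard sqlite3 only: lengthen the busy
# timeout so a locked database is retried instead of raising immediately,
# and enable WAL mode so readers don't block on the single writer.
conn = sqlite3.connect("/tmp/shared_bucket.sqlite", timeout=30)  # wait up to 30s for locks
conn.execute("PRAGMA busy_timeout = 30000")  # same setting, expressed in milliseconds
conn.execute("PRAGMA journal_mode = WAL")    # concurrent readers alongside one writer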

JWCook commented 1 year ago

Hmm, I haven't been able to reproduce that locally, but both of those errors would seem to indicate that SQLiteBucket isn't safe to use with multiprocessing, and only FileLockSQLiteBucket should be used in those cases (see #68). I'll submit a PR for that.
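
For anyone landing here, a hedged sketch of the workaround JWCook describes: construct the Limiter with FileLockSQLiteBucket instead of SQLiteBucket for multiprocess workloads. This assumes FileLockSQLiteBucket accepts the same "path" keyword as SQLiteBucket in the test above and that the optional filelock dependency is installed; check the documentation for the exact constructor arguments.

from os.path import join
from tempfile import gettempdir

from pyrate_limiter import Duration, Limiter, RequestRate
from pyrate_limiter.sqlite_bucket import FileLockSQLiteBucket

# Sketch only: the same Limiter setup as the test, swapping in the bucket
# class that serializes access across processes with a file lock.
limiter = Limiter(
    RequestRate(10, Duration.SECOND),
    bucket_class=FileLockSQLiteBucket,
    bucket_kwargs={"path": join(gettempdir(), "shared_bucket.sqlite")},  # assumed kwarg
)

with limiter.ratelimit("my_bucket", delay=True):
    ...  # rate-limited work here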