jd / tenacity

Retrying library for Python
http://tenacity.readthedocs.io
Apache License 2.0

TypeError: cannot pickle '_thread.RLock' object with ProcessPoolExecutor #429

Open davetapley opened 10 months ago

davetapley commented 10 months ago

Here's a weird one.

I'm using ProcessPoolExecutor, and sometimes that causes contention on my databases. I already use tenacity elsewhere, so I figured I could just wrap the call in a retry with retry_if_exception_type(IOException).

But when IOException is raised and the retries run out, I get TypeError: cannot pickle '_thread.RLock' object.

MRE:


from concurrent.futures import ProcessPoolExecutor, wait
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
                      wait_fixed)

I = TypeVar('I')
O = TypeVar('O')

class IOException(Exception):
    ...

@dataclass
class Job(Generic[I, O]):
    f: Callable[[I], O]

    @retry(retry=retry_if_exception_type(IOException),
           wait=wait_fixed(0.1),
           stop=stop_after_attempt(10))
    def __call__(self, x: I):
        return self.f(x)

def good_job(x: int) -> int:
    return x * 2

def fail_job(x: int) -> int:
    raise IOException('nope')

data = [1, 2, 3, 4, 5]

with ProcessPoolExecutor() as ex:
    jobs = [ex.submit(Job(good_job), x) for x in data]
    wait(jobs)
    print([job.result() for job in jobs])

with ProcessPoolExecutor() as ex:
    jobs = [ex.submit(Job(fail_job), x) for x in data]
    wait(jobs)
    print([job.result() for job in jobs])

Output:

[2, 4, 6, 8, 10]
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/opt/python/3.11.6/lib/python3.11/site-packages/tenacity/__init__.py", line 382, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/workspaces/ng/tenancity.py", line 23, in __call__
    return self.f(x)
           ^^^^^^^^^
  File "/workspaces/ng/tenancity.py", line 31, in fail_job
    raise IOException('nope')
IOException: nope

The above exception was the direct cause of the following exception:

tenacity.RetryError: RetryError[<Future at 0x7f0a37898e50 state=finished raised IOException>]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/python/3.11.6/lib/python3.11/concurrent/futures/process.py", line 217, in _sendback_result
    result_queue.put(_ResultItem(work_id, result=result,
  File "/opt/python/3.11.6/lib/python3.11/multiprocessing/queues.py", line 371, in put
    obj = _ForkingPickler.dumps(obj)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python/3.11.6/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/workspaces/ng/tenancity.py", line 43, in <module>
    print([job.result() for job in jobs])
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspaces/ng/tenancity.py", line 43, in <listcomp>
    print([job.result() for job in jobs])
           ^^^^^^^^^^^^
  File "/opt/python/3.11.6/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/python/3.11.6/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
TypeError: cannot pickle '_thread.RLock' object

spolloni commented 10 months ago

We're also hitting this, but with multiprocessing.Pool instead. Interestingly, setting reraise=True avoids the issue and makes the code behave as expected.

Here is an adapted version of @davetapley's MRE:

from multiprocessing import get_context
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
                      wait_fixed)

I = TypeVar('I')
O = TypeVar('O')

class IOException(Exception):
    ...

@dataclass
class Job(Generic[I, O]):
    f: Callable[[I], O]

    @retry(
        retry=retry_if_exception_type(IOException),
        wait=wait_fixed(0.1),
        stop=stop_after_attempt(10),
        # reraise=True,  # uncomment to avoid the pickling error
    )
    def __call__(self, x: I):
        return self.f(x)

def good_job(x: int) -> int:
    return x * 2

def fail_job(x: int) -> int:
    raise IOException('nope')

data = [1, 2, 3, 4, 5]

if __name__ == '__main__':
    with get_context('spawn').Pool() as pool:
        for result in pool.imap_unordered(Job(good_job), data):
            print(result)

    with get_context('spawn').Pool() as pool:
        for result in pool.imap_unordered(Job(fail_job), data):
            print(result)

Output with reraise=False (the default):

multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x1023aa320>'. Reason: 'TypeError("cannot pickle '_thread.RLock' object")'

Output with reraise=True:

__main__.IOException: nope

spolloni commented 10 months ago

^ The issue also seems unrelated to retry_if_exception_type and wait_fixed: removing both still produces the same error.
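
A likely explanation, sketched with the standard library only: the RetryError in the first traceback wraps a Future, and a concurrent.futures.Future holds a threading.Condition backed by an RLock, which pickle cannot serialize. Any exception that stores such a Future would then fail when the worker tries to send it back to the parent process. The FutureCarryingError class and try_pickle helper below are hypothetical stand-ins for illustration, not tenacity's own code.

```python
import pickle
from concurrent.futures import Future


class FutureCarryingError(Exception):
    """Hypothetical stand-in for an exception that stores a Future."""

    def __init__(self, last_attempt: Future) -> None:
        super().__init__(last_attempt)  # the Future ends up in self.args
        self.last_attempt = last_attempt


def try_pickle(obj: object) -> str:
    """Return 'ok' if obj pickles, else the name of the exception raised."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return type(exc).__name__
    return "ok"


# A finished Future that recorded a failure, similar to a last retry attempt.
attempt: Future = Future()
attempt.set_exception(OSError("nope"))

plain_result = try_pickle(OSError("nope"))                  # ordinary exception
future_result = try_pickle(attempt)                         # the Future itself
wrapped_result = try_pickle(FutureCarryingError(attempt))   # exception wrapping it
print(plain_result, future_result, wrapped_result)
```

If this reading is right, it would also explain why the retry and wait arguments make no difference: the failure depends only on which exception object leaves the worker.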

paqstd-dev commented 3 months ago

I also encountered this problem.

@spolloni's solution of setting reraise=True solved my problem. Thank you.
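
For anyone who cannot change the retry settings, a minimal sketch of the same idea done by hand: catch the wrapping error inside the worker function and raise the original exception instead, so only a picklable exception crosses the process boundary. This assumes the wrapping error exposes the failed attempt as a Future in a last_attempt attribute, as the traceback above suggests. FutureCarryingError and raise_original are hypothetical names, and the sketch uses only the standard library rather than tenacity itself.

```python
import functools
import pickle
from concurrent.futures import Future


class FutureCarryingError(Exception):
    """Hypothetical stand-in for an error that stores the last attempt."""

    def __init__(self, last_attempt: Future) -> None:
        super().__init__(last_attempt)
        self.last_attempt = last_attempt


def raise_original(fn):
    """Re-raise the exception stored in the Future instead of the wrapper."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except FutureCarryingError as err:
            # Future.exception() returns the exception recorded by the attempt.
            original = err.last_attempt.exception()
            raise original from None

    return wrapper


@raise_original
def fail_job(x: int) -> int:
    # Simulate a retry loop that gave up and wrapped the last failure.
    attempt: Future = Future()
    attempt.set_exception(OSError("nope"))
    raise FutureCarryingError(attempt)


try:
    fail_job(1)
except OSError as exc:
    # The original exception survives a pickle round trip.
    restored = pickle.loads(pickle.dumps(exc))
    print(type(restored).__name__, restored)
```

The trade-off is that the caller no longer sees that retries were exhausted, only the last underlying error, which matches the output reported above with reraise=True.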