PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Cached task raising tenacity exception crashes #14417

Open thundercat1 opened 3 months ago

thundercat1 commented 3 months ago


Bug summary

I have a task that calls a library function. Inside the library, tenacity is used for retries. But when tenacity raises its RetryError (after the library's own retries are exhausted), the task crashes without executing its own Prefect retries.

Reproduction

import tenacity
from prefect import task
from prefect.tasks import task_input_hash

@tenacity.retry(stop=tenacity.stop_after_attempt(3), reraise=False)
def library_function_with_its_own_retries():
    raise ValueError("Simulated library function failure")

@task(
    cache_key_fn=task_input_hash,
    retries=1,
    log_prints=True,
)
def call_library_function():
    print("You should see this print statement 2 times")
    library_function_with_its_own_retries()

if __name__ == "__main__":
    call_library_function()

Error

14:43:18.918 | INFO    | prefect.engine - Created task run 'call_library_function-83f0e823' for task 'call_library_function'
14:43:18.980 | INFO    | Task run 'call_library_function-83f0e823' - You should see this print statement 2 times
14:43:18.982 | ERROR   | Task run 'call_library_function-83f0e823' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 26, in library_function_with_its_own_retries
    raise ValueError("Simulated library function failure")
ValueError: Simulated library function failure

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 2103, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 36, in call_library_function
    library_function_with_its_own_retries()
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 336, in wrapped_f
    return copy(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 419, in exc_check
    raise retry_exc from fut.exception()
tenacity.RetryError: RetryError[<Future at 0x112e22b10 state=finished raised ValueError>]
14:43:18.985 | ERROR   | Task run 'call_library_function-83f0e823' - Crash detected! Execution was interrupted by an unexpected exception: TypeError: cannot pickle '_thread.RLock' object
Traceback (most recent call last):
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 26, in library_function_with_its_own_retries
    raise ValueError("Simulated library function failure")
ValueError: Simulated library function failure

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 2103, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 36, in call_library_function
    library_function_with_its_own_retries()
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 336, in wrapped_f
    return copy(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/tenacity/__init__.py", line 419, in exc_check
    raise retry_exc from fut.exception()
tenacity.RetryError: RetryError[<Future at 0x112e22b10 state=finished raised ValueError>]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/michael.haines/ipp-poc/flows/no-op-flow.py", line 40, in <module>
    call_library_function()
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/tasks.py", line 689, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 1392, in enter_task_run_engine
    return submit_autonomous_task_run_to_engine(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 304, in coroutine_wrapper
    return call()
           ^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__
    return self.result()
           ^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/task_engine.py", line 67, in submit_autonomous_task_run_to_engine
    future_result_or_state = from_sync.wait_for_call_in_loop_thread(
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 1555, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/task_runners.py", line 231, in submit
    result = await call()
             ^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 1806, in begin_task_run
    state = await orchestrate_task_run(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/engine.py", line 2124, in orchestrate_task_run
    terminal_state = await exception_to_failed_state(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/states.py", line 201, in exception_to_failed_state
    data = await result_factory.create_result(exc)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/results.py", line 453, in create_result
    return await PersistedResult.create(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/results.py", line 681, in create
    data = serializer.dumps(obj)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/prefect/serializers.py", line 148, in dumps
    blob = pickler.dumps(obj)
           ^^^^^^^^^^^^^^^^^^
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/Users/michael.haines/.pyenv/versions/3.11.6/envs/ipp-poc/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.RLock' object

Versions (prefect version output)

Version:             2.19.6
API version:         0.8.4
Python version:      3.11.6
Git commit:          9d938fe7
Built:               Mon, Jun 24, 2024 10:23 AM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.43.2

Additional context

This is only a problem if a cache_key_fn is included in the task decorator.

Easy workarounds exist:

  1. If you own the library, you can set reraise=True in the tenacity retry decorator.
  2. You can catch tenacity exceptions in the task and raise some other exception type (see the sketch below).

I'll be implementing one of these workarounds, but it would be nice if the core library included handling for tenacity exceptions by default.
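
For reference, here is a minimal sketch of workaround 2 applied to the reproduction above (the wrapper exception type and message are just illustrative):

import tenacity
from prefect import task
from prefect.tasks import task_input_hash

@tenacity.retry(stop=tenacity.stop_after_attempt(3), reraise=False)
def library_function_with_its_own_retries():
    raise ValueError("Simulated library function failure")

@task(
    cache_key_fn=task_input_hash,
    retries=1,
    log_prints=True,
)
def call_library_function():
    print("You should see this print statement 2 times")
    try:
        library_function_with_its_own_retries()
    except tenacity.RetryError as exc:
        # RetryError keeps a reference to the last attempt's Future (and its lock),
        # so re-raise a plain, picklable exception that Prefect can serialize
        # while it records the failed state and runs its own retries.
        raise RuntimeError(f"Library call failed after tenacity retries: {exc}") from None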

zhen0 commented 3 months ago

Thanks for raising this one @thundercat1 and for providing workarounds in case others come across it. I've added this issue to our backlog.

zzstoatzz commented 2 months ago

hi @thundercat1 - I think this issue has mostly to do with the fact that you cannot pickle a _thread.RLock object in general

import pickle
import threading

def minimal_repro():
    lock = threading.RLock()
    try:
        pickle.dumps(lock)
    except TypeError as e:
        print(f"Pickling error: {e}")

if __name__ == "__main__":
    minimal_repro() # Pickling error: cannot pickle '_thread.RLock' object

if you use result_serializer="json", your example works (i.e. the task fails as expected instead of crashing). I'm not sure this is something we'd want to add tenacity-specific handling for. Does using an alternate serializer solve this issue for you?
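
For example, a sketch based on the reproduction above (here "json" is the string shorthand for Prefect's built-in JSON serializer):

import tenacity
from prefect import task
from prefect.tasks import task_input_hash

@tenacity.retry(stop=tenacity.stop_after_attempt(3), reraise=False)
def library_function_with_its_own_retries():
    raise ValueError("Simulated library function failure")

@task(
    cache_key_fn=task_input_hash,
    retries=1,
    log_prints=True,
    result_serializer="json",  # persist results (and failed-state exceptions) as JSON instead of cloudpickle
)
def call_library_function():
    library_function_with_its_own_retries()

if __name__ == "__main__":
    call_library_function()  # now retries once and ends in a Failed state instead of crashing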

thundercat1 commented 2 months ago

I'm not sure this is something we'd want to add tenacity specific handling for.

I imagine the common usage pattern would be to define a serializer that works for your expected, successful return value (since that's what you want to cache). The fact that your serializer also needs to handle every possible unhandled exception, even though you have no intent to cache them, is very surprising behavior. What if some other situation arises where an exception is raised that isn't JSON-serializable?

I agree that tenacity-specific handling probably isn't warranted, but I wonder if it's possible to implement more generic handling that allows retries to be executed and the task state to be correctly assigned (Failed) even when the exception isn't serializable.

Does using an alternate serializer solve this issue for you?

I've implemented some other handling here - now that I know of this particular issue, I can catch this specific exception and replace it with something that's serializable by default. But I also think it's something Prefect should probably be able to handle by default.

cicdw commented 1 month ago

The core problem here appears to be that we are persisting (and therefore serializing) exceptions; in general I don't like this and don't think it's necessary, so I'll look at whether any functionality depends on it and whether we can safely remove it. That being said, I want to call out that if a user runs on a distributed system (like Dask or Ray), the same error would occur, because those systems need to serialize the exception to send it across the network. I think we can look at opening an issue in tenacity to avoid maintaining a lock reference on raised exceptions.
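
For illustration, a sketch of why the raised RetryError itself can't be pickled; a plain concurrent.futures.Future stands in here for tenacity's attempt future:

import pickle
from concurrent.futures import Future

import tenacity

# Simulate the state tenacity ends up in: RetryError wraps the Future of the
# last attempt, and that Future holds a threading condition (and its RLock).
fut = Future()
fut.set_exception(ValueError("Simulated library function failure"))
err = tenacity.RetryError(fut)

try:
    pickle.dumps(err)
except TypeError as e:
    print(f"Pickling error: {e}")  # cannot pickle '_thread.RLock' object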

cicdw commented 1 month ago

Ha, looks like someone has raised this issue with tenacity already: https://github.com/jd/tenacity/issues/429