PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

exponential_backoff with retries=0 causes a 500 internal server error #13794

Open Samreay opened 3 months ago

Samreay commented 3 months ago

Bug summary

We make the number of retries configurable per task, but pass some common kwargs to all tasks, including an exponential backoff for retry delays. When a task pulling data from an endpoint started having issues (the endpoint had added a more stringent rate limit), we decreased that task's retries to 0, and then noticed many 500 Internal Server Errors.

It seems that, unlike passing a list of values to retry_delay_seconds (which works fine with retries=0), using exponential_backoff with retries=0 makes the set_state API call fail with a 500.
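Until this is fixed, one possible workaround is to build the shared kwargs so that the backoff is only attached when retries are actually requested, and to pass the delays as an explicit list (the form reported above as working). This is only a sketch: `backoff_delays` and `retry_kwargs` are hypothetical helper names, not part of Prefect, and the `backoff_factor * 2**attempt` schedule is an assumption about what an exponential backoff should produce, not a copy of Prefect's implementation.

```python
from typing import Any, Dict, List


def backoff_delays(retries: int, backoff_factor: float = 1.0) -> List[float]:
    """Hypothetical helper: one delay per retry, doubling each time."""
    return [backoff_factor * 2**attempt for attempt in range(retries)]


def retry_kwargs(retries: int, backoff_factor: float = 1.0) -> Dict[str, Any]:
    """Hypothetical helper: build the common retry kwargs for a task.

    When retries is 0, retry_delay_seconds is left out entirely, so the
    task never receives a backoff setting it cannot use.
    """
    kwargs: Dict[str, Any] = {"retries": retries}
    if retries > 0:
        kwargs["retry_delay_seconds"] = backoff_delays(retries, backoff_factor)
    return kwargs
```

The result would then be splatted into the decorator, e.g. `@task(**retry_kwargs(retries=0))`, so a task configured with zero retries is declared without any delay setting at all.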

Reproduction

from prefect import flow, task
from prefect.tasks import exponential_backoff

@task(retries=0, retry_delay_seconds=exponential_backoff(backoff_factor=1))
def a_task():
    raise ValueError()

@flow()
def a_flow():
    a_task()

if __name__ == "__main__":
    a_flow()

Error

File "/home/sam/arenko/flows/tmp2.py", line 11, in a_flow
    a_task()
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/tasks.py", line 689, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/engine.py", line 1421, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/sam/.pyenv/versions/3.11.4/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/engine.py", line 1555, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/task_runners.py", line 231, in submit
    result = await call()
             ^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/engine.py", line 1806, in begin_task_run
    state = await orchestrate_task_run(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/engine.py", line 2149, in orchestrate_task_run
    state = await propose_state(client, terminal_state, task_run_id=task_run.id)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/utilities/engine.py", line 381, in propose_state
    response = await set_state_and_handle_waits(set_state)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/utilities/engine.py", line 368, in set_state_and_handle_waits
    response = await set_state_func()
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/client/orchestration.py", line 2332, in set_task_run_state
    response = await self._client.post(
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/httpx/_client.py", line 1892, in post
    return await self.request(
           ^^^^^^^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/httpx/_client.py", line 1574, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/client/base.py", line 358, in send
    response.raise_for_status()
  File "/home/sam/arenko/flows/.venv/lib/python3.11/site-packages/prefect/client/base.py", line 171, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://ephemeral-prefect/api/task_runs/03b9ffc8-eabd-4fba-89eb-18d19f4dad5e/set_state'
Response: {'exception_message': 'Internal Server Error'}

Versions

Version:             2.19.1
API version:         0.8.4
Python version:      3.11.4
Git commit:          17a1b1d8
Built:               Thu, May 16, 2024 3:33 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.37.2

discdiver commented 1 month ago

I can confirm this also happens on Prefect 3.0.0rc10 with Cloud.