PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

2.x on_completion hook failing on large number of tasks #15451

Open derekahuang opened 1 week ago

derekahuang commented 1 week ago

Bug summary

I hit a timeout error when using the on_completion hook with 100+ tasks. The error occurs even if the hook does nothing more than call state.result().

  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_asgi_lifespan/site-packages/asgi_lifespan/_concurrency/asyncio.py", line 17, in wait
    await self._event.wait()
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/python_3_10_x86_64-unknown-linux-gnu/lib/python3.10/asyncio/locks.py", line 214, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/python_3_10_x86_64-unknown-linux-gnu/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_asgi_lifespan/site-packages/asgi_lifespan/_concurrency/asyncio.py", line 44, in run_and_fail_after
    await asyncio.wait_for(coroutine(), timeout=seconds)
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/python_3_10_x86_64-unknown-linux-gnu/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/engine.py", line 2367, in _run_task_hooks
    await from_async.call_in_new_thread(
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/strap/src/python/prefect_utils/prompt.py", line 171, in progress_callback
    output = state.result()
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/client/schemas/objects.py", line 244, in result
    return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/states.py", line 71, in get_state_result
    return _get_state_result(state, raise_on_failure=raise_on_failure)
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/utilities/asyncutils.py", line 311, in coroutine_wrapper
    return call()
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__
    return self.result()
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/python_3_10_x86_64-unknown-linux-gnu/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/states.py", line 98, in _get_state_result
    result = await state.data.get()
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/client/utilities.py", line 98, in with_injected_client
    async with context as new_client:
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/client/orchestration.py", line 3350, in __aenter__
    self._ephemeral_lifespan = await self._exit_stack.enter_async_context(
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/python_3_10_x86_64-unknown-linux-gnu/lib/python3.10/contextlib.py", line 619, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/python_3_10_x86_64-unknown-linux-gnu/lib/python3.10/contextlib.py", line 199, in __aenter__
    return await anext(self.gen)
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_prefect/site-packages/prefect/client/base.py", line 131, in app_lifespan_context
    await context.__aenter__()
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_asgi_lifespan/site-packages/asgi_lifespan/_manager.py", line 99, in __aenter__
    await self.startup()
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_asgi_lifespan/site-packages/asgi_lifespan/_manager.py", line 40, in startup
    await self._concurrency_backend.run_and_fail_after(
  File "/app/src/python/studio/backend/py3_image.binary.runfiles/pip_deps_asgi_lifespan/site-packages/asgi_lifespan/_concurrency/asyncio.py", line 46, in run_and_fail_after
    raise TimeoutError

Is there a timeout setting I can change? I am currently setting the hook like this:

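    # Hook invoked by Prefect when a task run reaches a Completed state.
    def progress_callback(task: Task, task_run: TaskRun, state: State) -> None:
        output = state.result()  # this call is where the timeout surfaces
        # some progress_fn

    futures = []
    # Submit one task run per chunk of the DataFrame, each with the hook attached.
    for i in range(0, len(df), sample_size_per_task):
        futures.append(
            predict_raw_output_task.with_options(
                on_completion=[progress_callback],
            ).submit(
                df_chunk=df.iloc[i : i + sample_size_per_task],
            )
        )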

I'm guessing it has something to do with the on_completion hooks all being run in one thread. These hooks are not time-sensitive for me, so I would rather just extend the timeout if possible.
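
The traceback ends in asgi_lifespan's run_and_fail_after, which wraps a startup coroutine in asyncio.wait_for and raises TimeoutError when startup does not finish in time. Below is a toy model of how a fixed startup timeout could pass with a few hooks and fail with many. It is not Prefect's code: the shared lock, STARTUP_SECONDS, STARTUP_TIMEOUT, and the function names are all assumptions made for illustration, and the real source of contention may be different.

```python
import asyncio

STARTUP_SECONDS = 0.05  # hypothetical cost of one startup
STARTUP_TIMEOUT = 0.12  # hypothetical fixed startup timeout


async def run_hooks(n_hooks: int) -> int:
    """Run n_hooks simulated hooks concurrently; return how many timed out."""
    # Stands in for whatever resource the startups contend for (assumption).
    startup_lock = asyncio.Lock()

    async def startup() -> None:
        # Startups are serialized, so each one waits behind those ahead of it.
        async with startup_lock:
            await asyncio.sleep(STARTUP_SECONDS)

    async def hook() -> bool:
        try:
            # Same pattern as in the traceback: a coroutine under wait_for.
            await asyncio.wait_for(startup(), timeout=STARTUP_TIMEOUT)
            return True
        except asyncio.TimeoutError:
            return False

    results = await asyncio.gather(*(hook() for _ in range(n_hooks)))
    return results.count(False)


def count_timeouts(n_hooks: int) -> int:
    """Synchronous entry point for the simulation."""
    return asyncio.run(run_hooks(n_hooks))
```

In this model, count_timeouts(1) reports no timeouts, while count_timeouts(20) reports some, because every hook's clock starts at submission and the later ones spend their whole budget waiting. If something similar is happening here, raising the timeout would only move the threshold to a larger task count.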

Version info (prefect version output)

2.20.7

Additional context

No response

zzstoatzz commented 2 days ago

hey @derekahuang - thanks for the issue! do you happen to have an MRE for this?