PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Recursive invocation of a flow fails with `RuntimeError: The task runner is already started!` #8307

Closed rsampaths16 closed 1 year ago

rsampaths16 commented 1 year ago

First check

Bug summary

When a flow invokes itself as a subflow (recursively), it fails with `RuntimeError: The task runner is already started!`.

Since flows can invoke subflows, I would expect recursion to work until a recursion depth or stack limit is reached.

Reproduction

from prefect import flow

@flow
def rec(a):
    if a > 0:
        print(a)
        return 1 + rec(a // 2)
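    return 1

if __name__ == "__main__":
    rec(5)  # hypothetical entry point; a=5 matches the "5" printed in the log below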

Error

10:24:09.413 | ERROR   | Flow run 'nu-naito' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/<user>/.local/lib/python3.7/site-packages/prefect/engine.py", line 643, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/<user>/.local/lib/python3.7/site-packages/prefect/utilities/asyncutils.py", line 161, in run_sync_in_interruptible_worker_thread
    limiter=get_thread_limiter(),
  File "/home/<user>/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/home/<user>/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 702, in _run_wrapped_task
    await coro
  File "/home/<user>/.local/lib/python3.7/site-packages/anyio/to_thread.py", line 32, in run_sync
    func, *args, cancellable=cancellable, limiter=limiter
  File "/home/<user>/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/<user>/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/<user>/.local/lib/python3.7/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "srec.py", line 7, in rec
    return 1 + rec(a // 2)
  File "/home/<user>/.local/lib/python3.7/site-packages/prefect/flows.py", line 452, in __call__
    return_type=return_type,
  File "/home/<user>/.local/lib/python3.7/site-packages/prefect/engine.py", line 175, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/home/<user>/.local/lib/python3.7/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/<user>/.local/lib/python3.7/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/<user>/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 435, in result
    return self.__get_result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/<user>/.local/lib/python3.7/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/<user>/.local/lib/python3.7/site-packages/prefect/engine.py", line 516, in create_and_begin_subflow_run
    task_runner = await stack.enter_async_context(flow.task_runner.start())
  File "/usr/local/lib/python3.7/contextlib.py", line 570, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/local/lib/python3.7/contextlib.py", line 170, in __aenter__
    return await self.gen.__anext__()
  File "/home/<user>/.local/lib/python3.7/site-packages/prefect/task_runners.py", line 159, in start
    raise RuntimeError("The task runner is already started!")
RuntimeError: The task runner is already started!
10:24:09.477 | ERROR   | Flow run 'nu-naito' - Finished in state Failed('Flow run encountered an exception. RuntimeError: The task runner is already started!\n')
5
10:24:09.907 | INFO    | prefect.infrastructure.process - Process 'nu-naito' exited cleanly.

Versions

Version:             2.7.10
API version:         0.8.4
Python version:      3.7.9
Git commit:          f269d49b
Built:               Thu, Jan 26, 2023 3:51 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted

Additional context

I believe issue https://github.com/PrefectHQ/prefect/issues/7319 may be related.

rsampaths16 commented 1 year ago

My root-cause analysis (RCA) is here: https://github.com/PrefectHQ/prefect/issues/7319#issuecomment-1409055711

zanieb commented 1 year ago

Thanks for the issue, we're aware of this one. A fix will require using a copy of the configured task runner for each flow run instead of the `flow.task_runner` attribute directly.
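The failure and the proposed per-run copy can be modelled with a minimal sketch. It uses a hypothetical `ToyTaskRunner` (not Prefect's real class) whose `start()` refuses to run twice, mirroring the `RuntimeError` at the bottom of the traceback. Entering the same shared instance for a nested (recursive) run fails, while starting a fresh copy per run works. The class, its `_started` flag, and the use of `copy.copy` are illustrative assumptions; Prefect's actual `start()` is an async context manager.

```python
import copy
from contextlib import contextmanager


class ToyTaskRunner:
    """Hypothetical stand-in for a task runner that can only be started once at a time."""

    def __init__(self, max_workers=4):
        self.max_workers = max_workers  # example configuration option
        self._started = False

    @contextmanager
    def start(self):
        if self._started:
            raise RuntimeError("The task runner is already started!")
        self._started = True
        try:
            yield self
        finally:
            self._started = False


def run_flow(runner, depth, fresh_copy):
    """Simulate a flow run that calls itself as a subflow `depth` more times."""
    # Proposed fix: start a shallow copy of the *configured* (never started) runner.
    # A shallow copy also copies `_started`, so copying an active runner would not help.
    active = copy.copy(runner) if fresh_copy else runner
    with active.start():
        if depth > 0:
            return 1 + run_flow(runner, depth - 1, fresh_copy)
        return 1


shared = ToyTaskRunner()

try:
    run_flow(shared, 3, fresh_copy=False)  # nested runs share one instance
except RuntimeError as exc:
    print("shared runner:", exc)

print("per-run copy:", run_flow(shared, 3, fresh_copy=True))
```

With the shared instance the nested run raises the same error as in the report; with a copy per run every nesting level gets its own runner state.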

rsampaths16 commented 1 year ago

Hi @madkinsz, is there an ETA for the fix? It'd really help to know. Thanks.

P.S.: Wouldn't a builder make more sense here than creating copies/clones? The builder would then take responsibility for creating the proper context.
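The builder suggestion could look roughly like this sketch. Instead of holding one runner instance, the flow holds a zero-argument callable that builds a fresh runner for every run, so nested runs never share state. `ToyFlow`, `task_runner_factory`, and `ToyTaskRunner` are hypothetical names used only for illustration, not Prefect's API.

```python
from contextlib import contextmanager
from functools import partial


class ToyTaskRunner:
    """Hypothetical task runner that refuses to be started twice."""

    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self._started = False

    @contextmanager
    def start(self):
        if self._started:
            raise RuntimeError("The task runner is already started!")
        self._started = True
        try:
            yield self
        finally:
            self._started = False


class ToyFlow:
    """Hypothetical flow that stores a runner builder instead of a runner instance."""

    def __init__(self, fn, task_runner_factory):
        self.fn = fn
        self.task_runner_factory = task_runner_factory

    def __call__(self, *args):
        runner = self.task_runner_factory()  # build a fresh runner for every (sub)flow run
        with runner.start():
            return self.fn(*args)


def _rec(a):
    # Same shape as the reproduction: the flow calls itself as a subflow.
    if a > 0:
        return 1 + rec(a // 2)
    return 1


rec = ToyFlow(_rec, partial(ToyTaskRunner, max_workers=2))
print(rec(5))
```

Compared with copying, a builder never has to decide which attributes are safe to copy; the trade-off is that the flow must be configured with a callable rather than a ready-made runner instance.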