PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
15.43k stars, 1.51k forks

Tasks still running with Sequential Task Runner, if any task fails, when also using submit on task execution #12804

Open robc-kagr opened 2 months ago

robc-kagr commented 2 months ago

Bug summary

With Prefect 2.16.9, when tasks are run with submit under the SequentialTaskRunner, it appears that wait_for must be used to stop later tasks from executing after an earlier task fails. The following example highlights the problem. Switching which asyncio.run call is commented out under the main guard produces the screenshots and accompanying stack traces shown here.

import asyncio

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task()
async def my_first_task() -> None:
    # Always fails, so nothing downstream is expected to run.
    raise Exception("oops")

@task()
async def my_second_task() -> None:
    print("hi")

@flow(task_runner=SequentialTaskRunner())
async def my_first_flow() -> None:
    # With wait_for: my_second_task does not execute after the failure.
    my_future = await my_first_task.submit()
    await my_second_task.submit(wait_for=[my_future])

@flow(task_runner=SequentialTaskRunner())
async def my_second_flow() -> None:
    # Without wait_for: my_second_task still executes after the failure.
    await my_first_task.submit()
    await my_second_task.submit()

if __name__ == "__main__":
    # Swap which line is commented out to run the other flow.
    asyncio.run(my_first_flow())
    # asyncio.run(my_second_flow())

[Screenshot: example_without_wait_for_used]

OUTPUT

```console
16:13:47.884 | DEBUG | prefect.profiles - Using profile 'default'
16:13:49.692 | DEBUG | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////home/robc/.prefect/prefect.db
16:13:50.211 | INFO | prefect.engine - Created flow run 'quiet-woodlouse' for flow 'my-second-flow'
16:13:50.215 | DEBUG | Flow run 'quiet-woodlouse' - Starting 'SequentialTaskRunner'; submitted tasks will be run sequentially...
16:13:50.218 | DEBUG | prefect.task_runner.sequential - Starting task runner...
16:13:50.229 | DEBUG | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////home/robc/.prefect/prefect.db
16:13:50.380 | DEBUG | Flow run 'quiet-woodlouse' - Executing flow 'my-second-flow' for flow run 'quiet-woodlouse'...
16:13:50.382 | DEBUG | Flow run 'quiet-woodlouse' - Beginning execution...
16:13:50.502 | INFO | Flow run 'quiet-woodlouse' - Created task run 'my_first_task-0' for task 'my_first_task'
16:13:50.505 | INFO | Flow run 'quiet-woodlouse' - Executing 'my_first_task-0' immediately...
16:13:50.664 | DEBUG | Task run 'my_first_task-0' - Beginning execution...
16:13:50.670 | ERROR | Task run 'my_first_task-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/engine.py", line 2148, in orchestrate_task_run
    result = await call.aresult()
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "test_rob.py", line 10, in my_first_task
    raise Exception("oops")
Exception: oops
16:13:50.779 | ERROR | Task run 'my_first_task-0' - Finished in state Failed('Task run encountered an exception Exception: oops')
16:13:50.872 | INFO | Flow run 'quiet-woodlouse' - Created task run 'my_second_task-0' for task 'my_second_task'
16:13:50.874 | INFO | Flow run 'quiet-woodlouse' - Executing 'my_second_task-0' immediately...
16:13:51.046 | DEBUG | Task run 'my_second_task-0' - Beginning execution...
hi
16:13:51.129 | INFO | Task run 'my_second_task-0' - Finished in state Completed()
16:13:51.233 | DEBUG | prefect.task_runner.sequential - Shutting down task runner...
16:13:51.236 | ERROR | Flow run 'quiet-woodlouse' - Finished in state Failed('1/2 states failed.')
Traceback (most recent call last):
  File "test_rob.py", line 32, in <module>
    asyncio.run(my_second_flow())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 150, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/client/utilities.py", line 78, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/engine.py", line 399, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/engine.py", line 2148, in orchestrate_task_run
    result = await call.aresult()
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "test_rob.py", line 10, in my_first_task
    raise Exception("oops")
Exception: oops
```

[Screenshot: example_with_wait_for_used]

OUTPUT

```console
16:21:15.068 | DEBUG | prefect.profiles - Using profile 'default'
16:21:16.872 | DEBUG | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////home/robc/.prefect/prefect.db
16:21:17.178 | INFO | prefect.engine - Created flow run 'logical-dinosaur' for flow 'my-first-flow'
16:21:17.182 | DEBUG | Flow run 'logical-dinosaur' - Starting 'SequentialTaskRunner'; submitted tasks will be run sequentially...
16:21:17.184 | DEBUG | prefect.task_runner.sequential - Starting task runner...
16:21:17.193 | DEBUG | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////home/robc/.prefect/prefect.db
16:21:17.331 | DEBUG | Flow run 'logical-dinosaur' - Executing flow 'my-first-flow' for flow run 'logical-dinosaur'...
16:21:17.333 | DEBUG | Flow run 'logical-dinosaur' - Beginning execution...
16:21:17.449 | INFO | Flow run 'logical-dinosaur' - Created task run 'my_first_task-0' for task 'my_first_task'
16:21:17.451 | INFO | Flow run 'logical-dinosaur' - Executing 'my_first_task-0' immediately...
16:21:17.633 | DEBUG | Task run 'my_first_task-0' - Beginning execution...
16:21:17.639 | ERROR | Task run 'my_first_task-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/engine.py", line 2148, in orchestrate_task_run
    result = await call.aresult()
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "test_rob.py", line 10, in my_first_task
    raise Exception("oops")
Exception: oops
16:21:17.745 | ERROR | Task run 'my_first_task-0' - Finished in state Failed('Task run encountered an exception Exception: oops')
16:21:17.842 | INFO | Flow run 'logical-dinosaur' - Created task run 'my_second_task-0' for task 'my_second_task'
16:21:17.845 | INFO | Flow run 'logical-dinosaur' - Executing 'my_second_task-0' immediately...
16:21:18.022 | DEBUG | prefect.task_runner.sequential - Shutting down task runner...
16:21:18.025 | ERROR | Flow run 'logical-dinosaur' - Finished in state Failed('1/2 states failed.')
Traceback (most recent call last):
  File "test_rob.py", line 31, in <module>
    asyncio.run(my_first_flow())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 150, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/client/utilities.py", line 78, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/engine.py", line 399, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/engine.py", line 2148, in orchestrate_task_run
    result = await call.aresult()
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/robc/repos/prefectshared/testenv/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "test_rob.py", line 10, in my_first_task
    raise Exception("oops")
Exception: oops
```

When using submit with the sequential task runner, I wouldn't expect to need wait_for; I would expect the result shown in the second image/trace without it.

Also of note: not using submit at all with the sequential task runner behaves as expected, the expectation being the second set of images/stack trace.

Reproduction

Same script as in the bug summary above.

Error

Same console output as in the bug summary above: the 'logical-dinosaur' run (wait_for used on submit with the sequential task runner) and the 'quiet-woodlouse' run (wait_for not used).

Versions

Version:             2.16.9
API version:         0.8.4
Python version:      3.8.10
Git commit:          083def52
Built:               Thu, Apr 4, 2024 3:11 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.31.1

Additional context

No response

serinamarie commented 2 months ago

Hi @robc-kagr, thanks for opening your first issue!

When you use the .submit() method to execute tasks in Prefect, even within a SequentialTaskRunner, each task submission is treated independently, so subsequent tasks will continue to be submitted.

The flow eventually concludes in a Failed state: individual tasks can succeed, but the overall flow is reported as failed.
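As a rough illustration of why the two reported flows differ, the following sketch models them with the standard library's concurrent.futures rather than with Prefect itself. It assumes only what the logs in the report show: a submitted task's failure is stored on its future instead of being raised into the flow body, and a wait_for dependency on a failed future keeps the downstream task from starting. The `submit` helper and `run_flow` function are hypothetical names for this sketch, not Prefect APIs.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List, Optional, Sequence

ran: List[str] = []  # names of the tasks that actually executed


def my_first_task() -> None:
    ran.append("first")
    raise Exception("oops")


def my_second_task() -> None:
    ran.append("second")


def submit(
    pool: ThreadPoolExecutor,
    fn: Callable[[], None],
    wait_for: Sequence[Future] = (),
) -> Optional[Future]:
    """Hypothetical helper: submit fn unless an upstream future failed."""
    for upstream in wait_for:
        # exception() blocks until the upstream future is done.
        if upstream.exception() is not None:
            return None  # the downstream task is never started
    return pool.submit(fn)


def run_flow(use_wait_for: bool) -> List[str]:
    ran.clear()
    # A single worker runs submitted tasks one at a time, in order.
    with ThreadPoolExecutor(max_workers=1) as pool:
        first = pool.submit(my_first_task)  # the failure stays on the future
        submit(pool, my_second_task, wait_for=[first] if use_wait_for else [])
    return list(ran)


with_wait_for = run_flow(use_wait_for=True)
without_wait_for = run_flow(use_wait_for=False)
print(with_wait_for, without_wait_for)
```

In this model `with_wait_for` ends up as `['first']` and `without_wait_for` as `['first', 'second']`, which lines up with the two logs in the report: nothing raises in the flow body, so the second submission still happens unless a dependency holds it back.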

We can see that demonstrated in another example:

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
import random

@task
def stop_at_floor(floor):
    situation = random.choice(["on fire","clear"])
    print(f"elevator stops on {floor} which is {situation}")

    if floor == 13:
        raise ValueError("unlucky floor")

@flow(task_runner=SequentialTaskRunner(), name="towering-infernflow")
def glass_tower():
    for floor in range(1, 15):
        stop_at_floor.submit(floor)

glass_tower()

Is there a reason you need to create PrefectFuture objects with submitted tasks, rather than simply calling the tasks?

e.g.

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
import random

@task
def stop_at_floor(floor):
    situation = random.choice(["on fire","clear"])
    print(f"elevator stops on {floor} which is {situation}")

    if floor == 13:
        raise ValueError("unlucky floor")

@flow(task_runner=SequentialTaskRunner(), name="towering-infernflow")
def glass_tower():
    for floor in range(1, 15):
        stop_at_floor(floor)

glass_tower()

which would raise upon failure the way you're intending.
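If the futures are still wanted, one possible middle ground is to resolve each future before submitting the next task, so the first failure is re-raised in the flow body. The sketch below shows that pattern with standard-library futures standing in for PrefectFuture objects; `glass_tower_fail_fast` is a hypothetical name. Prefect 2's PrefectFuture also has a result() method, but whether it fits a given flow (and the extra await needed in async flows) is an assumption to check against the Prefect documentation.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List

visited: List[int] = []  # floors the elevator actually stopped at


def stop_at_floor(floor: int) -> None:
    visited.append(floor)
    if floor == 13:
        raise ValueError("unlucky floor")


def glass_tower_fail_fast() -> None:
    # A single worker stands in for a sequential task runner.
    with ThreadPoolExecutor(max_workers=1) as pool:
        for floor in range(1, 15):
            future = pool.submit(stop_at_floor, floor)
            # result() re-raises the task's exception, so the loop stops
            # before the next floor is submitted.
            future.result()


error = ""
try:
    glass_tower_fail_fast()
except ValueError as exc:
    error = str(exc)

print(visited, error)
```

Floor 14 is never submitted in this sketch, because the exception from floor 13 leaves the loop first.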

robc-kagr commented 2 months ago

Hi @serinamarie, thanks for the reply. No, technically the provided examples don't need to create PrefectFuture objects. It just seemed to me that using submit was good practice whenever a task runner is in use; at least, that is my understanding from reading the documentation on this area (https://docs.prefect.io/latest/concepts/task-runners/). However, based on what you have provided, it seems this is not best practice, at least in the context of sequential task runners. Is this a fair understanding?

robc-kagr commented 2 months ago

A follow-up on the above: after thinking about it for a while, it seems that if you don't use submit, the task does not go through the configured task runner at all. Is that correct? If so, what is the purpose of the sequential task runner altogether? This question was sparked by the highlighted box in the documentation linked earlier (screenshot of the highlighted box attached).