PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
15.95k stars 1.57k forks source link

Better cancellation of submitted task runs in interactive flow runs #14383

Closed zzstoatzz closed 2 months ago

zzstoatzz commented 3 months ago

in the case of submitting tasks with the ThreadPoolTaskRunner, this PR uses threading events to track whether cancellation has occurred, allowing the engine to tell the task runner to cancel its futures

on main today, submitted task runs are left in Running when we ctrl-c a flow run interactively, instead of Crashed states (like we get in most cases with this PR)

note

there are times (especially after all task runs have been submitted/created) that we still do not successfully crash all tasks, and we require more than one sigterm, so this does not totally reconsile https://github.com/PrefectHQ/prefect/issues/14219

try the following example

import os
import time

from prefect import flow, task

@task
async def my_task():
    time.sleep(2000)
    return

@flow(log_prints=True)
def main():
    print(os.getpid())
    futures = [my_task.submit() for i in range(10)]
    for future in futures:
        future.result()

if __name__ == "__main__":
    main()