PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Unable to handle 'Cancelling' events #12688

Open dakotahorstman opened 6 months ago

dakotahorstman commented 6 months ago


Bug summary

Good day.

My team and I have spent the past week debugging a file-corruption issue, caused by one of our Prefect 2.0 tasks, that took down a production server. We've traced it to a file handle that was not closed properly during our standard 'Reboot Sunday'.

Part of the 'Reboot Sunday' procedure is to go through our few long-running flows and request from the UI that each one be canceled. We've noted internally that Prefect's handling of these cancellation requests is dubious at best, but we hadn't had any issues until now.

What we observed is the following:

  1. The tech opened the Prefect UI (version 2.16.9), went to each flow, and canceled its running flow run.
  2. He then waited for all flows to stop.
  3. Each flow threw the error 'Crash detected! Execution was aborted by a termination signal.' This is the "normal" outcome, including for the flow at issue here. However, because Prefect terminated the process, that flow's file handle was left open. The file lives in a shared directory, so after a period of time a watchdog terminated the worker entirely in an attempt to release the file.
  4. During this waiting period, he received a notice that one of the primary workers had dropped and that a new one had been spun up.
       a. Each worker runs the to_deployment and serve functions; they are not work-pool workers.
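For context, here is a minimal, generic Python sketch (not Prefect-specific, and untested against Prefect's runner) of one common way to ensure a file handle is closed when a process receives SIGTERM: convert the signal into an exception so that `with` blocks and `finally` clauses unwind normally. It assumes POSIX (on Windows, `os.kill` with SIGTERM terminates the process outright), that the work runs in the main thread (Python only runs signal handlers there), and that nothing else has installed a conflicting SIGTERM handler. The crash message in the logs suggests Prefect already reacts to SIGTERM in the flow-run process, so a custom handler could interfere with that. `GracefulExit` and `write_until_terminated` are hypothetical names.

```python
import os
import signal
import tempfile
import threading
import time


class GracefulExit(Exception):
    """Hypothetical exception used to unwind the stack on SIGTERM."""


def _raise_on_sigterm(signum, frame):
    # Python runs this in the main thread; raising here lets enclosing
    # `with` blocks and `finally` clauses release their resources.
    raise GracefulExit(f"received signal {signum}")


def write_until_terminated(path, max_lines=1000):
    """Append lines to `path` until SIGTERM arrives or `max_lines` is
    reached. Returns (file_closed, lines_written)."""
    handle = None
    written = 0
    previous = signal.signal(signal.SIGTERM, _raise_on_sigterm)
    try:
        with open(path, "a") as handle:
            while written < max_lines:
                handle.write(f"line {written}\n")
                handle.flush()
                written += 1
                time.sleep(0.01)  # stand-in for real work
    except GracefulExit:
        pass  # leaving the `with` block has already closed the file
    finally:
        signal.signal(signal.SIGTERM, previous)  # restore the previous handler
    return (handle is not None and handle.closed), written


if __name__ == "__main__":
    target = os.path.join(tempfile.mkdtemp(), "demo.txt")
    # Simulate a runner sending SIGTERM to this process 0.2 s into the run.
    threading.Timer(0.2, os.kill, args=(os.getpid(), signal.SIGTERM)).start()
    closed, written = write_until_terminated(target)
    print(f"file closed: {closed}, lines written before SIGTERM: {written}")
```

The handler only raises; all cleanup stays in ordinary `with`/`finally` code, which keeps the signal handler itself trivial.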

Prefect did not enter a 'Cancelling' state as the docs describe. As the output below shows, the flow run state remains Running() until the process is terminated, and the on_cancellation= hook is not called until the flow is terminated, by which point it is too late.

Reproduction

# A very simple MRE. Start this flow with Quick Run in the Prefect UI, then request cancellation after a few seconds.

import time

from prefect import flow, serve
from prefect.context import get_run_context

@flow(log_prints=True, on_cancellation=[lambda **a: print("cancelling!!!")])
def cycle_reset_scanner() -> None:
    while True:
        context = get_run_context()
        print(context.flow_run.state)
        time.sleep(1)

if __name__ == "__main__":
    crs = cycle_reset_scanner.to_deployment(
        name="Cycle Reset Scanner",
        is_schedule_active=False,
    )
    serve(crs)

Error

02:36:26.926 | INFO    | prefect.flow_runs.runner - Runner 'runner-5099630a-bdda-417b-b8ed-93f5f9208f48' submitting flow run '69b965e9-ee3b-4d43-b8f1-08b5a6253c70'
02:36:27.161 | INFO    | prefect.flow_runs.runner - Opening process...
02:36:27.163 | INFO    | prefect.flow_runs.runner - Completed submission of flow run '69b965e9-ee3b-4d43-b8f1-08b5a6253c70'
<frozen runpy>:128: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
02:36:31.121 | INFO    | Flow run 'aromatic-hippo' - Downloading flow code from storage at '.'
02:36:31.397 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:32.398 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:33.399 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:34.400 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:35.401 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:36.402 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:37.402 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:38.403 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:39.404 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:40.405 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:41.405 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:42.406 | INFO    | Flow run 'aromatic-hippo' - Running()
### Cancel request is sent around here ###
02:36:43.407 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:44.407 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:45.408 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:46.409 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:47.409 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:48.410 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:49.411 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:50.411 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:51.412 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:52.413 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:53.414 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:54.415 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:55.347 | INFO    | prefect.runner - Found 1 flow runs awaiting cancellation.
02:36:55.348 | ERROR   | Flow run 'aromatic-hippo' - Crash detected! Execution was aborted by a termination signal.
02:36:55.373 | INFO    | prefect.flow_runs.runner - Process for flow run 'aromatic-hippo' exited with status code: -15; This indicates that the process exited due to a SIGTERM signal. Typically, this is caused by manual cancellation.
02:36:58.355 | INFO    | Flow run 'aromatic-hippo' - Downloading flow code from storage at '.'
02:36:58.359 | INFO    | Flow run 'aromatic-hippo' - Running hook '<lambda>' in response to entering state 'Cancelling'
cancelling!!!
02:36:58.360 | INFO    | Flow run 'aromatic-hippo' - Hook '<lambda>' finished running successfully
02:36:58.389 | INFO    | prefect.flow_runs.runner - Cancelled flow run 'aromatic-hippo'!

Versions

# The server:

Version:             2.16.9
API version:         0.8.4
Python version:      3.11.9
Git commit:          083def52
Built:               Thu, Apr 4, 2024 3:11 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server

# The worker:

Version:             2.17.1
API version:         0.8.4
Python version:      3.11.7
Git commit:          d6bdb075
Built:               Thu, Apr 11, 2024 6:58 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server

Additional context

No response

dakotahorstman commented 6 months ago

I can confirm that, on its end, the server does put the flow run into a Cancelling state as soon as cancellation is requested from the UI.

Repeatedly querying the server with the code below shows that the MRE above enters the Cancelling state as soon as cancellation is requested from the UI. However, this is not reflected in the run context, and the on_cancellation= hook is not called until the flow run is terminated. If this is intended, the documentation ought to be revised, as it currently describes on_cancellation= as "An optional list of callables to run when the flow enters a cancelling state."

import asyncio
from pprint import pprint

from prefect import get_client
# Client-side schema classes, intended for use with PrefectClient methods
# such as read_flow_runs.
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType

async def do():
    async with get_client() as client:
        while True:
            # Fetch every flow run currently in a CANCELLING state.
            flow_runs = await client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    state=FlowRunFilterState(
                        type=FlowRunFilterStateType(any_=[StateType.CANCELLING]),
                    ),
                )
            )
            pprint([vars(fr) for fr in flow_runs], indent=4)
            # Non-blocking pause between polls (time.sleep would stall the event loop).
            await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(do())

To reproduce:

  1. Start a flow with the above MRE.
  2. Let it run for a few seconds, then start this state-check poller. You should see an empty list printed to the console every second.
  3. Cancel the MRE flow from the UI; its info will immediately and repeatedly be printed to the poller's console.
  4. Prefect will then terminate the MRE, and the poller will go back to printing an empty list.
  5. Throughout this process, observe that the MRE's print(context.flow_run.state) keeps printing Running() until it is terminated.
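If a flow must notice cancellation before it is terminated, one workaround is for the flow to poll its own run state, much as the script above does from outside. The sketch below keeps the cost down by polling on a background daemon thread at a coarse interval, so the main loop only checks a `threading.Event`. `fetch_state` is a hypothetical callable returning the run's server-side state name; in a real Prefect 2 deployment it might wrap the client's `read_flow_run` call, but that wiring is an assumption and is not shown. Whether a poll catches CANCELLING before the runner sends SIGTERM depends on the intervals involved.

```python
import threading
import time
from typing import Callable


def start_cancellation_watcher(fetch_state: Callable[[], str], interval: float = 5.0) -> threading.Event:
    """Poll `fetch_state` on a daemon thread every `interval` seconds and
    return an Event that is set once it reports CANCELLING or CANCELLED."""
    cancel_requested = threading.Event()

    def _poll() -> None:
        while True:
            if fetch_state() in ("CANCELLING", "CANCELLED"):
                cancel_requested.set()
                return
            time.sleep(interval)

    threading.Thread(target=_poll, daemon=True).start()
    return cancel_requested


if __name__ == "__main__":
    # Fake server responses: RUNNING twice, then CANCELLING from then on.
    responses = iter(["RUNNING", "RUNNING"])
    cancel = start_cancellation_watcher(lambda: next(responses, "CANCELLING"), interval=0.05)
    iterations = 0
    while not cancel.is_set():
        iterations += 1  # stand-in for one unit of the flow's real work
        time.sleep(0.01)
    print(f"cancellation noticed after {iterations} iterations")
```

Checking an in-memory `Event` is cheap, so the per-iteration cost is negligible; the server load is one request per `interval`.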
serinamarie commented 6 months ago

Hi @dakotahorstman, thanks for contributing your first issue with Prefect!

> Prefect did not enter a 'Cancelling' state as indicated by the docs as the below output shows the flow run state remains at Running() until it is termed. The on_cancellation= hook is also not called until the flow is termed which is too late by that point.

When the flow run leaves the Running state and moves to a Cancelling state, it won't continue to print statements, since that only happens while the flow is running. The on_cancellation hook is triggered by the flow run entering a Cancelling state, which is why you see that INFO-level log before the flow enters its terminal Cancelled state.
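To make the expected ordering concrete, here is a toy, stdlib-only model of hooks keyed on state entry, written to match the description above: the hook fires as the run enters Cancelling, and Cancelled comes afterwards. `ToyState` and `ToyFlowRun` are invented for illustration and are not Prefect's engine. In the reporter's logs, by contrast, the termination was logged about 3 seconds before the hook ran.

```python
from enum import Enum
from typing import Callable, Dict, List


class ToyState(Enum):
    RUNNING = "RUNNING"
    CANCELLING = "CANCELLING"
    CANCELLED = "CANCELLED"


Hook = Callable[["ToyFlowRun"], None]


class ToyFlowRun:
    """Hypothetical flow run that fires hooks when it enters a state."""

    def __init__(self) -> None:
        self.state = ToyState.RUNNING
        self.hooks: Dict[ToyState, List[Hook]] = {}
        self.events: List[str] = []

    def on_enter(self, state: ToyState, hook: Hook) -> None:
        self.hooks.setdefault(state, []).append(hook)

    def transition(self, new_state: ToyState) -> None:
        self.state = new_state
        self.events.append(f"entered {new_state.value}")
        # Hooks run as part of entering the new state, before anything else.
        for hook in self.hooks.get(new_state, []):
            hook(self)


run = ToyFlowRun()
run.on_enter(ToyState.CANCELLING, lambda r: r.events.append("on_cancellation hook"))
run.transition(ToyState.CANCELLING)
run.events.append("process terminated")  # termination only after the hook ran
run.transition(ToyState.CANCELLED)
print(run.events)
# ['entered CANCELLING', 'on_cancellation hook', 'process terminated', 'entered CANCELLED']
```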


02:36:58.359 | INFO    | Flow run 'aromatic-hippo' - Running hook '<lambda>' in response to entering state 'Cancelling'
cancelling!!!
02:36:58.360 | INFO    | Flow run 'aromatic-hippo' - Hook '<lambda>' finished running successfully
02:36:58.389 | INFO    | prefect.flow_runs.runner - Cancelled flow run 'aromatic-hippo'!

I see the flow run entering a Cancelling state, then the state change hook being called and the expected print statement appearing, then the hook completing, and finally the flow run moving to a Cancelled state. Can you elaborate on what you mean by "nor is the on_cancellation= hook called until the flow run is termed"?

dakotahorstman commented 6 months ago

Hi Serina,

I can elaborate, sure.

The flow is an infinite loop in the MRE I've provided here. This mimics our tasks.

Through the UI and the state poller I whipped up, we can see that the server's tracking of the flow run correctly places the run into a canceling state when requested. It is immediate, as would be expected.

However, this state change is never communicated to the worker, or at least not in a way that doesn't require polling the server (which, in our case, would be a major hindrance to performance). I would expect get_run_context to reflect the state change as soon as possible so I could handle the request, but it does not. Additionally, the on_cancellation hook is called only after the flow (or at least my code) has been terminated, not before. It is therefore impossible for us to shut the flow down gracefully before termination.

For example, the hook could raise a sentinel flag that our flow code checks, allowing it to shut down properly.
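A minimal, stdlib-only sketch of that sentinel-flag pattern: the cancellation hook sets a `threading.Event`, and the long-running loop checks it each iteration, exits, and closes its file on the way out. `make_cancellation_hook` and `scan_loop` are hypothetical names. The hook accepts any arguments because the hooks shown in this thread take `(flow, flow_run, state)`. The pattern only helps if the hook actually runs before the process is terminated, which is exactly the ordering this issue reports is missing.

```python
import os
import tempfile
import threading
import time


def make_cancellation_hook(event):
    """Return a hook that sets `event`; it accepts any arguments so it can be
    registered wherever a framework passes (flow, flow_run, state)."""
    def hook(*args, **kwargs):
        event.set()
    return hook


def scan_loop(path, event, max_iterations=1000):
    """Do one unit of work per iteration until `event` is set. The `with`
    block closes the file however the loop ends."""
    iterations = 0
    with open(path, "a") as handle:
        while not event.is_set() and iterations < max_iterations:
            handle.write(f"scan {iterations}\n")
            iterations += 1
            time.sleep(0.01)  # stand-in for real work
    return iterations, handle.closed


if __name__ == "__main__":
    stop_requested = threading.Event()
    hook = make_cancellation_hook(stop_requested)
    # Simulate the framework calling the hook 0.1 s into the run.
    threading.Timer(0.1, hook).start()
    path = os.path.join(tempfile.mkdtemp(), "scan.txt")
    count, closed = scan_loop(path, stop_requested)
    print(f"stopped after {count} iterations; file closed: {closed}")
```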

According to the logs, which match what I observed, the SIGTERM is sent before the on_cancellation hook is called. This is likely because Prefect has to transition from RUNNING through CANCELLING to reach CANCELLED (I haven't checked the source to confirm). The ERROR message reporting the SIGTERM was logged 3 seconds before the hook was called; it should appear well after the hook has run.
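That 3-second gap can be recomputed from the timestamps in the original log. The standalone helper below (not a Prefect utility) parses the leading HH:MM:SS.mmm field of the relevant log lines, copied verbatim from the issue, and measures how long after the crash message the hook started.

```python
from datetime import datetime

LOG = """\
02:36:55.347 | INFO    | prefect.runner - Found 1 flow runs awaiting cancellation.
02:36:55.348 | ERROR   | Flow run 'aromatic-hippo' - Crash detected! Execution was aborted by a termination signal.
02:36:58.359 | INFO    | Flow run 'aromatic-hippo' - Running hook '<lambda>' in response to entering state 'Cancelling'
"""


def first_timestamp(log: str, needle: str) -> datetime:
    """Return the timestamp of the first log line containing `needle`."""
    for line in log.splitlines():
        if needle in line:
            return datetime.strptime(line.split(" | ", 1)[0], "%H:%M:%S.%f")
    raise ValueError(f"no line contains {needle!r}")


sigterm_at = first_timestamp(LOG, "Crash detected!")
hook_at = first_timestamp(LOG, "Running hook")
gap = (hook_at - sigterm_at).total_seconds()
print(f"hook started {gap:.3f} s after the SIGTERM crash")  # hook started 3.011 s after the SIGTERM crash
```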

Here's how the state changes differ on the server and the worker according to my observations:

server: RUNNING (N times) -> cancel requested from UI -> CANCELLING (for the timeout period) -> CANCELLED

worker: RUNNING (N times) -> cancel requested from UI (but never communicated) -> RUNNING (N times) -> process terminated -> CANCELLING (one cycle) -> CANCELLED

This is what I would expect the logs to look like. (Ideally, we'd gracefully exit the flow as soon as the run context reports a Cancelling state, before it is terminated.)

02:36:26.926 | INFO    | prefect.flow_runs.runner - Runner 'runner-5099630a-bdda-417b-b8ed-93f5f9208f48' submitting flow run '69b965e9-ee3b-4d43-b8f1-08b5a6253c70'
02:36:27.161 | INFO    | prefect.flow_runs.runner - Opening process...
02:36:27.163 | INFO    | prefect.flow_runs.runner - Completed submission of flow run '69b965e9-ee3b-4d43-b8f1-08b5a6253c70'
<frozen runpy>:128: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
02:36:31.121 | INFO    | Flow run 'aromatic-hippo' - Downloading flow code from storage at '.'
02:36:31.397 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:32.398 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:33.399 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:34.400 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:35.401 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:36.402 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:37.402 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:38.403 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:39.404 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:40.405 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:41.405 | INFO    | Flow run 'aromatic-hippo' - Running()
02:36:42.406 | INFO    | Flow run 'aromatic-hippo' - Running()
### Cancel request is sent around here ###
02:36:42.406 | INFO    | Flow run 'aromatic-hippo' - Downloading flow code from storage at '.'
02:36:42.406 | INFO    | Flow run 'aromatic-hippo' - Running hook '<lambda>' in response to entering state 'Cancelling'
cancelling!!!
02:36:42.407 | INFO    | Flow run 'aromatic-hippo' - Hook '<lambda>' finished running successfully
02:36:43.407 | INFO    | Flow run 'aromatic-hippo' - Cancelling()
02:36:44.407 | INFO    | Flow run 'aromatic-hippo' - Cancelling()
02:36:45.408 | INFO    | Flow run 'aromatic-hippo' - Cancelling()
02:36:46.409 | INFO    | Flow run 'aromatic-hippo' - Cancelling()
02:36:47.409 | INFO    | Flow run 'aromatic-hippo' - Cancelling()
02:36:48.410 | INFO    | Flow run 'aromatic-hippo' - Cancelling()
02:36:49.411 | INFO    | Flow run 'aromatic-hippo' - Cancelling()
02:36:50.411 | INFO    | Flow run 'aromatic-hippo' - Cancelling()
02:36:51.412 | INFO    | Flow run 'aromatic-hippo' - Cancelling()
02:36:52.413 | INFO    | Flow run 'aromatic-hippo' - Cancelling()
02:36:53.414 | INFO    | Flow run 'aromatic-hippo' - Cancelling()
02:36:54.415 | INFO    | Flow run 'aromatic-hippo' - Cancelling()
02:36:55.347 | INFO    | prefect.runner - Found 1 flow runs awaiting cancellation.
02:36:55.348 | ERROR   | Flow run 'aromatic-hippo' - Crash detected! Execution was aborted by a termination signal.
02:36:55.373 | INFO    | prefect.flow_runs.runner - Process for flow run 'aromatic-hippo' exited with status code: -15; This indicates that the process exited due to a SIGTERM signal. Typically, this is caused by manual cancellation.
02:36:58.389 | INFO    | prefect.flow_runs.runner - Cancelled flow run 'aromatic-hippo'!
dakotahorstman commented 5 months ago

Hi @serinamarie

Just checking in on this ticket. Let me know if there are any questions or concerns I can answer for you :)

mthanded commented 4 months ago

I am seeing this behavior as well for process workers running inside Docker. I have callbacks registered on all the on_state hooks.

In my logs I see the workflows receiving a signal before the on_cancellation hook fires. The server sees the workflows as cancelled, and if you query the server via the client or look at the UI, you will see Cancelled. However, the cancellation hook does not run.

CoffeeMark2 commented 4 months ago


I encountered the same issue

from prefect import flow, get_run_logger

from prefect.deployments import Deployment

import time

def hook_cancellantion(flow, flow_run, state):
    logger = get_run_logger()
    logger.info("here is hook cancellation")

@flow(
    name="cancellation_hook_test",
    on_cancellation=[hook_cancellantion],
)
def cancellation_hook_test():
    logger = get_run_logger()
    for i in range(10):
        logger.info(f"flow is running {i}")
        time.sleep(10)

deployment = Deployment.build_from_flow(
    flow=cancellation_hook_test,
    name="weekday",
    tags=['data-engineering'],
    work_queue_name="default",
)

deployment.apply()
# prefect version

Version:             2.16.4
API version:         0.8.4
Python version:      3.11.8
Git commit:          e3e7df9d
Built:               Thu, Mar 14, 2024 5:11 PM
OS/Arch:             win32/AMD64
Profile:             test
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.45.2