PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Race condition causes sub-deployments to be stuck in Canceling state #15399

Open jameswu1991 opened 2 months ago

jameswu1991 commented 2 months ago

Bug summary

Summary

When a parent deployment calls a child deployment and the parent's flow run is cancelled, the child's flow run gets stuck in the Canceling state indefinitely. The behavior is non-deterministic due to a race condition; I've only been able to reproduce it 20-40% of the time.

[Screenshot: the child flow run stuck in the Canceling state]

Minimal reproducible example

app.py

```
import asyncio

from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run


@flow
async def parent_flow():
    print('starting parent')
    task = await run_deployment(
        name='child-flow/serve-deployment',
        parameters={},
        timeout=0,
    )
    print('waiting for child')
    await wait_for_flow_run(task.id)
    print('ending parent')


@flow
async def child_flow():
    print('starting child')
    for i in range(30):
        print('child iteration', i)
        await asyncio.sleep(1)
    print('ending child')


async def run_parent_deployment():
    print('Running deployment...')
    task = await run_deployment(
        name='parent-flow/serve-deployment',
        parameters={},
        timeout=0,
    )
    print('Waiting for task...')


if __name__ == "__main__":
    asyncio.run(run_parent_deployment())
```

Both parent and child must be deployments: the parent must be a deployment so the cancel button is available, and the child must be a deployment so it enters the Cancelling state documented below.

serve.py

```
from prefect import serve

from app import child_flow, parent_flow

if __name__ == "__main__":
    flows = [
        parent_flow,
        child_flow,
    ]
    flow_deployments = [flow.to_deployment(name='serve-deployment') for flow in flows]
    serve(*flow_deployments, limit=16, query_seconds=1)
```

In one terminal:

python serve.py

and then, in a separate terminal:

python app.py

Finally, cancel the parent_flow run that was just kicked off, after it has started the child flow.

stdout

```
17:35:50.967 | INFO | Flow run 'asparagus-hound' - Downloading flow code from storage at '.'
17:35:50.971 | DEBUG | Flow run 'asparagus-hound' - Importing flow code from 'app.py:child_flow'
17:35:50.988 | DEBUG | prefect.client - Connecting to API at http://127.0.0.1:4200/api/
2024-09-16 17:35:51,164 - 64825 - 6252244992 - prefect.task_runner.threadpool - DEBUG - Starting task runner
starting child
child iteration 0
Process 64810 killed successfully.
Marking flow run c18a6c98-8b1c-4607-9e86-4e7fb335c4aa as cancelled.
Marking flow run c18a6c98-8b1c-4607-9e86-4e7fb335c4aa as cancelled into state Cancelled('Flow run was cancelled successfully.')
Emittin flow run cancelled event for flow run c18a6c98-8b1c-4607-9e86-4e7fb335c4aa with flow parent-flow and deployment serve-deployment
Cancelled flow run c18a6c98-8b1c-4607-9e86-4e7fb335c4aa
child iteration 1
child iteration 2
child iteration 3
child iteration 4
child iteration 5
child iteration 6
child iteration 7
child iteration 8
child iteration 9
Found 1 flow runs awaiting cancellation.
Attemping to cancel flow run dd7e1ead-af8c-44e1-8b00-a84fbb267a07
Killing process 64825
Process 64825 killed successfully.
Marking flow run dd7e1ead-af8c-44e1-8b00-a84fbb267a07 as cancelled.
Marking flow run dd7e1ead-af8c-44e1-8b00-a84fbb267a07 as cancelled into state Cancelled('Flow run was cancelled successfully.')
Emittin flow run cancelled event for flow run dd7e1ead-af8c-44e1-8b00-a84fbb267a07 with flow child-flow and deployment serve-deployment
Cancelled flow run dd7e1ead-af8c-44e1-8b00-a84fbb267a07
```

Both parent_flow and child_flow must have already started but not yet finished when the parent flow run is cancelled. Again, I've only been able to reproduce this about 20-40% of the time.

Investigation Findings

I spent about 4 hours debugging this issue and narrowed it down to a few tricky state transitions. These are the state transition events for the child_flow:

[Screenshot: state transition events for the child flow run]

As can be seen in the screenshot, after the child flow successfully transitioned from CANCELLING to CANCELLED, the state was reverted from CANCELLED back to CANCELLING about half a second later.

Based on the state transition messages, I believe there is a race condition: the server side reads the child run's state (still CANCELLING), and before it writes its own CANCELLING state back, the client side has already updated the state to CANCELLED. The late server-side write then reverts the run from CANCELLED back to CANCELLING.
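
To make the suspected interleaving concrete, here is a toy model (the names and the dict "database" are made up for illustration and are not Prefect internals):

```
import asyncio

# Toy model of the suspected interleaving; not Prefect code.
db = {"child_state": "CANCELLING"}


async def cleanup_service_pass():
    # 1. The server side reads the child run while it is still CANCELLING...
    observed = db["child_state"]
    print("server read state:", observed)
    # ...then spends time on slow queries / process cleanup before writing.
    await asyncio.sleep(0.5)
    # 3. The unconditional write lands last and clobbers the CANCELLED state.
    db["child_state"] = "CANCELLING"


async def runner_finishes_cancellation():
    # 2. Meanwhile the runner finishes cancelling the child and records CANCELLED.
    await asyncio.sleep(0.1)
    db["child_state"] = "CANCELLED"


async def main():
    await asyncio.gather(cleanup_service_pass(), runner_finishes_cancellation())
    print("final state:", db["child_state"])  # -> CANCELLING, stuck indefinitely


asyncio.run(main())
```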

Proposed fix

I believe this issue can be fixed by passing a flow_policy to the set_flow_run_state call in cancellation_cleanup.py:

```
from prefect.server.orchestration.core_policy import CoreTaskPolicy
...
await models.flow_runs.set_flow_run_state(
    session=session,
    flow_run_id=flow_run.id,
    state=state,
    flow_policy=CoreTaskPolicy,
)
```

This should prevent the erroneous state transition via the rule EnforceCancellingToCancelledTransition.

Since I'm a newbie to Prefect, and this issue seems to be deep in the Prefect API server code, I'm hesitant to open a PR. I also don't have an easy way of testing a fix, since I run the API server in a local Docker container and can't easily modify its source code.

Furthermore, I believe this policy will prevent not only the erroneous CANCELLED -> CANCELLING transition, but also redundant CANCELLING -> CANCELLING transitions.
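
For illustration, this is the pseudologic I'd expect that rule to enforce (my reading only, not the actual Prefect implementation):

```
# My reading of the guard that EnforceCancellingToCancelledTransition should
# provide; illustrative pseudologic only, not the actual Prefect rule.
def transition_is_allowed(current: str, proposed: str) -> bool:
    # A run that is already CANCELLED is terminal and should accept no further
    # transitions, so the observed CANCELLED -> CANCELLING revert is rejected.
    if current == "CANCELLED":
        return False
    # From CANCELLING, only the move to CANCELLED should be accepted, which
    # also rejects redundant CANCELLING -> CANCELLING writes.
    if current == "CANCELLING":
        return proposed == "CANCELLED"
    return True
```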

Version info (prefect version output)

```
$ prefect version
Version:             3.0.2
API version:         0.8.4
Python version:      3.12.4
Git commit:          c846de02
Built:               Fri, Sep 13, 2024 10:48 AM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         server
Pydantic version:    2.8.2
Integrations:
  prefect-dask:      0.3.0rc2
```

Additional context

How I originally found this issue

I'm posting this information here for searchability, in case anyone else observes these symptoms:

We first noticed problems when we got a lot of MaxDepthExceeded exceptions in our runtime logs. My understanding of this code is that a large number of events were chained together (via the event.follows column), they are all processed together via recursion, and the maximum recursion depth of 20 cannot handle chains of more than 20 events.

stack trace

```
18:23:16.905 | ERROR | prefect.server.events.ordering - Event 'prefect.flow-run.Cancelling' (d0c112b8-8f01-47fc-ac78-e65f4b1ff926) for 'prefect.flow-run.01afb912-3057-43d4-9200-0225e8d40198' has exceeded the maximum recursion depth of 20
NoneType: None
18:23:16.906 | ERROR | prefect.server.events.triggers - Error running periodic evaluation
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/server/events/triggers.py", line 619, in evaluate_periodically
    await periodic_evaluation(pendulum.now("UTC"))
  File "/usr/local/lib/python3.12/site-packages/prefect/server/events/triggers.py", line 601, in periodic_evaluation
    await reactive_evaluation(event)
  File "/usr/local/lib/python3.12/site-packages/prefect/server/events/triggers.py", line 483, in reactive_evaluation
    async with AsyncExitStack() as stack:
  File "/usr/local/lib/python3.12/contextlib.py", line 754, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.12/contextlib.py", line 737, in __aexit__
    cb_suppress = await cb(*exc_details)
                  ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
  ...repeated several times
  File "/usr/local/lib/python3.12/site-packages/prefect/server/events/ordering.py", line 201, in preceding_event_confirmed
    await handler(waiter, depth + 1)
  File "/usr/local/lib/python3.12/site-packages/prefect/server/events/triggers.py", line 485, in reactive_evaluation
    await stack.enter_async_context(
  File "/usr/local/lib/python3.12/contextlib.py", line 659, in enter_async_context
    result = await _enter(cm)
             ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  ...
  File "/usr/local/lib/python3.12/site-packages/prefect/server/events/ordering.py", line 176, in preceding_event_confirmed
    raise MaxDepthExceeded(event)
prefect.server.events.ordering.MaxDepthExceeded: occurred=DateTime(2024, 9, 18, 18, 13, 35, 114994, tzinfo=Timezone('UTC')) event='prefect.flow-run.Cancelling' resource=Resource(root={'prefect.state-name': 'Cancelling', 'prefect.state-type': 'CANCELLING'
```
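
To illustrate my understanding of the depth limit, here is a toy model of the recursive follows-chain processing (illustrative only; the real logic lives in prefect/server/events/ordering.py, which raises the MaxDepthExceeded seen above):

```
# Toy model of depth-limited recursion over a chain of events linked by
# `follows`; not the actual Prefect implementation.
MAX_DEPTH = 20


class MaxDepthExceeded(Exception):
    pass


def process_event(event_id: int, follows: dict[int, int], depth: int = 0) -> None:
    """Process an event only after the event it follows has been processed."""
    if depth > MAX_DEPTH:
        raise MaxDepthExceeded(event_id)
    predecessor = follows.get(event_id)
    if predecessor is not None:
        # Each link in the chain adds one level of recursion, so a chain of
        # more than MAX_DEPTH events can never be fully processed.
        process_event(predecessor, follows, depth + 1)


# 25 events, each following the previous one: 25 -> 24 -> ... -> 1
chain = {i: i - 1 for i in range(2, 26)}
try:
    process_event(25, chain)
except MaxDepthExceeded as exc:
    print("gave up at event", exc)
```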

From there, I checked the database and noticed that one of my child flow deployments had over 191,000 state transition events on it. It had been broken for about 6 days and was spewing out event table rows non-stop. (See the row count in the top-left corner of the screenshot.)

[Screenshot: database query showing the event row count for the child flow deployment]

Ironically, the huge number of rows in this table is likely what made the issue easier to reproduce, as I suspect the race condition is more pronounced when database queries are slow.

Possibly related

jameswu1991 commented 2 months ago

After some additional testing, I realized that my API server (in Docker) was still on 3.0.0rc16. After upgrading to 3.0.2, the issue was no longer reproducible. After checking the release notes, I think the most likely candidate for the fix is https://github.com/PrefectHQ/prefect/pull/15286, which significantly reduced the time it takes to run the read query. This likely shrank the gap between reading the state as CANCELLING and writing CANCELLING, drastically reducing the probability of hitting the race condition.

I still believe it's valuable to prevent the unnecessary CANCELLING -> CANCELLING state transition in cancellation_cleanup.py, to keep unnecessary rows from accumulating in the events table.
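
For example, something along these lines in the cleanup pass would skip the redundant write (a sketch only; maybe_request_cancellation and its parameters are hypothetical names, not the actual cancellation_cleanup.py code):

```
from prefect.server import models


# Sketch only: hypothetical helper, not actual cancellation_cleanup.py code.
# The idea is to skip the write when the child run is already CANCELLED (don't
# clobber a terminal state) or already CANCELLING (don't emit a redundant
# event row).
async def maybe_request_cancellation(session, flow_run_id, current_state_type, cancelling_state):
    if current_state_type in ("CANCELLED", "CANCELLING"):
        return
    await models.flow_runs.set_flow_run_state(
        session=session,
        flow_run_id=flow_run_id,
        state=cancelling_state,
    )
```

Of course, a re-read like this can still race in the same way, so enforcing the rule inside the orchestration policy at write time (as proposed above) seems like the more robust fix.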