PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
15.33k stars, 1.5k forks

The Prefect server shows that the flow keeps running even after the process exits #14346

Open fyrestone opened 1 week ago

fyrestone commented 1 week ago

Bug summary

When the process exits, the server should mark all of its flow runs as crashed or exited. Instead, the server keeps showing them as running.

Reproduction

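import asyncio
import time

import ray
from prefect import flow


@ray.remote
class Actor:
    async def run_flow(self):
        await self.flow_running()

    @staticmethod
    @flow
    async def flow_running():
        # Sleep longer than the driver process lives, so the flow is still running at exit.
        await asyncio.sleep(10)


if __name__ == "__main__":
    actor = Actor.remote()
    # Start 10 flow runs on the actor without waiting for their results.
    for _ in range(10):
        actor.run_flow.remote()
    # Exit after 3 seconds, while the flows are still sleeping.
    time.sleep(3)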
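
The reproduction exits the driver after 3 seconds while each flow still has several seconds of sleep left. As a rough, standard-library-only illustration of why a state can be left behind in that situation, the sketch below starts a child process that records "RUNNING" in a file, then kills it before it can record "COMPLETED". The state file, the `CHILD` script, and `run_and_kill` are hypothetical stand-ins for the server and the Ray actor. Whether this is the actual cause in Prefect is not confirmed in this thread.

```python
import subprocess
import sys
import tempfile
import time
from pathlib import Path

# Hypothetical worker: reports RUNNING, works for a while, then reports COMPLETED.
CHILD = """
import sys, time
from pathlib import Path
state = Path(sys.argv[1])
state.write_text("RUNNING")
time.sleep(30)
state.write_text("COMPLETED")
"""


def run_and_kill(timeout: float = 10.0) -> str:
    """Start the worker, kill it mid-run, and return the last recorded state."""
    with tempfile.TemporaryDirectory() as tmp:
        state_file = Path(tmp) / "state.txt"
        child = subprocess.Popen([sys.executable, "-c", CHILD, str(state_file)])
        # Wait until the worker has reported RUNNING (or give up after `timeout`).
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if state_file.exists() and state_file.read_text() == "RUNNING":
                break
            time.sleep(0.05)
        # Kill the worker abruptly; it gets no chance to report a final state.
        child.kill()
        child.wait()
        return state_file.read_text()


if __name__ == "__main__":
    print(run_and_kill())
```

Nothing ever overwrites the recorded state after the kill, so anything reading the file would still see the worker as running.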

Error

No response

Versions (prefect version output)

Version:             2.19.6
API version:         0.8.4
Python version:      3.9.6
Git commit:          9d938fe7
Built:               Mon, Jun 24, 2024 10:23 AM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         server

Additional context

No response

zhen0 commented 1 week ago

Thanks for the issue @fyrestone - I was able to reproduce using your MRE on Prefect 3, so I'll add it to our backlog for further investigation.

In the meantime, you might want to check out the prefect-ray integration and the Ray task runner, which is our recommended way of interacting with Ray from Prefect.

fyrestone commented 6 days ago

Thanks. I am using the prefect-ray integration; however, I need the Ray actor to run some logic of its own. I found that a Prefect flow running inside a Ray actor may not update its state.
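
The behavior requested in the bug summary, where the server marks the runs of a dead process as crashed, could be approximated on the server side with a heartbeat timeout. The following is only a minimal sketch under that assumption. `FlowRun`, `mark_stale_runs_crashed`, the state strings, and the 30-second timeout are hypothetical names and values chosen for illustration, not Prefect's API or its actual mechanism.

```python
from dataclasses import dataclass


@dataclass
class FlowRun:
    name: str
    state: str             # "RUNNING", "COMPLETED", or "CRASHED"
    last_heartbeat: float  # time of the last heartbeat, in seconds on the server clock


def mark_stale_runs_crashed(runs, now, timeout=30.0):
    """Mark RUNNING runs with no heartbeat in the last `timeout` seconds as CRASHED.

    Returns the names of the runs that were changed.
    """
    crashed = []
    for run in runs:
        if run.state == "RUNNING" and now - run.last_heartbeat > timeout:
            run.state = "CRASHED"
            crashed.append(run.name)
    return crashed


runs = [
    FlowRun("alive", "RUNNING", last_heartbeat=95.0),     # recent heartbeat
    FlowRun("orphaned", "RUNNING", last_heartbeat=10.0),  # process is gone
    FlowRun("done", "COMPLETED", last_heartbeat=10.0),    # already finished
]
crashed = mark_stale_runs_crashed(runs, now=100.0)
```

With such a check, a run whose process exited without reporting a final state stops being shown as running once its heartbeat goes stale. Finished runs and runs with a recent heartbeat are left alone.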