PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Flow still in "Running" state when agent is down #7239

Open matlh0 opened 2 years ago

matlh0 commented 2 years ago

Bug summary

When we start a simple flow that calls time.sleep and then stop the agent's process, the flow stays in the Running state.

In the example below, I interrupt the "prefect agent start" process with CTRL+C:

13:20:50.284 | INFO | prefect.agent - Submitting flow run '09dd7834-4ae3-4462-9f1d-ecd53ad37c11'
13:20:50.311 | INFO | prefect.infrastructure.process - Opening process 'crouching-seriema'...
13:20:50.314 | INFO | prefect.agent - Completed submission of flow run '09dd7834-4ae3-4462-9f1d-ecd53ad37c11'
13:20:51.382 | INFO | Flow run 'crouching-seriema' - Created task run 'task_start-1870bbf8-0' for task 'task_start'
13:20:51.382 | INFO | Flow run 'crouching-seriema' - Executing 'task_start-1870bbf8-0' immediately...
13:20:51.399 | INFO | Task run 'task_start-1870bbf8-0' - Starting flow
13:20:51.409 | INFO | Task run 'task_start-1870bbf8-0' - Finished in state Completed()
^C Aborted.

Reproduction

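import time

from prefect import flow, task, get_run_logger


@task
def task_start():
    logger = get_run_logger()
    logger.info("Starting flow")


# The @task decorator appears to be missing in the original snippet;
# it is assumed here so that task_end runs as a task, like task_start.
@task
def task_end():
    logger = get_run_logger()
    logger.info("End of flow")


@flow()
def sleeping_flow():
    task_start()
    time.sleep(120)  # stop the agent during this sleep to reproduce the issue
    task_end()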

Error

No response

Versions

Version:             2.5.0
API version:         0.8.2
Python version:      3.8.10
Git commit:          eac37918
Built:               Thu, Oct 6, 2022 12:41 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted

Additional context

OS : Ubuntu 20.04

The agent and the server run on the same machine.

bunchesofdonald commented 2 years ago

Thanks for the report @matlh0! I agree we could do better here, we should probably abort any running flows / tasks so that they have a more accurate end state reflected. We should also consider a 'warm shutdown' where we wait until the flow/tasks end before exiting.

zanieb commented 2 years ago

This behavior is intentional. All agents can be stopped/started without affecting their flow runs. I'm not sure I agree that this agent should kill its runs on shutdown.

matlh0 commented 2 years ago

> This behavior is intentional. All agents can be stopped/started without affecting their flow runs. I'm not sure I agree that this agent should kill its runs on shutdown.

OK, but if I restart the agent, the flow does not continue and is still in the Running state.

> Thanks for the report @matlh0! I agree we could do better here, we should probably abort any running flows / tasks so that they have a more accurate end state reflected. We should also consider a 'warm shutdown' where we wait until the flow/tasks end before exiting.

The 'warm shutdown' could be useful for version upgrades and agent restarts. We don't use Docker or Kubernetes; our agents run in local mode.

zanieb commented 2 years ago

> Ok, but if I restart the agent, the flow does not continue and still in Running state

It's possible when the agent is killed the flow crashes but does not report its state. If that is the case, we can investigate a fix for it. However, the agent is not required for the flow to run. Once the agent creates the flow run process, the presence of an agent should not affect the flow. It does not require the agent to be started again to resume running.

matlh0 commented 2 years ago

> > Ok, but if I restart the agent, the flow does not continue and still in Running state

> It's possible when the agent is killed the flow crashes but does not report its state. If that is the case, we can investigate a fix for it. However, the agent is not required for the flow to run. Once the agent creates the flow run process, the presence of an agent should not affect the flow. It does not require the agent to be started again to resume running.

After a little investigation: before running the flow, I see this process:

root 1271628 1147962 8 15:05 pts/6 00:00:00 /usr/bin/python3.8 /usr/bin/prefect agent start -q sleep

When I start the flow, another process appears:

root 1271628 1147962 0 15:05 pts/6 00:00:01 /usr/bin/python3.8 /usr/bin/prefect agent start -q sleep
root 1271701 1271628 99 15:08 pts/6 00:00:00 /usr/bin/python3.8 -m prefect.engine

Master process: 1271628 (prefect agent)
Child process: 1271701

If I kill 1271628, the child process is killed too.

After that, if I start an agent, the flow never resumes.

zanieb commented 2 years ago

Great thanks! It's weird that the SIGINT sent to the parent process isn't forwarded to the child process in a way that lets our interrupt detection report the flow run as crashed. We can investigate that.

Separately, we should probably be launching processes for flow runs from agents as daemons so if the parent exits the child does not get killed. This would probably be a good setting to include on the Process infrastructure directly.
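
As an illustration of the "child survives the parent" idea on POSIX systems, here is a minimal sketch using only the standard library. It is not how Prefect's Process infrastructure is implemented; `launch_detached` is a hypothetical helper name, and the approach (starting the child in its own session) is one assumed reading of "launching as daemons".

```python
import os
import subprocess
import sys


def launch_detached(args):
    """Start a child process in its own session (POSIX only).

    With start_new_session=True the child calls setsid() before exec, so it
    leaves the parent's process group and no longer receives the SIGINT that
    the terminal delivers to the foreground group on Ctrl+C.
    """
    return subprocess.Popen(
        args,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,
    )


if __name__ == "__main__":
    child = launch_detached([sys.executable, "-c", "import time; time.sleep(5)"])
    # The child leads a new session, distinct from the parent's.
    print("parent session:", os.getsid(0), "child session:", os.getsid(child.pid))
    child.terminate()
    child.wait()
```

The trade-off is that a detached child no longer gets interrupted together with its parent, so it needs another way to report or detect a crash.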

mahiki commented 2 years ago

I've seen this same issue, just commenting to indicate my support for the issue.

thuyt001 commented 1 year ago

I just updated Prefect to the latest version, 2.6.9. After restarting Orion and the agents following the update, I'm still seeing old flow runs in the Running state, which have been stuck like that for the past 5 hours. I don't know how to confirm that they're really gone and that the UI is just out of sync.

davidssand commented 1 year ago

I’ve got the same problem, but my deployment is a bit different: currently I’m running many agents, each one running a docker daemon. This decision was made so that the resources used by the flow runs can be easily monitored (I just need to look at how much resources my agent uses). My deployments use the Docker infrastructure, so the flow runs are run by the agents’ docker daemons inside docker containers.

So in this case, if an agent goes down, no one is working on the flow runs anymore. The flow runs in the RUNNING or PENDING state remain like that forever.

I know my case is a really specific one, but it looks like this problem happens in other cases too, as can be seen above.

My first attempt at a solution was to set a timeout for the flows. But I realized later that the agent is the one that checks whether the timeout has been reached, so if the agent goes down the flow runs would never time out.

Here is a suggestion for a generic solution: the flow run should send an “is alive” signal periodically to the server, allowing the server to check which flow runs are RUNNING and mark the ones that are not anymore as CRASHED. What do you think?

zanieb commented 1 year ago

> the flow run should send an “is alive” signal periodically to the server, allowing the server to check which flow runs are RUNNING and mark the ones that are not anymore as CRASHED. What do you think?

This is a "heartbeat" implementation and we did this in Prefect 1 as the sole mechanism for determining if a run was crashed. However, there were frequent false positives during CPU or memory intensive workloads (and other things) resulting in flows being marked as CRASHED even when they were still happily RUNNING. We have avoided this pattern thus far for that reason.
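
To make the heartbeat idea and its false-positive risk concrete, here is a toy sketch. It does not reflect the Prefect 1 implementation; `HeartbeatMonitor` and its methods are hypothetical names, and time is passed in explicitly so the example is deterministic.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class HeartbeatMonitor:
    """Tracks the last heartbeat per run and reports runs that went silent."""

    timeout_seconds: float
    last_seen: Dict[str, float] = field(default_factory=dict)

    def beat(self, run_id: str, now: float) -> None:
        """Record that run_id reported in at time `now` (seconds)."""
        self.last_seen[run_id] = now

    def silent_runs(self, now: float) -> List[str]:
        """Return runs whose last heartbeat is older than the timeout."""
        return sorted(
            run_id
            for run_id, seen in self.last_seen.items()
            if now - seen > self.timeout_seconds
        )


monitor = HeartbeatMonitor(timeout_seconds=30)
monitor.beat("run-a", now=0)
monitor.beat("run-b", now=0)
monitor.beat("run-a", now=25)  # run-a keeps reporting; run-b goes quiet

# At t=40, run-b has been silent for 40s > 30s. It may have crashed, or it may
# only be starved of CPU and unable to send its heartbeat in time.
print(monitor.silent_runs(now=40))  # ['run-b']
```

The monitor cannot tell a dead run from a busy one, which is the false-positive problem described above.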

davidssand commented 1 year ago

What if we implemented the heartbeat and created another state for this case, so that we could know which flow runs are not reporting to the server? It could be called UNTRACKED and could be reached only by flow runs in the RUNNING state when the heartbeat is not received by the server in time. If a flow run in the UNTRACKED state manages to send the heartbeat, it goes back to the RUNNING state.

Another improvement after that would be to allow users to configure the server to crash flow runs in the UNTRACKED state.
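
The proposed transitions can be sketched as a small state function. UNTRACKED is the hypothetical state from this proposal and does not exist in Prefect; `next_state` and its parameters (including the opt-in `crash_untracked_after`) are illustrative names only.

```python
from enum import Enum


class RunState(Enum):
    RUNNING = "RUNNING"
    UNTRACKED = "UNTRACKED"  # hypothetical state proposed in the comment above
    CRASHED = "CRASHED"


def next_state(state, seconds_since_heartbeat, heartbeat_timeout,
               crash_untracked_after=None):
    """Return the state a run should move to, given how long it has been silent."""
    silent = seconds_since_heartbeat > heartbeat_timeout
    if state is RunState.RUNNING and silent:
        return RunState.UNTRACKED  # only RUNNING runs can become UNTRACKED
    if state is RunState.UNTRACKED:
        if not silent:
            return RunState.RUNNING  # a heartbeat arrived again
        if (crash_untracked_after is not None
                and seconds_since_heartbeat > crash_untracked_after):
            return RunState.CRASHED  # opt-in server-side policy
    return state


print(next_state(RunState.RUNNING, 45, heartbeat_timeout=30))
print(next_state(RunState.UNTRACKED, 5, heartbeat_timeout=30))
print(next_state(RunState.UNTRACKED, 700, heartbeat_timeout=30,
                 crash_untracked_after=600))
```

Because UNTRACKED is reversible, a late heartbeat costs nothing, and crashing is left as an explicit policy choice.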

alexprice12 commented 1 year ago

What's the suggestion here? I understand this is a hard challenge, but I'd hope for at least a lever I can pull to get my job to finish. It appears I can't even "cancel" the build (at least in the GUI).

Agents will sometimes die unexpectedly; I'd like my task to eventually get finished :)

kevin868 commented 1 year ago

This can be a bad issue if you have concurrency limits on your work queues / pools. For example, jobs stuck in RUNNING fall out of the observability window (the last week). Now you have hidden jobs that are not truly running but are eating up your concurrency. Eventually no new jobs can be pulled from the queue because the concurrency is taken by RUNNING jobs that are actually gone / Crashed.

Our workaround has been to script the API:

  1. Find all flow runs, with a stateFilter for "Running".
  2. Set those states to "Crashed" if the flow run is older than some threshold (1 day).
  3. If desired, you can also restart those flow runs (we usually just kill, then trigger a new deployment that the flow run comes from).

Caution: rough!!

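# Imports are assumed for an early Prefect 2.x layout (prefect.orion); later
# 2.x releases renamed these modules, so adjust the paths to your version.
import asyncio

from prefect import get_client
from prefect.orion import schemas
from prefect.orion.schemas.filters import FlowRunFilter
from prefect.orion.schemas.sorting import FlowRunSort

FROM_STATE = "Running"
TO_STATE = schemas.states.Crashed()
# MODIFY_STATE is used below but was not defined in the original snippet;
# it is assumed to be a dry-run switch. Set it to True to actually change states.
MODIFY_STATE = False

state_filter = {}
state_filter["name"] = {"any_": [FROM_STATE]}
# expected_start_threshold = pendulum.datetime(2022, 11, 23, 0, 0)
expected_start_threshold = None
flow_runs = asyncio.run(
    get_client().read_flow_runs(
        # flow_filter=FlowFilter(name={"any_": flow_name}) if flow_name else None,
        flow_run_filter=FlowRunFilter(
            state=state_filter,
            expected_start_time=schemas.filters.FlowRunFilterExpectedStartTime(
                after_=expected_start_threshold
            )
            if expected_start_threshold
            else None,
        ),
        # limit=limit,
        sort=FlowRunSort.EXPECTED_START_TIME_DESC,
    )
)

# Move every matching flow run to the target state.
for flow_run in flow_runs:
    if MODIFY_STATE:
        details = asyncio.run(
            get_client().set_flow_run_state(
                flow_run.id,
                # state=schemas.states.Scheduled(flow_run.expected_start_time.add(hours=2)),
                state=TO_STATE,
                # force=True,
            )
        )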

Really not an ideal experience. I believe it's a tough ask of the current model because afaik, the Prefect Server isn't aware of who the agents are (which worker has pulled out a flow run from the queue). Thus, it's not easy to impose a heartbeat, or alive-check on each of these flow runs. It seems reasonable to me to impose a 2-day timeout though.
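
Step 2 of the workaround applies an age threshold that the script above leaves commented out. A minimal, Prefect-independent sketch of that check follows; `is_stale` is a hypothetical helper, and the (name, start time) pairs stand in for real flow runs.

```python
from datetime import datetime, timedelta, timezone


def is_stale(start_time, max_age, now=None):
    """Return True if a run that started at start_time is older than max_age."""
    now = now or datetime.now(timezone.utc)
    return now - start_time > max_age


# Hypothetical (name, start time) pairs standing in for flow runs in "Running".
now = datetime(2022, 11, 25, 12, 0, tzinfo=timezone.utc)
running = [
    ("old-run", datetime(2022, 11, 23, 0, 0, tzinfo=timezone.utc)),
    ("fresh-run", datetime(2022, 11, 25, 11, 30, tzinfo=timezone.utc)),
]

stale = [name for name, started in running if is_stale(started, timedelta(days=1), now)]
print(stale)  # ['old-run']
```

Only runs that pass this check would be moved to Crashed, so a flow that is legitimately still running within the threshold is left alone.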

argibbs commented 1 year ago

> This is a "heartbeat" implementation and we did this in Prefect 1 as the sole mechanism for determining if a run was crashed.

@zanieb I've just started using Prefect (2.10+), and while it's really great at scheduling etc, the lack of a server-driven "this task is not responding, we're going to mark it as dead" is a real concern, made worse because I use concurrency limits. Consider:

  1. I spin up a task. It consumes the concurrency slot.
  2. It crashes. The server considers the old flow/task as still running.
  3. I relaunch the task ... and the new version then blocks indefinitely. The concurrency slot is still taken by its previous incarnation.
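
The three steps above can be reproduced with a toy slot counter. This is a simplified model rather than Prefect's concurrency implementation; `ConcurrencyLimit` and its methods are hypothetical names.

```python
class ConcurrencyLimit:
    """Toy slot counter: a slot is freed only when release() is called."""

    def __init__(self, limit):
        self.limit = limit
        self.holders = set()

    def acquire(self, run_id):
        """Take a slot for run_id; return False if every slot is already held."""
        if len(self.holders) >= self.limit:
            return False
        self.holders.add(run_id)
        return True

    def release(self, run_id):
        """Free the slot held by run_id, if any."""
        self.holders.discard(run_id)


slots = ConcurrencyLimit(limit=1)
print(slots.acquire("run-1"))  # step 1: True, the first run takes the only slot
# step 2: run-1's process dies without reporting, so release() is never called
print(slots.acquire("run-2"))  # step 3: False, the relaunch is blocked
slots.release("run-1")         # e.g. an operator marks run-1 as Crashed
print(slots.acquire("run-2"))  # True, the slot is available again
```

In this model nothing frees the slot unless someone or something reports a final state for the dead run.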

I'd consider this to be a pretty standard use case for a production system, but there are at least three separate issues open in Prefect right now around this problem.

  1. This one (effectively, a lack of heart beating)
  2. Problems freeing concurrency slots: #9895 (which itself references two earlier issues #5995 and #7753)
  3. A lack of clarity about being blocked by concurrency limits: #9500 (which hopefully will include the blocked state requested by #9243)

There seem to me to be a couple of easy wins here:

  1. If the task/flow has a timeout set, the server should be aware of it, and kill the flow after that point. This would negate the need for @kevin868's script - he just sets timeout_seconds=timedelta(days=2).total_seconds() and the server will kill it for him.
  2. Add back heartbeat support. You clearly have been burned by false positives in v1, so maybe make it optional, off by default. Or make it mandatory, but optional for the server to kill the flow when it's unresponsive.

Prefect is hella impressive, and since finding it I've been singing its praises to everyone, but those praises are currently tempered by a "but you have to be constantly monitoring it for stuck jobs - I'd be careful before putting it in production" caveat.

chrisgoddard commented 1 year ago

Just have to add a "me too" to this thread -- this has been one of the challenges of migrating from v1 to v2. In my experience, v2 is currently less reliable in production than v1 was.

zanieb commented 1 year ago

Hey @argibbs I'm not working on this project anymore but there's definitely some people thinking about the right way to make this better! cc @WillRaphaelson / @billpalombi

shawnlong636 commented 10 months ago

Hey @WillRaphaelson @billpalombi, I just want to add a +1 here and report that I'm having the same issue using Prefect agents in Kubernetes. From a user perspective, it's frustrating that I have to keep going in, checking for flows that have been running for more than an hour, and marking them as crashed.

Even as a temporary work-around, it would be great to be able to set a max-runtime parameter so that if a job is in a running state for the designated period, it'll automatically show an error state.

ytl-liva commented 5 months ago

+1

davidssand commented 4 months ago

Can we lift the priority of this issue?

davidssand commented 4 months ago

Here is another workaround to help with the problem, to complement @kevin868's idea.

Have 1 agent working for each work queue. When you start your agent, make it crash all running flow runs of its work queue, alerting you something went wrong. To find which flow runs belong to the given work queue you can use tags.

Of course, this implementation does not work when you need many agents working in 1 work queue, since the restart of an agent would crash "active" running flow runs.

adrianschneider94 commented 3 months ago

+1