PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Flows with subflows incorrectly reporting state as Failed #9376

Open paulinjo opened 1 year ago

paulinjo commented 1 year ago


Bug summary

A subset of our Flows which make use of subflows are incorrectly reporting their terminal state as Failed, even when all subflows and tasks are completed.

These flows are triggered via a separate flow using the orchestrator pattern, and this orchestrator flow behaves as though the terminal state is Completed.

Logs from the agent running on EKS show the state initially reported as Success before switching to Failed.
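
For context, the orchestrator pattern in question looks roughly like the sketch below. The flow and parameter names are borrowed from the traceback further down and are illustrative only, not the actual production code.

from typing import List

from prefect import flow

@flow
def hourly_path_activity(dataset: str):
    # Child flow: the extract/transform/load tasks for a single dataset would run here.
    ...

@flow
def hourly_path_activity_orchestrator(datasets: List[str]):
    # The orchestrator calls each child flow as a subflow, renamed per dataset.
    for dataset in datasets:
        hourly_path_activity.with_options(name=f"Hourly Path Activity [{dataset}]")(dataset)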

Reproduction

N/A

Error

20:19:36.771 | INFO    | Task run 'extract_active_entities-775' - Finished in state Cached(type=COMPLETED)
20:19:36.821 | INFO    | Task run 'extract_active_entities-760' - Finished in state Cached(type=COMPLETED)
20:19:36.930 | INFO    | Flow run 'arrogant-goldfish' - Finished in state Completed('All states completed.')
20:19:36.931 | ERROR   | Flow run 'gainful-sturgeon' - Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 674, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "hourly_path_activity/flow/parent.py", line 20, in hourly_path_activity_orchestrator
    hourly_path_activity.with_options(name=f"Hourly Path Activity [{dataset}]")(
  File "/usr/local/lib/python3.9/site-packages/prefect/flows.py", line 468, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 182, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/usr/local/lib/python3.9/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 571, in create_and_begin_subflow_run
    return await terminal_state.result(fetch=True)
  File "/usr/local/lib/python3.9/site-packages/prefect/states.py", line 98, in _get_state_result
    result = await state.data.get()
  File "/usr/local/lib/python3.9/site-packages/prefect/results.py", line 394, in get
    raise MissingResult("The result was not persisted and is no longer available.")
prefect.exceptions.MissingResult: The result was not persisted and is no longer available.
20:19:36.976 | INFO    | Task run 'write_parquet-712' - Finished in state Completed()
20:19:36.998 | INFO    | Task run 'write_parquet-726' - Finished in state Completed()
20:19:37.079 | ERROR   | Flow run 'gainful-sturgeon' - Finished in state Failed('Flow run encountered an exception. MissingResult: The result was not persisted and is no longer available.\n')

Versions

Agent version 2.10.6 running on EKS 1.25

Additional context

No response

zanieb commented 1 year ago

@paulinjo we need a minimal reproducible example (MRE) in order to investigate this

paulinjo commented 1 year ago

@madkinsz This is happening in our production environment with previously well-behaved flows, beginning roughly on 04/26.

Coming up with an MRE is going to be a big challenge given the number of moving pieces, but we were told by support to open a GitHub issue for further investigation.

zanieb commented 1 year ago

@paulinjo Did you change Prefect versions? Can you share the full output of prefect version from a production environment?

paulinjo commented 1 year ago

Agent: 2.10.6
Flow pod: 2.10.6
Server: Prefect Cloud

zanieb commented 1 year ago

@paulinjo did you change Prefect versions when this started occurring?

paulinjo commented 1 year ago

When this started occurring we were using 2.9.x for the agent and flows. We since updated to 2.10.6 to see if that was the cause, but no change.

majikman111 commented 1 year ago

I'm also experiencing this error after upgrading from Prefect 2.10.5 to 2.10.8, on Python 3.8, with agents deploying Kubernetes jobs. It happens infrequently, but whenever it does, the job appears to be scheduled and run twice (all logs appear twice, including Downloading flow code storage at '') with one copy completing successfully and the other failing to start a subflow run with the above stack trace. All tasks and subflows are otherwise marked as successful in the UI and complete successfully.

Here's a general approximation of what my flows/subflows look like, but unfortunately I can't get it to reproduce on my local machine.

import time
from prefect import flow, task, get_run_logger

@task
def task0(inpt):
    for inp in inpt:
        get_run_logger().info(inp)
        time.sleep(1)
    return inpt

@task
def task1(inp):
    get_run_logger().info(inp)
    time.sleep(1)
    return True

@flow
def subflow1(inpt):
    future = task0.submit(inpt)
    future.wait()

    future = task2.submit(future.result())

    return future.result()

@task
def task2(val):
    get_run_logger().info(val)
    return 0

@flow
def subflow2(val):
    future = task2.submit(val)
    future.wait()

    return future.result()

@flow
def main():
    get_run_logger().info("Hello World")
    future = task0.submit(list(range(0,10)))
    future.wait()

    subflow1(future)

    subflow2(0)

    return future.result()

if __name__ == "__main__":
    main()

zanieb commented 1 year ago

It's possible this is related to a Cloud bug where the flow run was placed in a Late state after it started running. We've released a fix for that now. If you can share some affected flow run and workspace IDs, we can check whether that was the cause for you.

paulinjo commented 1 year ago

Workspace: e5803b62-2267-4084-bd2a-ca0213518464
Flow Runs:

majikman111 commented 1 year ago

@madkinsz My jobs are running on self-hosted infrastructure. Will the fix also be replicated for non-cloud setups?

zanieb commented 1 year ago

@majikman111 we have not seen cases of the bug I described in the OSS server, but yes, we are replicating the fix there anyway.

@paulinjo I've confirmed that you are not affected by the described bug. This seems like something else. It looks like the parent is failing because it tries to retrieve the result of a subflow run that previously completed but whose result was not persisted.
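
If a missing persisted result really were the root cause, one mitigation would be to persist the subflow's result so that the parent's call to terminal_state.result(fetch=True) can retrieve it later. A minimal sketch, with an illustrative storage path (persist_result and result_storage are standard Prefect 2 flow options):

from prefect import flow
from prefect.filesystems import LocalFileSystem

# Sketch only: persist the child flow's result so the parent can fetch it later.
# On Kubernetes, the basepath would need to point at storage shared across pods.
@flow(
    persist_result=True,
    result_storage=LocalFileSystem(basepath="/tmp/prefect-results"),  # illustrative path
)
def hourly_path_activity(dataset: str):
    ...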

paulinjo commented 1 year ago

@madkinsz Is the implication that there's something we need to update to fix this? The flows and underlying infrastructure have not changed in months, excluding Prefect and other dependency updates, and several other flows are running without issue.

paulinjo commented 1 year ago

After some digging, I found that this was caused by a bad configuration on our side. Both our Docker container entrypoint and our Kubernetes job block were making a call to python -m prefect.engine, causing every flow to be executed twice in the same container.
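
For anyone else hitting the same symptom, the fix is to make sure the flow-run command is defined in only one place. An illustrative sketch (image and block names are hypothetical), letting the KubernetesJob block supply Prefect's default command and keeping the image's ENTRYPOINT/CMD free of it:

from prefect.infrastructure import KubernetesJob

# Sketch only: leave `command` unset so Prefect injects its default flow-run
# command (`python -m prefect.engine`) exactly once; the image itself should
# not also invoke prefect.engine.
job = KubernetesJob(
    image="my-registry/my-flow-image:latest",  # hypothetical image name
    # command=None -> Prefect uses its default flow-run command
)
job.save("hourly-path-activity-job", overwrite=True)  # hypothetical block name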

github-actions[bot] commented 1 year ago

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

cicdw commented 1 year ago

Hi @paulinjo thank you for the update - is it safe to close this issue now?

sbz commented 6 months ago

We are experiencing the same issue with this (old) version in production:

prefect version
Version:             2.9.0
API version:         0.8.4
Python version:      3.10.8
Git commit:          69f57bd5
Built:               Thu, Mar 30, 2023 1:08 PM
OS/Arch:             linux/x86_64
Profile:             objectStorage
Server type:         server

We are still running 2.9.x, and we regularly see this kind of stack trace after clicking "Retry" in the UI. As a result, we cannot continue the flow and are completely stuck.

Full trace from the logs in the UI:

Encountered exception during execution:
Traceback (most recent call last):
  File "/prefect/lib/python3.10/site-packages/prefect/engine.py", line 674, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/tmp/tmp4bdjm1a2prefect/reboot.py", line 34, in reboot_flow
    set_maintenance(server_id)
  File "/prefect/lib/python3.10/site-packages/prefect/tasks.py", line 485, in __call__
    return enter_task_run_engine(
  File "/prefect/lib/python3.10/site-packages/prefect/engine.py", line 977, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/opt/python-3.10.8-ovh161/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/opt/python-3.10.8-ovh161/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/prefect/lib/python3.10/site-packages/prefect/engine.py", line 1132, in get_task_call_return_value
    return await future._result()
  File "/prefect/lib/python3.10/site-packages/prefect/futures.py", line 240, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/prefect/lib/python3.10/site-packages/prefect/states.py", line 98, in _get_state_result
    result = await state.data.get()
  File "/prefect/lib/python3.10/site-packages/prefect/results.py", line 394, in get
    raise MissingResult("The result was not persisted and is no longer available.")
prefect.exceptions.MissingResult: The result was not persisted and is no longer available.

We have tasks that are supposed to persist their results using @task(persist_result=True), but it does not seem to work as intended.
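
For reference, persisted task results also need result storage that is still reachable when the flow run is retried from the UI; with the default local storage, the result file may no longer exist on the machine that picks up the retry, which would match the MissingResult error above. A minimal sketch, using the task name from the traceback and an illustrative, pre-saved storage block:

from prefect import task
from prefect.filesystems import RemoteFileSystem

# Sketch only: persist this task's result to remote storage so a retried flow
# run can still read it. The block name "object-storage" is illustrative and
# would have to be saved beforehand.
@task(
    persist_result=True,
    result_storage=RemoteFileSystem.load("object-storage"),
)
def set_maintenance(server_id: str) -> bool:
    ...
    return True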