PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

MissingResult State data is missing. #12977

Closed ahuang11 closed 1 month ago

ahuang11 commented 1 year ago

Occurs when a worker runs out of memory (the "with get_dask_client():" line is commented out):

import dask.dataframe
import dask.distributed
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

client = dask.distributed.Client()

@task
def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df

@task
def process_data(df) -> dask.dataframe.DataFrame:
    # with get_dask_client():
    df_yearly_avg = df.groupby(df.index.year).mean()
    return df_yearly_avg.compute()

@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def dask_flow():
    df = read_data.submit("1988", "2022")
    df_yearly_average = process_data.submit(df)
    return df_yearly_average

dask_flow()

Logs:


15:23:47.601 | INFO    | Task run 'read_data-5bc97744-0' - Finished in state Completed()
2023-01-10 15:23:51,373 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:51660 (pid=5119) exceeded 95% memory budget. Restarting...
15:23:51.373 | WARNING | distributed.nanny.memory - Worker tcp://127.0.0.1:51660 (pid=5119) exceeded 95% memory budget. Restarting...
15:23:51.463 | INFO    | distributed.nanny - Worker process 5119 was killed by signal 15
15:23:51.465 | INFO    | distributed.core - Connection to tcp://127.0.0.1:51668 has been closed.
15:23:51.466 | INFO    | distributed.scheduler - Remove worker <WorkerState 'tcp://127.0.0.1:51660', name: 1, status: running, memory: 1, processing: 1>
15:23:51.466 | INFO    | distributed.core - Removing comms to tcp://127.0.0.1:51660
2023-01-10 15:23:51,469 - distributed.nanny - WARNING - Restarting worker
15:23:51.469 | WARNING | distributed.nanny - Restarting worker
15:23:51.890 | INFO    | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:53762', name: 1, status: init, memory: 0, processing: 0>
15:23:51.892 | INFO    | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:53762
15:23:51.893 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:53765
15:23:52.804 | INFO    | Task run 'read_data-5bc97744-0' - Task run '9b5215ca-1822-4c6e-930d-34480fca578e' already finished.
15:23:53.025 | ERROR   | Task run 'process_data-090555ba-0' - Crash detected! Execution was interrupted by an unexpected exception: MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.

2023-01-10 15:23:53,499 - distributed.worker - WARNING - Compute Failed
Key:       process_data-090555ba-0-fa34a3b0fd834a6aa2fd9ece3d2c233c-1
Function:  begin_task_run
args:      ()
kwargs:    {'task': <prefect.tasks.Task object at 0x10cc5b6d0>, 'task_run': TaskRun(id=UUID('fa34a3b0-fd83-4a6a-a2fd-9ece3d2c233c'), name='process_data-090555ba-0', flow_run_id=UUID('51a4a2f7-bdbe-4433-b54f-bb0b60b9e95a'), task_key='__main__.process_data', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), tags=[], state_id=UUID('3a84e6eb-1a87-487f-8956-7a268a8fb1c3'), task_inputs={'df': [TaskRunResult(input_type='task_run', id=UUID('9b5215ca-1822-4c6e-930d-34480fca578e'))]}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 1, 10, 23, 23, 46, 173061, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=41
Exception: "MissingResult('State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.')"

15:23:53.499 | WARNING | distributed.worker - Compute Failed
Key:       process_data-090555ba-0-fa34a3b0fd834a6aa2fd9ece3d2c233c-1
Function:  begin_task_run
args:      ()
kwargs:    {'task': <prefect.tasks.Task object at 0x10cc5b6d0>, 'task_run': TaskRun(id=UUID('fa34a3b0-fd83-4a6a-a2fd-9ece3d2c233c'), name='process_data-090555ba-0', flow_run_id=UUID('51a4a2f7-bdbe-4433-b54f-bb0b60b9e95a'), task_key='__main__.process_data', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), tags=[], state_id=UUID('3a84e6eb-1a87-487f-8956-7a268a8fb1c3'), task_inputs={'df': [TaskRunResult(input_type='task_run', id=UUID('9b5215ca-1822-4c6e-930d-34480fca578e'))]}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 1, 10, 23, 23, 46, 173061, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=41
Exception: "MissingResult('State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.')"

Traceback:

---------------------------------------------------------------------------
MissingResult                             Traceback (most recent call last)
Cell In[1], line 24
     21     df_yearly_average = process_data.submit(df)
     22     return df_yearly_average
---> 24 dask_flow()

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/flows.py:448, in Flow.__call__(self, return_state, wait_for, *args, **kwargs)
    444 parameters = get_call_parameters(self.fn, args, kwargs)
    446 return_type = "state" if return_state else "result"
--> 448 return enter_flow_run_engine_from_flow_call(
    449     self,
    450     parameters,
    451     wait_for=wait_for,
    452     return_type=return_type,
    453 )

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:161, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
    157 elif in_async_main_thread():
    158     # An event loop is already running and we must create a blocking portal to
    159     # run async code from this synchronous context
    160     with start_blocking_portal() as portal:
--> 161         return portal.call(begin_run)
    162 else:
    163     # An event loop is not running so we will create one
    164     return anyio.run(begin_run)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/from_thread.py:283, in BlockingPortal.call(self, func, *args)
    268 def call(
    269     self,
    270     func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
    271     *args: object
    272 ) -> T_Retval:
    273     """
    274     Call the given function in the event loop thread.
    275 
   (...)
    281 
    282     """
--> 283     return cast(T_Retval, self.start_task_soon(func, *args).result())

File ~/mambaforge/envs/dask/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File ~/mambaforge/envs/dask/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/from_thread.py:219, in BlockingPortal._call_func(self, func, args, kwargs, future)
    216             else:
    217                 future.add_done_callback(callback)
--> 219             retval = await retval
    220 except self._cancelled_exc_class:
    221     future.cancel()

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/client/utilities.py:47, in inject_client.<locals>.with_injected_client(*args, **kwargs)
     45 async with client_context as new_client:
     46     kwargs.setdefault("client", new_client or client)
---> 47     return await fn(*args, **kwargs)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:244, in create_then_begin_flow_run(flow, parameters, wait_for, return_type, client)
    242     return state
    243 elif return_type == "result":
--> 244     return await state.result(fetch=True)
    245 else:
    246     raise ValueError(f"Invalid return type for flow engine {return_type!r}.")

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/states.py:89, in _get_state_result(state, raise_on_failure)
     84     raise PausedRun("Run paused.")
     86 if raise_on_failure and (
     87     state.is_crashed() or state.is_failed() or state.is_cancelled()
     88 ):
---> 89     raise await get_state_exception(state)
     91 if isinstance(state.data, DataDocument):
     92     result = result_from_state_with_data_document(
     93         state, raise_on_failure=raise_on_failure
     94     )

File ~/Applications/python/prefect-dask/prefect_dask/task_runners.py:269, in DaskTaskRunner.wait(self, key, timeout)
    267 future = self._get_dask_future(key)
    268 try:
--> 269     return await future.result(timeout=timeout)
    270 except distributed.TimeoutError:
    271     return None

File ~/Applications/python/test/distributed/distributed/client.py:296, in Future._result(self, raiseit)
    294 if raiseit:
    295     typ, exc, tb = exc
--> 296     raise exc.with_traceback(tb)
    297 else:
    298     return exc

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:1303, in begin_task_run()
   1297     raise RuntimeError(
   1298         f"Cannot orchestrate task run '{task_run.id}'. "
   1299         f"Failed to connect to API at {client.api_url}."
   1300     ) from connect_error
   1302 try:
-> 1303     state = await orchestrate_task_run(
   1304         task=task,
   1305         task_run=task_run,
   1306         parameters=parameters,
   1307         wait_for=wait_for,
   1308         result_factory=result_factory,
   1309         log_prints=log_prints,
   1310         interruptible=interruptible,
   1311         client=client,
   1312     )
   1314     if not maybe_flow_run_context:
   1315         # When a a task run finishes on a remote worker flush logs to prevent
   1316         # loss if the process exits
   1317         OrionHandler.flush(block=True)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:1378, in orchestrate_task_run()
   1367 partial_task_run_context = PartialModel(
   1368     TaskRunContext,
   1369     task_run=task_run,
   (...)
   1373     log_prints=log_prints,
   1374 )
   1376 try:
   1377     # Resolve futures in parameters into data
-> 1378     resolved_parameters = await resolve_inputs(parameters)
   1379     # Resolve futures in any non-data dependencies to ensure they are ready
   1380     await resolve_inputs(wait_for, return_data=False)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:1671, in resolve_inputs()
   1668     # Only retrieve the result if requested as it may be expensive
   1669     return state.result(raise_on_failure=False, fetch=True) if return_data else None
-> 1671 return await run_sync_in_worker_thread(
   1672     visit_collection,
   1673     parameters,
   1674     visit_fn=resolve_input,
   1675     return_data=return_data,
   1676     max_depth=max_depth,
   1677     remove_annotations=True,
   1678     context={},
   1679 )

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:91, in run_sync_in_worker_thread()
     80 """
     81 Runs a sync function in a new worker thread so that the main thread's event loop
     82 is not blocked
   (...)
     88 thread may continue running — the outcome will just be ignored.
     89 """
     90 call = partial(__fn, *args, **kwargs)
---> 91 return await anyio.to_thread.run_sync(
     92     call, cancellable=True, limiter=get_thread_limiter()
     93 )

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/to_thread.py:31, in run_sync()
     10 async def run_sync(
     11     func: Callable[..., T_Retval],
     12     *args: object,
     13     cancellable: bool = False,
     14     limiter: Optional[CapacityLimiter] = None
     15 ) -> T_Retval:
     16     """
     17     Call the given function with the given arguments in a worker thread.
     18 
   (...)
     29 
     30     """
---> 31     return await get_asynclib().run_sync_in_worker_thread(
     32         func, *args, cancellable=cancellable, limiter=limiter
     33     )

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:937, in run_sync_in_worker_thread()
    935 context.run(sniffio.current_async_library_cvar.set, None)
    936 worker.queue.put_nowait((context, func, args, future))
--> 937 return await future

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:867, in run()
    865 exception: Optional[BaseException] = None
    866 try:
--> 867     result = context.run(func, *args)
    868 except BaseException as exc:
    869     exception = exc

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:318, in visit_collection()
    316 elif typ in (dict, OrderedDict):
    317     assert isinstance(expr, (dict, OrderedDict))  # typecheck assertion
--> 318     items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
    319     result = typ(items) if return_data else None
    321 elif is_dataclass(expr) and not isinstance(expr, type):

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:318, in <listcomp>()
    316 elif typ in (dict, OrderedDict):
    317     assert isinstance(expr, (dict, OrderedDict))  # typecheck assertion
--> 318     items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
    319     result = typ(items) if return_data else None
    321 elif is_dataclass(expr) and not isinstance(expr, type):

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:264, in visit_nested()
    262 def visit_nested(expr):
    263     # Utility for a recursive call, preserving options and updating the depth.
--> 264     return visit_collection(
    265         expr,
    266         visit_fn=visit_fn,
    267         return_data=return_data,
    268         remove_annotations=remove_annotations,
    269         max_depth=max_depth - 1,
    270         # Copy the context on nested calls so it does not "propagate up"
    271         context=context.copy() if context is not None else None,
    272     )

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:281, in visit_collection()
    278         return visit_fn(expr)
    280 # Visit every expression
--> 281 result = visit_expression(expr)
    283 if return_data:
    284     # Only mutate the expression while returning data, otherwise it could be null
    285     expr = result

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:276, in visit_expression()
    274 def visit_expression(expr):
    275     if context is not None:
--> 276         return visit_fn(expr, context)
    277     else:
    278         return visit_fn(expr)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:1669, in resolve_input()
   1664     raise UpstreamTaskError(
   1665         f"Upstream task run '{state.state_details.task_run_id}' did not reach a 'COMPLETED' state."
   1666     )
   1668 # Only retrieve the result if requested as it may be expensive
-> 1669 return state.result(raise_on_failure=False, fetch=True) if return_data else None

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/client/schemas.py:107, in result()
     35 """
     36 Retrieve the result attached to this state.
     37 
   (...)
    103     hello
    104 """
    105 from prefect.states import get_state_result
--> 107 return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/states.py:74, in get_state_result()
     72         return state.data
     73 else:
---> 74     return _get_state_result(state, raise_on_failure=raise_on_failure)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:226, in coroutine_wrapper()
    222     return async_fn(*args, **kwargs)
    223 elif in_async_worker_thread():
    224     # In a sync context but we can access the event loop thread; send the async
    225     # call to the parent
--> 226     return run_async_from_worker_thread(async_fn, *args, **kwargs)
    227 else:
    228     # In a sync context and there is no event loop; just create an event loop
    229     # to run the async code then tear it down
    230     return run_async_in_new_loop(async_fn, *args, **kwargs)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:177, in run_async_from_worker_thread()
    172 """
    173 Runs an async function in the main thread's event loop, blocking the worker
    174 thread until completion
    175 """
    176 call = partial(__fn, *args, **kwargs)
--> 177 return anyio.from_thread.run(call)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/from_thread.py:49, in run()
     46 except AttributeError:
     47     raise RuntimeError("This function can only be run from an AnyIO worker thread")
---> 49 return asynclib.run_async_from_thread(func, *args)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:970, in run_async_from_thread()
    964 def run_async_from_thread(
    965     func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object
    966 ) -> T_Retval:
    967     f: concurrent.futures.Future[T_Retval] = asyncio.run_coroutine_threadsafe(
    968         func(*args), threadlocals.loop
    969     )
--> 970     return f.result()

File ~/mambaforge/envs/dask/lib/python3.9/concurrent/futures/_base.py:446, in result()
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File ~/mambaforge/envs/dask/lib/python3.9/concurrent/futures/_base.py:391, in __get_result()
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/states.py:101, in _get_state_result()
     99         return await get_state_exception(state)
    100     else:
--> 101         raise MissingResult(
    102             "State data is missing. "
    103             "Typically, this occurs when result persistence is disabled and the "
    104             "state has been retrieved from the API."
    105         )
    107 else:
    108     # The result is attached directly
    109     result = state.data

MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
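
The error message and the logs above suggest a sequence: the upstream result lived only in the memory of the worker that was killed, and the state later re-read from the API carried no data. The sketch below is a toy model of that sequence in plain Python, not Prefect's implementation. `State`, `ToyAPI`, `get_result`, and the `storage` dict are hypothetical stand-ins, and `MissingResult` here is a local class that only mirrors the name in the traceback.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


class MissingResult(Exception):
    """Local stand-in for the exception named in the traceback above."""


@dataclass
class State:
    """Toy task-run state: a type plus an optional in-memory result."""
    type: str
    data: Optional[Any] = None


class ToyAPI:
    """Hypothetical API that records state types only, never result data."""

    def __init__(self) -> None:
        self._states: Dict[str, str] = {}

    def set_state(self, run_id: str, state: State) -> None:
        self._states[run_id] = state.type  # the result payload is dropped

    def read_state(self, run_id: str) -> State:
        return State(type=self._states[run_id])  # comes back without data


def get_result(state: State, storage: Dict[str, Any], run_id: str) -> Any:
    """Return the in-memory result, fall back to storage, else raise."""
    if state.data is not None:
        return state.data
    if run_id in storage:
        return storage[run_id]
    raise MissingResult("State data is missing.")


api = ToyAPI()
storage: Dict[str, Any] = {}  # empty dict models disabled result persistence

# The upstream task completes; its result exists only in worker memory.
local_state = State(type="COMPLETED", data=[1, 2, 3])
api.set_state("read_data", local_state)
assert get_result(local_state, storage, "read_data") == [1, 2, 3]

# The worker restarts; the only surviving copy of the state is the API's.
refetched = api.read_state("read_data")
try:
    get_result(refetched, storage, "read_data")
    outcome = "resolved"
except MissingResult:
    outcome = "missing"

# Had the result been persisted, the re-fetched state could still be resolved.
storage["read_data"] = [1, 2, 3]
recovered = get_result(refetched, storage, "read_data")
```

In this model the downstream lookup fails only when both the in-memory copy and the persisted copy are absent, which is the combination the error message describes.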

AleksandrLiadov commented 1 year ago

@ahuang11, @madkinsz hello, did you resolve this issue?

I have run into the same issue :(

It also seems somewhat random: sometimes my flows run without this problem.

tonal commented 4 months ago

I have the same issue with RayTaskRunner. :(

tonal commented 4 months ago

For me, the error reproduces consistently with the following code:

@task(tags={'load'}, log_prints=True, retries=3, retry_delay_seconds=5,)
async def load_raw(url:str, auth:Auth|None=None, **kwds) -> bytes:
  content = await _load_raw_inner(url, auth=auth, **kwds)
  return content

_load_raw = load_raw.with_options(
  name=f'load-raw-{PROV_NAME.lower()}', tags=load_raw.tags | {PROV_NAME},
  cache_key_fn=task_input_hash, persist_result=True,
  cache_expiration=_cache_expiration, retries=3, retry_delay_seconds=5,)

@flow
async def epool_rem_gr_active_group_csv():
  cont = await _load_raw(URL, verify=False, timeout=15.)
  ...

Fixed it by moving the caching parameters to the task definition:


@task(
  tags={'load'}, log_prints=True,
  cache_key_fn=task_input_hash, persist_result=True, retries=3, retry_delay_seconds=5,)
async def load_raw(url:str, auth:Auth|None=None, **kwds) -> bytes:
  content = await _load_raw_inner(url, auth=auth, **kwds)
  return content

_load_raw = load_raw.with_options(
  name=f'load-raw-{PROV_NAME.lower()}', tags=load_raw.tags | {PROV_NAME},
  cache_expiration=_cache_expiration,)

@flow
async def epool_rem_gr_active_group_csv():
  cont = await _load_raw(URL, verify=False, timeout=15.)
  ...

tonal commented 4 months ago

The fix does not work. :(

tonal commented 4 months ago

See also #8228

tonal commented 4 months ago

and #8415

tonal commented 4 months ago

Downgraded Prefect to 2.16.9. The error has almost disappeared.

cicdw commented 1 month ago

This pattern of result persistence has been updated and fixed in 3.0. I'm going to close this, but if other issues arise, please open a new issue.