PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Errors raised in mapped tasks result in UnfinishedRun when futures are passed to another task, source error not included in stack trace #12028

Open austinweisgrau opened 8 months ago

austinweisgrau commented 8 months ago

Bug summary

Normally, when an unhandled exception is raised in a task, it propagates to the top of the stack and the Prefect flow fails on that exception, with a clear stack trace showing where the error was raised. This holds whether the task is called directly or mapped.

However, if the results from a mapped task "A" are passed to another task "B" as an argument, and an exception is raised within an instance of task "A", then the Prefect flow will fail when calling task "B" with an UnfinishedRun error, and the stack trace will not show the exception in task "A".

Reproduction

from prefect import flow, task

@task
def fetch_values() -> list[int]:
    result = [i for i in range(10)]
    return result

@task
def transform_value(value: int) -> int:
    if value == 6:
        raise ValueError
    result = value * 2
    return result

@task
def log_results(values: list[int]) -> None:
    for i in values:
        print(i)

@flow
def myflow():
    values = fetch_values()
    transformed = transform_value.map(values)

    # If this method is not called, the ValueError will be raised clearly and coherently
    # But if this method is called, the flow will fail here with an UnfinishedRun exception
    log_results(transformed)

if __name__ == "__main__":
    myflow()

Error

10:01:41.075 | INFO    | prefect.engine - Created flow run 'dashing-snail' for flow 'myflow'
10:01:41.082 | INFO    | Flow run 'dashing-snail' - View at ...
10:01:42.714 | INFO    | Flow run 'dashing-snail' - Created task run 'fetch_values-0' for task 'fetch_values'
10:01:42.716 | INFO    | Flow run 'dashing-snail' - Executing 'fetch_values-0' immediately...
10:01:44.052 | INFO    | Task run 'fetch_values-0' - Finished in state Completed()
10:01:44.365 | INFO    | Flow run 'dashing-snail' - Created task run 'transform_value-7' for task 'transform_value'
10:01:44.367 | INFO    | Flow run 'dashing-snail' - Submitted task run 'transform_value-7' for execution.
10:01:44.384 | INFO    | Flow run 'dashing-snail' - Created task run 'transform_value-0' for task 'transform_value'
10:01:44.387 | INFO    | Flow run 'dashing-snail' - Submitted task run 'transform_value-0' for execution.
10:01:44.397 | INFO    | Flow run 'dashing-snail' - Created task run 'transform_value-2' for task 'transform_value'
10:01:44.399 | INFO    | Flow run 'dashing-snail' - Submitted task run 'transform_value-2' for execution.
10:01:44.419 | INFO    | Flow run 'dashing-snail' - Created task run 'transform_value-5' for task 'transform_value'
10:01:44.421 | INFO    | Flow run 'dashing-snail' - Submitted task run 'transform_value-5' for execution.
10:01:44.433 | INFO    | Flow run 'dashing-snail' - Created task run 'transform_value-6' for task 'transform_value'
10:01:44.436 | INFO    | Flow run 'dashing-snail' - Submitted task run 'transform_value-6' for execution.
10:01:44.447 | INFO    | Flow run 'dashing-snail' - Created task run 'transform_value-9' for task 'transform_value'
10:01:44.450 | INFO    | Flow run 'dashing-snail' - Submitted task run 'transform_value-9' for execution.
10:01:44.457 | INFO    | Flow run 'dashing-snail' - Created task run 'transform_value-3' for task 'transform_value'
10:01:44.459 | INFO    | Flow run 'dashing-snail' - Submitted task run 'transform_value-3' for execution.
10:01:44.468 | INFO    | Flow run 'dashing-snail' - Created task run 'transform_value-4' for task 'transform_value'
10:01:44.470 | INFO    | Flow run 'dashing-snail' - Submitted task run 'transform_value-4' for execution.
10:01:44.484 | INFO    | Flow run 'dashing-snail' - Created task run 'transform_value-1' for task 'transform_value'
10:01:44.487 | INFO    | Flow run 'dashing-snail' - Submitted task run 'transform_value-1' for execution.
10:01:44.496 | INFO    | Flow run 'dashing-snail' - Created task run 'transform_value-8' for task 'transform_value'
10:01:44.498 | INFO    | Flow run 'dashing-snail' - Submitted task run 'transform_value-8' for execution.
10:01:44.699 | INFO    | Flow run 'dashing-snail' - Created task run 'log_results-0' for task 'log_results'
10:01:44.701 | INFO    | Flow run 'dashing-snail' - Executing 'log_results-0' immediately...
10:01:45.043 | ERROR   | Task run 'transform_value-6' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 1623, in orchestrate_task_run
    result = await call.aresult()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/home/aradox/code/wfp/wfp-prefect/example.py", line 12, in transform_value
    raise ValueError
ValueError
10:01:45.230 | INFO    | Task run 'transform_value-2' - Finished in state Completed()
10:01:45.246 | INFO    | Task run 'transform_value-4' - Finished in state Completed()
10:01:45.305 | ERROR   | Task run 'transform_value-6' - Finished in state Failed('Task run encountered an exception: ValueError\n')
10:01:45.312 | INFO    | Task run 'transform_value-3' - Finished in state Completed()
10:01:45.319 | INFO    | Task run 'transform_value-7' - Finished in state Completed()
10:01:45.349 | INFO    | Task run 'transform_value-0' - Finished in state Completed()
10:01:45.356 | INFO    | Task run 'transform_value-9' - Finished in state Completed()
10:01:45.368 | INFO    | Task run 'transform_value-8' - Finished in state Completed()
10:01:45.374 | INFO    | Task run 'transform_value-5' - Finished in state Completed()
10:01:45.689 | INFO    | Task run 'transform_value-1' - Finished in state Completed()
10:01:45.994 | ERROR   | Flow run 'dashing-snail' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 742, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/home/aradox/code/wfp/wfp-prefect/example.py", line 25, in myflow
    log_results(transformed)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/tasks.py", line 505, in __call__
    return enter_task_run_engine(
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 1045, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 232, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 1210, in get_task_call_return_value
    return await future._result()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/states.py", line 85, in _get_state_result
    raise UnfinishedRun(
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.
10:01:46.297 | ERROR   | Flow run 'dashing-snail' - Finished in state Failed('Flow run encountered an exception. UnfinishedRun: Run is in PENDING state, its result is not available.\n')
Traceback (most recent call last):
  File "/home/aradox/code/wfp/wfp-prefect/example.py", line 28, in <module>
    myflow()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/flows.py", line 518, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 198, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 232, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 292, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/states.py", line 92, in _get_state_result
    raise await get_state_exception(state)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 742, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/home/aradox/code/wfp/wfp-prefect/example.py", line 25, in myflow
    log_results(transformed)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/tasks.py", line 505, in __call__
    return enter_task_run_engine(
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 1045, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 232, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 1210, in get_task_call_return_value
    return await future._result()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/states.py", line 85, in _get_state_result
    raise UnfinishedRun(
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.

Versions

Version:             2.10.11
API version:         0.8.4
Python version:      3.10.9
Git commit:          8c651ffc
Built:               Thu, May 25, 2023 2:59 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud

austinweisgrau commented 8 months ago

Might be related to #8124?

serinamarie commented 8 months ago

Hi @austinweisgrau, thanks for the issue! While I've not reproduced it, the true error from task A certainly appears to be something we'd ideally raise here. We've added it to our backlog :)

mattiamatrix commented 6 months ago

Hello, do you have any updates on this? I am facing the same issue.

I have a .map applied to a task and I would like to allow some of the mapped runs to fail (allow_failure).

I also tried this, but it does not seem to work:

tasks = my_function.map(
    ...,
    return_state=True,
)

for task in tasks:
    result = task.result(raise_on_failure=False)
    logger.info(result)

christiansicari commented 5 months ago

Hi there, same issue here! I am trying to build a map-reduce workflow on Prefect. The idea is that the map step submits some tasks and returns the futures, and the reduce step collects the results and returns an aggregation of them.

This is my dummy code:

from random import randint
from prefect import task, flow

class DummyError(Exception):
    pass

@task
def say_hello():
    print("Hello, world!")

@task
def t1():
    print("t1")

@task
def t2():
    print("t2")

@task
def t3():
    raise DummyError("t3")

@task
def mapper():
    task_id = randint(1, 2)
    task_instance = globals()[f"t{task_id}"]  # dynamically select a task
    return [
        task_instance.submit(),
        t3.submit()
    ]

@task
def reducer(futures):
    for future in futures:
        try:
            future.result()
        except Exception as e:
            print(f"Error: {e}")

@flow
def myflow():
    say_hello()
    futures = mapper()
    reducer(futures)
    say_hello()
    return "OK"

if __name__ == "__main__":
    result = myflow()

This is the stack trace:

17:43:17.839 | ERROR   | Task run 't3-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/engine.py", line 2103, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/csicari/PycharmProjects/dummy_prefect/test.py", line 25, in t3
    raise DummyError("t3")
DummyError: t3
17:43:18.061 | INFO    | Task run 't1-0' - Finished in state Completed()
17:43:18.088 | ERROR   | Task run 't3-0' - Finished in state Failed('Task run encountered an exception DummyError: t3')
17:43:18.346 | ERROR   | Flow run 'scarlet-walrus' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/engine.py", line 877, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/csicari/PycharmProjects/dummy_prefect/test.py", line 51, in myflow
    reducer(futures)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/tasks.py", line 689, in __call__
    return enter_task_run_engine(
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/engine.py", line 1421, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/Users/csicari/.pyenv/versions/3.10.12/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/engine.py", line 1555, in get_task_call_return_value
    return await future._result()
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/states.py", line 84, in _get_state_result
    raise UnfinishedRun(
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.
17:43:18.632 | ERROR   | Flow run 'scarlet-walrus' - Finished in state Failed('Flow run encountered an exception. UnfinishedRun: Run is in PENDING state, its result is not available.')
Traceback (most recent call last):
  File "/Users/csicari/.pyenv/versions/3.10.12/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/Users/csicari/.pyenv/versions/3.10.12/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/Users/csicari/PycharmProjects/dummy_prefect/test.py", line 57, in <module>
    result = myflow()
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/flows.py", line 1229, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/engine.py", line 293, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/Users/csicari/.pyenv/versions/3.10.12/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/engine.py", line 396, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/engine.py", line 877, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/csicari/PycharmProjects/dummy_prefect/test.py", line 51, in myflow
    reducer(futures)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/tasks.py", line 689, in __call__
    return enter_task_run_engine(
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/engine.py", line 1421, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/Users/csicari/.pyenv/versions/3.10.12/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/engine.py", line 1555, in get_task_call_return_value
    return await future._result()
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/Users/csicari/PycharmProjects/dummy_prefect/.venv/lib/python3.10/site-packages/prefect/states.py", line 84, in _get_state_result
    raise UnfinishedRun(
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.

If the reducer is called as a plain function instead of a task (that is, with the @task decorator removed), I get the desired result.

To me, this does not make sense, since future.result() is described as follows:

Wait for a task to complete and retrieve its result
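
For comparison, here is a minimal sketch of the semantics that description suggests. It uses the standard library's concurrent.futures rather than Prefect, so it only illustrates the expected behavior of result() and says nothing about Prefect's internals. The names ok, boom, mapper, and reducer are hypothetical stand-ins for the tasks in the dummy code above.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class DummyError(Exception):
    pass


def ok() -> str:
    return "ok"


def boom() -> str:
    # Stand-in for the failing t3 task.
    raise DummyError("t3")


def mapper(pool: ThreadPoolExecutor) -> list[Future]:
    # Submit the work and hand back the futures without waiting on them.
    return [pool.submit(ok), pool.submit(boom)]


def reducer(futures: list[Future]) -> tuple[list[str], list[Exception]]:
    results, errors = [], []
    for future in futures:
        try:
            # result() blocks until the future is done, then either returns
            # its value or re-raises the exception raised inside the worker.
            results.append(future.result())
        except Exception as exc:
            errors.append(exc)
    return results, errors


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as pool:
        results, errors = reducer(mapper(pool))
    print(results)
    print([repr(exc) for exc in errors])
```

With stdlib futures, the reducer sees the original DummyError from the failed worker, which is the behavior one would hope for when the reducer is a Prefect task as well.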