PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
15.82k stars 1.55k forks source link

Cannot pass dict as a parameter 'data' to Completed state on Prefect v3.0.0rc16 #14927

Open ihor-ramskyi-globallogic opened 1 month ago

ihor-ramskyi-globallogic commented 1 month ago

Bug summary

I cannot pass dict as parameter 'data' on Completed state on Prefect v3.0.0rc16. This is the code on which it fails:

from prefect import flow, task
from prefect.states import Completed

@task
def task_a():
    return Completed(message='A', data={'A': 1})

@flow
def some_flow():
    a = task_a.submit().result()
    return

if __name__ == '__main__':
    some_flow()

As result of running this code, I get such log and traceback:

logs and traceback ```python 17:28:58.177 | INFO | prefect.engine - Created flow run 'energetic-coot' for flow 'some-flow' 17:28:58.179 | INFO | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/7b207023-a0a2-4eda-816b-218ace94fb46 17:28:58.271 | INFO | Flow run 'energetic-coot' - Submitting task task_a to thread pool executor... 17:28:58.373 | INFO | Task run 'task_a-0' - Created task run 'task_a-0' for task 'task_a' 17:28:58.458 | ERROR | Task run 'task_a-0' - Task run failed with exception: TypeError("Can't instantiate abstract class BaseResult with abstract methods create, get") - Retries are exhausted Traceback (most recent call last): File "My_Path\lib\site-packages\prefect\task_engine.py", line 804, in run_context yield self File "My_Path\lib\site-packages\prefect\task_engine.py", line 1420, in run_task_sync engine.call_task_fn(txn) File "My_Path\lib\site-packages\prefect\task_engine.py", line 832, in call_task_fn result = call_with_parameters(self.task.fn, parameters) File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters return fn(*args, **kwargs) File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 6, in task_a return Completed(message='A', data={'A': 1}) File "My_Path\lib\site-packages\prefect\states.py", line 556, in Completed return cls(type=StateType.COMPLETED, **kwargs) File "My_Path\lib\site-packages\pydantic\main.py", line 176, in __init__ self.__pydantic_validator__.validate_python(data, self_instance=self) File "My_Path\lib\site-packages\prefect\results.py", line 405, in __new__ return super().__new__(cls) TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get 17:28:58.495 | ERROR | Task run 'task_a-0' - Finished in state Failed("Task run encountered an exception TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get") 17:28:58.498 | ERROR | Flow run 'energetic-coot' - Encountered exception during execution: TypeError("Can't instantiate abstract class BaseResult with abstract methods create, get") Traceback (most recent call last): File "My_Path\lib\site-packages\prefect\flow_engine.py", line 636, in run_context yield self File "My_Path\lib\site-packages\prefect\flow_engine.py", line 680, in run_flow_sync engine.call_flow_fn() File "My_Path\lib\site-packages\prefect\flow_engine.py", line 659, in call_flow_fn result = call_with_parameters(self.flow.fn, self.parameters) File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters return fn(*args, **kwargs) File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 11, in some_flow a = task_a.submit().result() File "My_Path\lib\site-packages\prefect\futures.py", line 164, in result _result = self._final_state.result( File "My_Path\lib\site-packages\prefect\client\schemas\objects.py", line 261, in result return get_state_result( File "My_Path\lib\site-packages\prefect\states.py", line 68, in get_state_result return _get_state_result( File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 392, in coroutine_wrapper return run_coro_as_sync(ctx_call()) File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 243, in run_coro_as_sync return call.result() File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result return self.future.result(timeout=timeout) File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result return self.__get_result() File "C:\Users\ihor.ramskyi\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\_base.py", line 403, in __get_result raise self._exception File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async result = await coro File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 225, in coroutine_wrapper return await task File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 382, in ctx_call result = await async_fn(*args, **kwargs) File "My_Path\lib\site-packages\prefect\states.py", line 127, in _get_state_result raise await get_state_exception(state) File "My_Path\lib\site-packages\prefect\task_engine.py", line 804, in run_context yield self File "My_Path\lib\site-packages\prefect\task_engine.py", line 1420, in run_task_sync engine.call_task_fn(txn) File "My_Path\lib\site-packages\prefect\task_engine.py", line 832, in call_task_fn result = call_with_parameters(self.task.fn, parameters) File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters return fn(*args, **kwargs) File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 6, in task_a return Completed(message='A', data={'A': 1}) File "My_Path\lib\site-packages\prefect\states.py", line 556, in Completed return cls(type=StateType.COMPLETED, **kwargs) File "My_Path\lib\site-packages\pydantic\main.py", line 176, in __init__ self.__pydantic_validator__.validate_python(data, self_instance=self) File "My_Path\lib\site-packages\prefect\results.py", line 405, in __new__ return super().__new__(cls) TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get 17:28:58.543 | ERROR | Flow run 'energetic-coot' - Finished in state Failed("Flow run encountered an exception: TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get") Traceback (most recent call last): File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 15, in some_flow() File "My_Path\lib\site-packages\prefect\flows.py", line 1322, in __call__ return run_flow( File "My_Path\lib\site-packages\prefect\flow_engine.py", line 802, in run_flow return run_flow_sync(**kwargs) File "My_Path\lib\site-packages\prefect\flow_engine.py", line 682, in run_flow_sync return engine.state if return_type == "state" else engine.result() File "My_Path\lib\site-packages\prefect\flow_engine.py", line 246, in result raise self._raised File "My_Path\lib\site-packages\prefect\flow_engine.py", line 636, in run_context yield self File "My_Path\lib\site-packages\prefect\flow_engine.py", line 680, in run_flow_sync engine.call_flow_fn() File "My_Path\lib\site-packages\prefect\flow_engine.py", line 659, in call_flow_fn result = call_with_parameters(self.flow.fn, self.parameters) File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters return fn(*args, **kwargs) File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 11, in some_flow a = task_a.submit().result() File "My_Path\lib\site-packages\prefect\futures.py", line 164, in result _result = self._final_state.result( File "My_Path\lib\site-packages\prefect\client\schemas\objects.py", line 261, in result return get_state_result( File "My_Path\lib\site-packages\prefect\states.py", line 68, in get_state_result return _get_state_result( File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 392, in coroutine_wrapper return run_coro_as_sync(ctx_call()) File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 243, in run_coro_as_sync return call.result() File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result return self.future.result(timeout=timeout) File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result return self.__get_result() File "C:\Users\ihor.ramskyi\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\_base.py", line 403, in __get_result raise self._exception File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async result = await coro File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 225, in coroutine_wrapper return await task File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 382, in ctx_call result = await async_fn(*args, **kwargs) File "My_Path\lib\site-packages\prefect\states.py", line 127, in _get_state_result raise await get_state_exception(state) File "My_Path\lib\site-packages\prefect\task_engine.py", line 804, in run_context yield self File "My_Path\lib\site-packages\prefect\task_engine.py", line 1420, in run_task_sync engine.call_task_fn(txn) File "My_Path\lib\site-packages\prefect\task_engine.py", line 832, in call_task_fn result = call_with_parameters(self.task.fn, parameters) File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters return fn(*args, **kwargs) File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 6, in task_a return Completed(message='A', data={'A': 1}) File "My_Path\lib\site-packages\prefect\states.py", line 556, in Completed return cls(type=StateType.COMPLETED, **kwargs) File "My_Path\lib\site-packages\pydantic\main.py", line 176, in __init__ self.__pydantic_validator__.validate_python(data, self_instance=self) File "My_Path\lib\site-packages\prefect\results.py", line 405, in __new__ return super().__new__(cls) TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get ``` However, same code works correctly (returning Complete state from both the task and the flow), when I run it on Prefect v2.20.0 or when I run it on Prefect v3.0.0rc16 but pass `data=1`, `data='abcde'`, or even `data=(1, {'A': 1})`. ### Version info (`prefect version` output) ```Text Version: 3.0.0rc16 API version: 0.8.4 Python version: 3.10.11 Git commit: aad66936 Built: Mon, Aug 12, 2024 3:51 PM OS/Arch: win32/AMD64 Profile: default Server type: server Pydantic version: 2.7.1 ```

Additional context

No response

zzstoatzz commented 1 month ago

thanks for the issue @ihor-ramskyi-globallogic - we'll take a look here

just out of curiosity, what's your use case for passing data to a Completed state? i.e. how did you use data downstream in 2.x?

zzstoatzz commented 1 month ago

note to self, here's a more concise repro

In [1]: from prefect.states import Completed

In [2]: Completed(data={"A": 1})
...
File ~/github.com/prefecthq/prefect/src/prefect/results.py:407, in BaseResult.__new__(cls, **kwargs)
    405     return super().__new__(subcls)
    406 else:
--> 407     return super().__new__(cls)

TypeError: Can't instantiate abstract class BaseResult without an implementation for abstract methods 'create', 'get'
cicdw commented 1 month ago

@zzstoatzz seems like in this case we should dispatch to an UnpersistedResult type in the creation of the BaseResult?

ihor-ramskyi-globallogic commented 1 month ago

thanks for the issue @ihor-ramskyi-globallogic - we'll take a look here

just out of curiosity, what's your use case for passing data to a Completed state? i.e. how did you use data downstream in 2.x?

I return Failed if task should be failed for any reason - and for unified approach, I use Completed otherwise. So, I still have to return data from the task, and I put it as data of Completed state.

However, I cannot use type dict for data for Failed state anyways... Because it says it cannot resolve dict in exception.

ihor-ramskyi-globallogic commented 1 week ago

@zzstoatzz sorry to bother you, but is there a plan to merge the fix in the near future?

zzstoatzz commented 1 week ago

hi @ihor-ramskyi-globallogic - thanks for the bump

i hesitated with that PR because I couldn't reason about the solution, even though it did appear to accomplish the desired outcome. I'll revisit it as soon as I can