PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

default JSON serializer cannot serialize pydantic models #12224

Open li-dennis opened 6 months ago

li-dennis commented 6 months ago


### Bug summary

I am trying to turn on result persistence with `json` as the default serializer. As I understand from the docs, the JSON serializer supports pydantic models:

> We supply a custom JSON serializer at `prefect.serializers.JSONSerializer`. Prefect's JSON serializer uses custom hooks by default to support more object types. Specifically, we add support for all types supported by Pydantic.

Looking through the existing issues, it also appears that Prefect supports both pydantic v1 and v2: https://github.com/PrefectHQ/prefect/issues/10145#issuecomment-1791675801

I tried to enable JSON serialization for all flows and tasks for debugging purposes, but I quickly ran into errors.

### Reproduction

```bash
export PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
export PREFECT_RESULTS_DEFAULT_SERIALIZER='json'
```

```python
from prefect import flow, task
from pydantic import BaseModel

class Foo(BaseModel, frozen=True):
    id: int
    description: str
    is_alive: bool

@task
def test_task():
    return Foo(id=1, description="test", is_alive=True)

@flow
def test_flow():
    return test_task()

test_flow()
```

### Error

```python3
13:41:18.960 | DEBUG   | prefect.profiles - Using profile 'default'
13:41:18.977 | DEBUG   | prefect.client - Connecting to API at http://127.0.0.1:4200/api/
13:41:19.236 | INFO    | prefect.engine - Created flow run 'stimulating-myna' for flow 'test-flow'
13:41:19.237 | INFO    | Flow run 'stimulating-myna' - View at http://127.0.0.1:4200/flow-runs/flow-run/87cc76b9-d9d3-42aa-ac43-4428d331b23b
13:41:19.237 | DEBUG   | Flow run 'stimulating-myna' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
13:41:19.242 | DEBUG   | prefect.task_runner.concurrent - Starting task runner...
13:41:19.262 | DEBUG   | prefect.client - Connecting to API at http://127.0.0.1:4200/api/
13:41:19.359 | DEBUG   | Flow run 'stimulating-myna' - Executing flow 'test-flow' for flow run 'stimulating-myna'...
13:41:19.360 | DEBUG   | Flow run 'stimulating-myna' - Beginning execution...
13:41:19.378 | INFO    | Flow run 'stimulating-myna' - Created task run 'test_task-0' for task 'test_task'
13:41:19.379 | INFO    | Flow run 'stimulating-myna' - Executing 'test_task-0' immediately...
13:41:19.815 | DEBUG   | Task run 'test_task-0' - Beginning execution...
13:41:19.818 | ERROR   | Task run 'test_task-0' - Crash detected! Execution was interrupted by an unexpected exception: TypeError: Object of type 'Foo' is not JSON serializable
13:41:19.818 | DEBUG   | Task run 'test_task-0' - Crash details:
Traceback (most recent call last):
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/engine.py", line 2367, in report_task_run_crashes
    yield
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/engine.py", line 1852, in begin_task_run
    state = await orchestrate_task_run(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/engine.py", line 2170, in orchestrate_task_run
    terminal_state = await return_value_to_state(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/states.py", line 305, in return_value_to_state
    return Completed(data=await result_factory.create_result(data))
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/results.py", line 452, in create_result
    return await PersistedResult.create(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/results.py", line 661, in create
    data = serializer.dumps(obj)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/serializers.py", line 222, in dumps
    result = json.dumps(data, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
          ^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/json/encoder.py", line 200, in encode
    chunks = self.iterencode(o, _one_shot=True)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/json/encoder.py", line 258, in iterencode
    return _iterencode(o, 0)
           ^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/serializers.py", line 48, in prefect_json_object_encoder
    "data": pydantic_encoder(obj),
            ^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/pydantic/v1/json.py", line 90, in pydantic_encoder
    raise TypeError(f"Object of type '{obj.__class__.__name__}' is not JSON serializable")
TypeError: Object of type 'Foo' is not JSON serializable
13:41:19.833 | DEBUG   | prefect.engine - Reported crashed task run 'test_task-0' successfully!
13:41:19.834 | ERROR   | Flow run 'stimulating-myna' - Encountered exception during execution:
Traceback (most recent call last):
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/engine.py", line 867, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 326, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 351, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/test.py", line 18, in test_flow
    return test_task()
           ^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/tasks.py", line 587, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/engine.py", line 1421, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 317, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 178, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 388, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/engine.py", line 1601, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/task_runners.py", line 231, in submit
    result = await call()
             ^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/engine.py", line 1852, in begin_task_run
    state = await orchestrate_task_run(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/engine.py", line 2170, in orchestrate_task_run
    terminal_state = await return_value_to_state(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/states.py", line 305, in return_value_to_state
    return Completed(data=await result_factory.create_result(data))
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/results.py", line 452, in create_result
    return await PersistedResult.create(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/results.py", line 661, in create
    data = serializer.dumps(obj)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/serializers.py", line 222, in dumps
    result = json.dumps(data, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
          ^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/json/encoder.py", line 200, in encode
    chunks = self.iterencode(o, _one_shot=True)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/json/encoder.py", line 258, in iterencode
    return _iterencode(o, 0)
           ^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/prefect/serializers.py", line 48, in prefect_json_object_encoder
    "data": pydantic_encoder(obj),
            ^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/.venv/lib/python3.12/site-packages/pydantic/v1/json.py", line 90, in pydantic_encoder
    raise TypeError(f"Object of type '{obj.__class__.__name__}' is not JSON serializable")
TypeError: Object of type 'Foo' is not JSON serializable
```
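Both tracebacks bottom out in `pydantic_encoder` from `pydantic/v1/json.py`. A minimal sketch that reproduces just those last frames without Prefect, assuming pydantic 2.x is installed (it bundles the `pydantic.v1` compatibility package). For comparison it also calls `pydantic_core.to_jsonable_python`, which pydantic 2 documents as the replacement for `pydantic_encoder`; this is shown only as a contrast, not as what Prefect 2.x does.

```python
import json

from pydantic import BaseModel
from pydantic.v1.json import pydantic_encoder  # v1 encoder, the one in the failing frame
from pydantic_core import to_jsonable_python  # pydantic 2 replacement for pydantic_encoder


class Foo(BaseModel, frozen=True):
    id: int
    description: str
    is_alive: bool


foo = Foo(id=1, description="test", is_alive=True)

# pydantic_encoder only special-cases pydantic.v1.BaseModel instances, so a
# pydantic 2 model falls through to its final `raise TypeError(...)`
try:
    json.dumps(foo, default=pydantic_encoder)
    v1_error = None
except TypeError as exc:
    v1_error = str(exc)
print("v1 encoder:", v1_error)

# The pydantic 2 encoder understands pydantic 2 models
v2_json = json.dumps(foo, default=to_jsonable_python)
print("v2 encoder:", v2_json)
```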

### Versions

```Text
Version:             2.16.3
API version:         0.8.4
Python version:      3.12.2
Git commit:          e3f02c00
Built:               Thu, Mar 7, 2024 4:56 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         server
```


zzstoatzz commented 6 months ago

hi @li-dennis - i can reproduce this, thank you for the issue!

we'll get this looked at

can you share your version of pydantic please?

li-dennis commented 6 months ago

Sure: `pydantic==2.6.3`. Thanks for the prompt response!

li-dennis commented 5 months ago

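For the time being, I've created a new custom serializer to handle this, but it's far from ergonomic:

```python
import json
from typing import Any, Generic, TypeVar

from prefect.serializers import JSONSerializer
from pydantic import BaseModel

PydanticModel = TypeVar("PydanticModel", bound=BaseModel)

class Pydantic2JsonSerializer(JSONSerializer, Generic[PydanticModel]):
    def dumps(self, data: PydanticModel) -> bytes:  # pydantic 2's own serializer, not v1's pydantic_encoder
        return data.model_dump_json(indent=2).encode("utf-8")

    def loads(self, blob: bytes) -> Any:  # returns a plain dict, not the original model
        return json.loads(blob.decode("utf-8"))
```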

It seems that `serializers.py` only supports pydantic v1; when pydantic 2 is installed, it falls back to the `pydantic.v1` compatibility layer:

```python
if HAS_PYDANTIC_V2:
    import pydantic.v1 as pydantic
    from pydantic.v1 import BaseModel
    from pydantic.v1.json import pydantic_encoder
else:
    import pydantic
    from pydantic import BaseModel
    from pydantic.json import pydantic_encoder
```

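The workaround above only covers a task that returns a single model, and its `loads` hands back a plain `dict` rather than a `Foo`. A minimal standalone sketch, assuming pydantic 2.x and leaving Prefect out entirely, of a `json` `default=`/`object_hook` pair that handles models nested anywhere in a result and rebuilds them on load. The `__class__`/`data` envelope mirrors the one visible in `prefect_json_object_encoder` in the traceback; `MODELS`, `encode_models`, and `decode_models` are hypothetical names, and wiring them into a Prefect serializer is not shown.

```python
import json
from typing import Any

from pydantic import BaseModel


class Foo(BaseModel, frozen=True):
    id: int
    description: str
    is_alive: bool


# Hypothetical registry of the model classes the decoder is allowed to rebuild
MODELS: dict[str, type[BaseModel]] = {"Foo": Foo}


def encode_models(obj: Any) -> Any:
    """json.dumps `default=` hook; only called for objects json cannot encode itself."""
    if isinstance(obj, BaseModel):
        # Tag the payload with the class name so the decoder knows what to rebuild
        return {"__class__": type(obj).__name__, "data": obj.model_dump(mode="json")}
    raise TypeError(f"Object of type {type(obj).__name__!r} is not JSON serializable")


def decode_models(payload: dict[str, Any]) -> Any:
    """json.loads `object_hook`; turns tagged dicts back into model instances."""
    if payload.get("__class__") in MODELS:
        return MODELS[payload["__class__"]].model_validate(payload["data"])
    return payload


result = {"items": [Foo(id=1, description="test", is_alive=True)], "count": 1}
blob = json.dumps(result, default=encode_models).encode("utf-8")
restored = json.loads(blob.decode("utf-8"), object_hook=decode_models)
print(restored)
```

Looking classes up in an explicit registry, rather than importing whatever name the payload contains, is a deliberate choice in this sketch: it keeps the decoder from instantiating arbitrary classes.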
zzstoatzz commented 3 months ago

hi @li-dennis - apologies for the delayed response!

lately we've been working on prefect 3.x, which has native pydantic 2 support, so if you now run `pip install -U prefect --pre`, your original example should work.

Do you want to keep this issue open for 2.x versions (where we do not plan to use pydantic 2 internals) or close it since the behavior is tested and working in 3.x?