PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Creating flow run from deployment fails with Pydantic 2 parameters #12270

Open aeisenbarth opened 7 months ago

aeisenbarth commented 7 months ago

Bug summary

When a flow uses Pydantic 2 models and is run from a deployment, orjson fails to serialize them.

This is similar to https://github.com/PrefectHQ/prefect/issues/9536, except that the parameters here are not a custom class but a plain Pydantic model, which Prefect appears to support. This issue is blocking my switch to Pydantic 2.
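The failure mode can be reproduced without Prefect or orjson at all: any JSON encoder that only knows primitives rejects arbitrary objects the same way. A minimal stdlib-only sketch, using a dataclass as a stand-in for the Pydantic model (the names here are illustrative, not from Prefect's code):

```python
import json
from dataclasses import dataclass, asdict

# Stand-in for a Pydantic 2 model; json.dumps rejects it just like
# orjson rejects a pydantic.BaseModel instance in the traceback below.
@dataclass
class Parameters:
    param: int = 1

params = Parameters(param=2)

try:
    json.dumps({"parameters": params})
except TypeError as exc:
    print(exc)  # Object of type Parameters is not JSON serializable

# Reducing the model to plain primitives first (model_dump() on a
# Pydantic 2 model, asdict() here) makes the payload serializable.
print(json.dumps({"parameters": asdict(params)}))
```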

Reproduction

def test_prefect_pydantic_serialization():
    from prefect import Flow, flow
    from prefect.deployments import Deployment, run_deployment
    from pydantic import BaseModel  # pydantic 2!

    class Parameters(BaseModel):
        param: int = 1

    @flow
    def my_flow(parameters: Parameters):
        pass

    parameters = Parameters(param=2)

    # Works
    my_flow(parameters=parameters)

    # Fails
    deployment = Deployment.build_from_flow(flow=my_flow, name="test")
    deployment_id = deployment.apply(upload=True)
    deployment_name = f"{deployment.flow_name}/{deployment.name}"
    run_deployment(name=deployment_name, parameters=dict(parameters=parameters))

Error

src/spacem-batch/spacem_batch/tests/workflow_spacem_reprocessing_test.py:185 (test_prefect_pydantic_serialization)
TypeError: Object of type 'Parameters' is not JSON serializable

The above exception was the direct cause of the following exception:

    def test_prefect_pydantic_serialization():
        from prefect import Flow, flow
        from prefect.deployments import Deployment, run_deployment
        from pydantic import BaseModel  # pydantic 2!

        class Parameters(BaseModel):
            param: int = 1

        @flow
        def my_flow(parameters: Parameters):
            pass

        parameters = Parameters(param=2)

        # Works
        my_flow(parameters=parameters)

        # Fails
        deployment = Deployment.build_from_flow(flow=my_flow, name="test")
        deployment_id = deployment.apply(upload=True)
        deployment_name = f"{deployment.flow_name}/{deployment.name}"
>       run_deployment(name=deployment_name, parameters=dict(parameters=parameters))

workflow_spacem_reprocessing_test.py:207: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
…/lib/python3.10/site-packages/prefect/utilities/asyncutils.py:259: in coroutine_wrapper
    return call()
…/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:431: in __call__
    return self.result()
…/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:317: in result
    return self.future.result(timeout=timeout)
…/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:178: in result
    return self.__get_result()
…/lib/python3.10/concurrent/futures/_base.py:403: in __get_result
    raise self._exception
…/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:388: in _run_async
    result = await coro
…/lib/python3.10/site-packages/prefect/client/utilities.py:51: in with_injected_client
    return await fn(*args, **kwargs)
…/lib/python3.10/site-packages/prefect/deployments/deployments.py:188: in run_deployment
    flow_run = await client.create_flow_run_from_deployment(
…/lib/python3.10/site-packages/prefect/client/orchestration.py:560: in create_flow_run_from_deployment
    json=flow_run_create.dict(json_compatible=True, exclude_unset=True),
…/lib/python3.10/site-packages/prefect/_internal/schemas/bases.py:157: in dict
    return json.loads(self.json(*args, **kwargs))
…/lib/python3.10/site-packages/prefect/_internal/schemas/bases.py:111: in json
    return super().json(*args, **kwargs)
…/lib/python3.10/site-packages/pydantic/v1/main.py:504: in json
    return self.__config__.json_dumps(data, default=encoder, **dumps_kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

v = {'context': {}, 'idempotency_key': None, 'name': 'blond-ibex', 'parameters': {'parameters': Parameters(param=2)}, ...}

    def orjson_dumps_extra_compatible(v: Any, *, default: Any) -> str:
        """
        Utility for dumping a value to JSON using orjson, but allows for
        1) non-string keys: this is helpful for situations like pandas dataframes,
        which can result in non-string keys
        2) numpy types: for serializing numpy arrays

        orjson.dumps returns bytes, to match standard json.dumps we need to decode.
        """
>       return orjson.dumps(
            v, default=default, option=orjson.OPT_NON_STR_KEYS | orjson.OPT_SERIALIZE_NUMPY
        ).decode()
E       TypeError: Type is not JSON serializable: Parameters
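
The `default=` hook in `orjson_dumps_extra_compatible` above is only given Prefect's own fallback encoder, which does not know Pydantic 2 models. A stdlib-only sketch of how such a fallback could handle them (using `json` and a hypothetical stand-in class, since this is not Prefect's actual fix):

```python
import json

class FakeModel:
    """Stand-in for pydantic.BaseModel, to keep the sketch dependency-free."""
    def __init__(self, param):
        self.param = param
    def model_dump(self):
        return {"param": self.param}

def default(obj):
    # Fallback encoder: anything exposing a Pydantic-2-style
    # model_dump() is reduced to primitives; everything else fails
    # with the same error seen in the traceback above.
    if hasattr(obj, "model_dump"):
        return obj.model_dump()
    raise TypeError(f"Type is not JSON serializable: {type(obj).__name__}")

print(json.dumps({"parameters": FakeModel(2)}, default=default))
# → {"parameters": {"param": 2}}
```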

Versions

Version:             2.16.3
API version:         0.8.4
Python version:      3.10.13
Git commit:          e3f02c00
Built:               Thu, Mar 7, 2024 4:56 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.41.2

Additional context

prefect==2.16.3   # same for prefect==2.14.20, prefect==2.15.0
pydantic==2.6.3   # same for pydantic==2.0.0
orjson==3.9.15

aaazzam commented 7 months ago

Hey @aeisenbarth! Thanks for the reproducible example; I can confirm it also throws an error on my end.

I have 👀 on this!

aaazzam commented 7 months ago

Hey @aeisenbarth!

I did some digging, and if you:

then I was able to get your provided example to run as expected. Does either of these seem satisfactory to you?

aeisenbarth commented 7 months ago

Thanks, that is an ingenious idea! The first one is ok for me.

kkondamadugula commented 5 months ago

@desertaxle @tardunge I am trying to run a simple spark-operator job on a Prefect server by following the instructions here: https://github.com/tardunge/prefect-spark-on-k8s-operator/blob/main/README.md. When I run the job locally (python3.11 job.py), I get: prefect.exceptions.ScriptError: Script (attached below) encountered an exception: TypeError("Object of type 'FieldInfo' is not JSON serializable"). Note: I am running the code inside a container.

Attaching a simple reproducible example for the pydantic error:

from pydantic.version import VERSION as pydantic_version
from pydantic import Field
from prefect.blocks.core import Block

class Bar(Block):
    interval_seconds: int = Field(
        default=5,
        description="The number of seconds to wait between application status checks.",
    )

b = Bar()
print(f"{pydantic_version=}")
if pydantic_version >= '2':
    print(b.interval_seconds)  # under pydantic 2 this may print a FieldInfo object instead of 5
else:
    print(b.interval_seconds)  # under pydantic 1 this prints 5

Temporary fix: I was able to work around this issue in two ways:

1) Pass the parameters directly, without dict'ing them, in app.py in the source package code.
2) Downgrade pydantic from 2.x to 1.10 in my container image, which works without any pydantic/dict changes in app.py.
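Workaround 2 can be expressed as a dependency pin. A sketch of a hypothetical Dockerfile/requirements fragment (the exact version bounds are an assumption, not from this thread):

```shell
# Pin pydantic to 1.x in the container image until the integration
# handles Pydantic 2 models (workaround 2 above).
pip install "pydantic>=1.10,<2"
```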

Ideally, I would like to use the integration feature without making any modifications straight off the shelf.