PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Orjson TypeError when used with PostgreSQL database #8744

Open aeisenbarth opened 1 year ago

aeisenbarth commented 1 year ago

Bug summary

When I run Prefect against a PostgreSQL database and load the flow-runs page in the UI, the server raises TypeError: "Integer exceeds 64-bit range". The error originates in orjson, see https://github.com/ijl/orjson/issues/301.

I am reporting it so that it is tracked here. The orjson bug report mentions a workaround, which would require patching the serialization code in Prefect.

My current workaround is to not configure a PostgreSQL database and to fall back to the default SQLite database until the upstream issue is resolved.

Reproduction

I deployed Prefect and PostgreSQL according to this docker-compose file (only URLs adjusted):

https://github.com/rpeden/prefect-docker-compose/blob/main/docker-compose.yml

I also tried running Prefect directly from a conda environment against this PostgreSQL database.

Error

Exception in ASGI application
Traceback (most recent call last):
  File " …/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py", line 407, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File " …/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
  File " …/lib/python3.8/site-packages/uvicorn/middleware/message_logger.py", line 86, in __call__
    raise exc from None
  File " …/lib/python3.8/site-packages/uvicorn/middleware/message_logger.py", line 82, in __call__
    await self.app(scope, inner_receive, inner_send)
  File " …/lib/python3.8/site-packages/fastapi/applications.py", line 274, in __call__
    await super().__call__(scope, receive, send)
  File " …/lib/python3.8/site-packages/starlette/applications.py", line 118, in __call__
    await self.middleware_stack(scope, receive, send)
  File " …/lib/python3.8/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File " …/lib/python3.8/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File " …/lib/python3.8/site-packages/starlette/middleware/cors.py", line 92, in __call__
    await self.simple_response(scope, receive, send, request_headers=headers)
  File " …/lib/python3.8/site-packages/starlette/middleware/cors.py", line 147, in simple_response
    await self.app(scope, receive, send)
  File " …/lib/python3.8/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File " …/lib/python3.8/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File " …/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File " …/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File " …/lib/python3.8/site-packages/starlette/routing.py", line 706, in __call__
    await route.handle(scope, receive, send)
  File " …/lib/python3.8/site-packages/starlette/routing.py", line 443, in handle
    await self.app(scope, receive, send)
  File " …/lib/python3.8/site-packages/fastapi/applications.py", line 274, in __call__
    await super().__call__(scope, receive, send)
  File " …/lib/python3.8/site-packages/starlette/applications.py", line 118, in __call__
    await self.middleware_stack(scope, receive, send)
  File " …/lib/python3.8/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File " …/lib/python3.8/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File " …/lib/python3.8/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File " …/lib/python3.8/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File " …/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File " …/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File " …/lib/python3.8/site-packages/starlette/routing.py", line 706, in __call__
    await route.handle(scope, receive, send)
  File " …/lib/python3.8/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File " …/lib/python3.8/site-packages/starlette/routing.py", line 66, in app
    response = await func(request)
  File " …/lib/python3.8/site-packages/prefect/server/utilities/server.py", line 103, in handle_response_scoped_depends
    response = await default_handler(request)
  File " …/lib/python3.8/site-packages/fastapi/routing.py", line 238, in app
    raw_response = await run_endpoint_function(
  File " …/lib/python3.8/site-packages/fastapi/routing.py", line 164, in run_endpoint_function
    return await dependant.call(**values)
  File " …/lib/python3.8/site-packages/prefect/server/api/flow_runs.py", line 284, in read_flow_runs
    encoded = [
  File " …/lib/python3.8/site-packages/prefect/server/api/flow_runs.py", line 285, in <listcomp>
    schemas.responses.FlowRunResponse.from_orm(fr).dict(json_compatible=True)
  File " …/lib/python3.8/site-packages/prefect/server/utilities/schemas.py", line 293, in dict
    return json.loads(self.json(*args, **kwargs))
  File " …/lib/python3.8/site-packages/prefect/server/utilities/schemas.py", line 247, in json
    return super().json(*args, **kwargs)
  File "pydantic/main.py", line 505, in pydantic.main.BaseModel.json
  File " …/lib/python3.8/site-packages/prefect/server/utilities/schemas.py", line 126, in orjson_dumps
    return orjson.dumps(v, default=default).decode()
TypeError: Integer exceeds 64-bit range

Versions

Version:             2.8.4
API version:         0.8.4
Python version:      3.8.16
Git commit:          f09ccc32
Built:               Thu, Mar 2, 2023 12:34 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         <client error>

zanieb commented 1 year ago

@aeisenbarth thanks for the report!

Do you have any clue what integer we're passing that would be this large?

This looks like a won't-fix upstream: https://github.com/ijl/orjson/issues/116

aeisenbarth commented 1 year ago

I couldn't get a debugger to stop at orjson_dumps in the Prefect server, but I printed the object being serialized. It appears to be a FlowRunResponse from a previous flow run:

v = {
    'id': UUID('73ce02d0-6b1e-40b7-b37e-b08991b2ea6d'),
    'created': DateTime(2023, 3, 9, 15, 21, 28, 762739, tzinfo = Timezone('UTC')),
    'updated': DateTime(2023, 3, 9, 15, 21, 39, 828928, tzinfo = Timezone('UTC')),
    'name': 'batch_test.py::test_run_flow_from_deployment[flows0-parameters0-True-True]',
    'flow_id': UUID('d221605b-a6f0-4b92-bbcd-b3eddf03399a'),
    'state_id': UUID('8633aaaf-37a1-410c-9959-14b3c7d7adb6'),
    'deployment_id': None,
    'work_queue_name': 'default',
    'flow_version': '996b9d88712d1f8baddf7bb958c44972',
    'parameters': {
        'max_limit': 179769313486231570000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
    },
    'idempotency_key': None,
    'context': {},
    'empirical_policy': {
        'max_retries': 0,
        'retry_delay_seconds': 0.0,
        'retries': 0,
        'retry_delay': 0,
        'pause_keys': set(),
        'resuming': False
    },
    'tags': [],
    'parent_task_run_id': None,
    'state_type': StateType.COMPLETED,
    'state_name': 'Completed',
    'run_count': 1,
    'expected_start_time': DateTime(2023, 3, 9, 15, 21, 28, 747364, tzinfo = Timezone('UTC')),
    'next_scheduled_start_time': None,
    'start_time': DateTime(2023, 3, 9, 15, 21, 39, 797307, tzinfo = Timezone('UTC')),
    'end_time': DateTime(2023, 3, 9, 15, 21, 39, 827905, tzinfo = Timezone('UTC')),
    'total_run_time': datetime.timedelta(microseconds = 30598),
    'estimated_run_time': datetime.timedelta(microseconds = 30598),
    'estimated_start_time_delta': Duration(seconds = 11, microseconds = 49943),
    'auto_scheduled': False,
    'infrastructure_document_id': UUID('0f04b26b-4ae5-4be5-b993-7a0aa6e424c9'),
    'infrastructure_pid': 'computer:594873',
    'created_by': None,
    'work_pool_name': 'default-agent-pool',
    'state': {
        'id': UUID('8633aaaf-37a1-410c-9959-14b3c7d7adb6'),
        'type': StateType.COMPLETED,
        'name': 'Completed',
        'timestamp': DateTime(2023, 3, 9, 15, 21, 39, 827905, tzinfo = Timezone('UTC')),
        'message': None,
        'data': {
            'type': 'literal',
            'value': None
        },
        'state_details': {
            'flow_run_id': UUID('73ce02d0-6b1e-40b7-b37e-b08991b2ea6d'),
            'task_run_id': None,
            'child_flow_run_id': None,
            'scheduled_time': None,
            'cache_key': None,
            'cache_expiration': None,
            'untrackable_result': False,
            'pause_timeout': None,
            'pause_reschedule': False,
            'pause_key': None,
            'refresh_cache': None
        }
    }
}

The culprit seems to be in our data. We had some code where a float64 (!) parameter defaults to the maximum float value. This is valid Python and caused no problems as long as we used the SQLite database. With PostgreSQL, however, the float is somehow converted, stored, or parsed as an integer, and that integer is far too large to fit in 64 bits.
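One plausible explanation, sketched below with the standard library only: PostgreSQL's jsonb type prints numbers the way its numeric type does, so a value written in exponent notation is printed with the exponent expanded. The sketch assumes that Prefect stores flow run parameters in such a column, which this thread has not confirmed, and it emulates the expansion with Python's Decimal instead of querying a real database.

```python
import json
import sys
from decimal import Decimal

# What Python would send to the database: a JSON number in exponent notation.
sent = json.dumps(sys.float_info.max)

# Assumption: emulate jsonb's numeric-style output by expanding the exponent
# into plain digits. No database is involved in this sketch.
stored = format(Decimal(sent), "f")

# The expanded text has neither a decimal point nor an exponent, so a JSON
# parser reads it back as an int instead of a float.
loaded = json.loads(stored)

UINT64_MAX = 2**64 - 1  # largest integer that fits in 64 bits

print(sent)
print(type(loaded).__name__, len(stored), loaded > UINT64_MAX)
```

If that is what happens, the value never stops being numerically correct. It only changes its Python type on the way back, which would explain why SQLite is unaffected.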

Here is a minimal example flow:

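import sys

from prefect import flow


@flow
def my_flow(max_limit: float):
    # The body is irrelevant; only the recorded parameter value matters.
    return None


# sys.float_info.max is the largest finite float (about 1.8e308).
my_flow(max_limit=sys.float_info.max)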

We can change our code to avoid such large floats, but there is still an underlying problem somewhere. I would at least expect the float to remain a float (possibly clamped to the maximum the database supports, or even overflowed).
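Until this is fixed, the payload could be sanitized before it reaches the serializer. The following is a minimal sketch of that idea, not Prefect's code and not the workaround from the orjson report: coerce_oversized_ints is a hypothetical helper, and the bounds are the integer range that orjson documents as serializable (signed 64-bit minimum to unsigned 64-bit maximum).

```python
from typing import Any

# Integer range that orjson documents as serializable.
INT64_MIN = -(2**63)
UINT64_MAX = 2**64 - 1


def coerce_oversized_ints(value: Any) -> Any:
    """Hypothetical helper: recursively replace ints outside the 64-bit range."""
    if isinstance(value, bool):
        return value  # bool is an int subclass; leave it untouched
    if isinstance(value, int) and not INT64_MIN <= value <= UINT64_MAX:
        try:
            return float(value)  # restores a float that was read back as an int
        except OverflowError:
            return str(value)  # larger than any float: keep the digits as text
    if isinstance(value, dict):
        return {key: coerce_oversized_ints(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [coerce_oversized_ints(item) for item in value]
    return value


# Illustrative payload shaped like the parameters in the dump above.
payload = {"parameters": {"max_limit": 17976931348623157 * 10**292}, "run_count": 1}
cleaned = coerce_oversized_ints(payload)
print(cleaned)
```

Converting to float restores the original value in this case. For genuine big integers it loses precision, so whether that trade-off is acceptable depends on the data.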