PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Passing a recursive pydantic type to a @flow as an argument causes an error #11524

Open kzeitz opened 8 months ago

kzeitz commented 8 months ago

Bug summary

I was working on passing JSON to my deployment and defined a pydantic schema that describes it. One element of the schema is self-referencing (recursive). If I define a flow function whose parameter is the recursive type, or any other type that includes it, I get the error below.

Reproduction

from __future__ import annotations
import os
import json
from typing import List, Optional
from pydantic import BaseModel
from prefect import flow, get_run_logger
#from prefect.deployments import Deployment

class task(BaseModel):
    task_name: str
    container_name: str
    ecs_image: str

class container(BaseModel):
    sep: str
    runMode: str
    runType: str
    prtAPIBaseURLKey: str
    ftpUserPassword: str

class model(BaseModel):
    s3ModelFolders: str
    s3MECFolders: str
    make7DayModel: bool
    outputModelServer: str
    outputModelFolders: str
    outputWebServer: str
    outputWebFolders: str
    inputUnityServer: str

class copyBack(BaseModel):
    task: task
    container: container
    model: model        

class _model(BaseModel):
    model_config = { 'protected_namespaces': () }
    process_name: str
    worker_flavor: str
    model_type: str
    flow_name: str
    model_name: str
    model_stage: str
    forecast_type: Optional[str] = None
    forecast_zone: Optional[str] = None
    weather_forecast_combine_method: str
    copy_back: Optional[copyBack] = None
    model_dependencies: Optional[List[_model]] = None

class _modelExecution(BaseModel):
    plan: List[_model]

@flow(name="Simplify Model Execution with json validation")
def define_flow(model_execution: _model):
    logger = get_run_logger()

json_str = json.dumps({
    "plan":[
        {
            "process_name": "forecast",
            "worker_flavor": "ensemble",
            "model_type": "ensemble",
            "flow_name": "ENSEMBLE TOTAL LOAD NORMAL",
            "model_name": "ERCOT_LOAD_TOTAL_ENSEMBLE_WEIGHTEDENSEMBLE",
            "model_stage": "Production",
            "forecast_type": "load",
            "forecast_zone": "total",
            "weather_forecast_combine_method": "Normal",
            "model_dependencies": [
                {
                    "process_name": "forecast",
                    "worker_flavor": "tensorflow",
                    "model_name": "ERCOT_LOAD_TOTAL_NELF_TENSORFLOW",
                    "model_stage": "Production",
                    "flow_name": "TOTAL LOAD NELF NORMAL",
                    "model_type": "nelf",
                    "weather_forecast_combine_method": "Normal",
                    "model_dependencies": []
                },
                {
                    "process_name": "forecast",
                    "worker_flavor": "tensorflow",
                    "model_name": "ERCOT_LOAD_TOTAL_WOLF_TENSORFLOW",
                    "model_stage": "Production",
                    "model_type": "wolf",
                    "flow_name": "TOTAL LOAD WOLF NORMAL",
                    "weather_forecast_combine_method": "Normal",
                    "model_dependencies": []
                }
            ],
            "copy_back": {
                "task": {
                    "task_name": "prt-fs1-servercore",
                    "container_name": "ercot_ercotloadtensorflow",
                    "ecs_image": "040927785588.dkr.ecr.us-east-1.amazonaws.com/prt-fs1-servercore: 1.0.9"
                },
                "container": {
                    "sep": "|",
                    "runMode": "default|America/Chicago",
                    "runType": "executePost-process",
                    "prtAPIBaseURLKey": "https://app.drillinginfo.com|2_oojj8Bcu3JTub32aZ9oP4OPSDpAzllSmsn9qPCayYKtogXANP3GZEaFxqVQer7",
                    "ftpUserPassword": "CopyBack|copyback1"
                },
                "model": {
                    "s3ModelFolders": "enverus-pr-ue1-mlops-production-prod|legacy/fs1/d_root|main/iso_ercot/ercotloadtensorflow/ercotload15",
                    "s3MECFolders": "model-execution-context|ERCOT_LOAD_TOTAL_ENSEMBLE_WEIGHTEDENSEMBLE|ERCOT_LOAD_TOTAL_NELF_TENSORFLOW",
                    "make7DayModel": "True",
                    "outputModelServer": "fileserver|52.54.73.39|d/main/iso_ercot/ercotloadtensorflow|*.*|post.ps1;*_load.act;.err;band/;forecast/;html/;weather/",
                    "outputModelFolders": "ercotload15simplify|ercotload07simplify",
                    "outputWebServer": "fileserver|52.54.73.39|f/inetpub/wwwroot/forecast/Private|*.htm*;*.csv|band/;forecast/;html/;weather/",
                    "outputWebFolders": "ercot15simplify|ercot07simplify",
                    "inputUnityServer": "dapi---b7360|ERCOT_LOAD_TOTAL_WOLF_TENSORFLOW_CATCH_UP|ERCOT_LOAD_TOTAL_NELF_TENSORFLOW_CATCH_UP",
                }
            }
        },
        {
            "process_name": "forecast",
            "worker_flavor": "ensemble",
            "model_type": "ensemble",
            "flow_name": "ENSEMBLE TOTAL LOAD HIGH",
            "model_name": "ERCOT_LOAD_TOTAL_ENSEMBLE_WEIGHTEDENSEMBLE",
            "model_stage": "Production",
            "forecast_type": "load",
            "forecast_zone": "total",
            "weather_forecast_combine_method": "HighBand",
            "model_dependencies": [
                {
                    "process_name": "forecast",
                    "worker_flavor": "tensorflow",
                    "model_name": "ERCOT_LOAD_TOTAL_NELF_TENSORFLOW",
                    "model_stage": "Production",
                    "model_type": "nelf",
                    "flow_name": "TOTAL LOAD NELF HIGH",
                    "weather_forecast_combine_method": "HighBand",
                    "model_dependencies": []
                },
                {
                    "process_name": "forecast",
                    "worker_flavor": "tensorflow",
                    "model_name": "ERCOT_LOAD_TOTAL_WOLF_TENSORFLOW",
                    "model_stage": "Production",
                    "model_type": "wolf",
                    "flow_name": "TOTAL LOAD WOLF HIGH",
                    "weather_forecast_combine_method": "HighBand",
                    "model_dependencies": []
                }
            ]
        },
        {
            "process_name": "forecast",
            "worker_flavor": "ensemble",
            "model_type": "ensemble",
            "flow_name": "ENSEMBLE TOTAL LOAD LOW",
            "model_name": "ERCOT_LOAD_TOTAL_ENSEMBLE_WEIGHTEDENSEMBLE",
            "model_stage": "Production",
            "forecast_type": "load",
            "forecast_zone": "total",
            "weather_forecast_combine_method": "LowBand",
            "model_dependencies": [
                {
                    "process_name": "forecast",
                    "worker_flavor": "tensorflow",
                    "model_name": "ERCOT_LOAD_TOTAL_NELF_TENSORFLOW",
                    "model_stage": "Production",
                    "model_type": "nelf",
                    "flow_name": "TOTAL LOAD NELF LOW",
                    "weather_forecast_combine_method": "LowBand",
                    "model_dependencies": []
                },
                {
                    "process_name": "forecast",
                    "worker_flavor": "tensorflow",
                    "model_name": "ERCOT_LOAD_TOTAL_WOLF_TENSORFLOW",
                    "model_stage": "Production",
                    "model_type": "wolf",
                    "flow_name": "TOTAL LOAD WOLF LOW",
                    "weather_forecast_combine_method": "LowBand",
                    "model_dependencies": []
                }
            ]
        }
    ]
})

if __name__ == "__main__":
    modelExecutionPlan = _modelExecution.model_validate_json(json_str)
    print(modelExecutionPlan)

Error

Traceback (most recent call last):
  File "/Users/karlton.zeitz/src/scratch/prefect/deployment/scratch.py", line 53, in <module>
    @flow(name="Simplify Model Execution with json validation")
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/karlton.zeitz/.pyenv/versions/3.11.5/lib/python3.11/site-packages/prefect/flows.py", line 1378, in flow
    Flow(
  File "/Users/karlton.zeitz/.pyenv/versions/3.11.5/lib/python3.11/site-packages/prefect/context.py", line 185, in __register_init__
    __init__(__self__, *args, **kwargs)
  File "/Users/karlton.zeitz/.pyenv/versions/3.11.5/lib/python3.11/site-packages/prefect/flows.py", line 307, in __init__
    self.parameters = parameter_schema(self.fn)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/karlton.zeitz/.pyenv/versions/3.11.5/lib/python3.11/site-packages/prefect/utilities/callables.py", line 336, in parameter_schema
    create_schema(
  File "/Users/karlton.zeitz/.pyenv/versions/3.11.5/lib/python3.11/site-packages/prefect/utilities/callables.py", line 296, in create_v1_schema
    return model.schema(by_alias=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/karlton.zeitz/.pyenv/versions/3.11.5/lib/python3.11/site-packages/pydantic/v1/main.py", line 664, in schema
    s = model_schema(cls, by_alias=by_alias, ref_template=ref_template)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/karlton.zeitz/.pyenv/versions/3.11.5/lib/python3.11/site-packages/pydantic/v1/schema.py", line 188, in model_schema
    m_schema, m_definitions, nested_models = model_process_schema(
                                             ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/karlton.zeitz/.pyenv/versions/3.11.5/lib/python3.11/site-packages/pydantic/v1/schema.py", line 582, in model_process_schema
    m_schema, m_definitions, nested_models = model_type_schema(
                                             ^^^^^^^^^^^^^^^^^^
  File "/Users/karlton.zeitz/.pyenv/versions/3.11.5/lib/python3.11/site-packages/pydantic/v1/schema.py", line 623, in model_type_schema
    f_schema, f_definitions, f_nested_models = field_schema(
                                               ^^^^^^^^^^^^^
  File "/Users/karlton.zeitz/.pyenv/versions/3.11.5/lib/python3.11/site-packages/pydantic/v1/schema.py", line 256, in field_schema
    f_schema, f_definitions, f_nested_models = field_type_schema(
                                               ^^^^^^^^^^^^^^^^^^
  File "/Users/karlton.zeitz/.pyenv/versions/3.11.5/lib/python3.11/site-packages/pydantic/v1/schema.py", line 528, in field_type_schema
    f_schema, f_definitions, f_nested_models = field_singleton_schema(
                                               ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/karlton.zeitz/.pyenv/versions/3.11.5/lib/python3.11/site-packages/pydantic/v1/schema.py", line 927, in field_singleton_schema
    if issubclass(field_type, BaseModel):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen abc>", line 123, in __subclasscheck__
TypeError: issubclass() arg 1 must be a class
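
The final frame fails because `issubclass()` received something that is not a class. The thread does not confirm what `field_type` actually was at that point. One plausible way to end up there is an unresolved forward reference left over from a self-referencing annotation. The sketch below uses only the standard library (no pydantic or Prefect), and the `Node` class and `check_subclass` helper are hypothetical names made up for illustration:

```python
from __future__ import annotations

import typing
from typing import ForwardRef, List, Optional


class Node:
    # Self-referencing annotation, analogous to model_dependencies in the report.
    children: Optional[List[Node]] = None


def check_subclass(candidate: object) -> str:
    """Return 'ok' if issubclass() accepts the candidate, else the error text."""
    try:
        issubclass(candidate, Node)
        return "ok"
    except TypeError as exc:
        return str(exc)


# With postponed evaluation, the raw annotation is stored as a plain string.
raw_annotation = Node.__annotations__["children"]

# An unresolved forward reference is an ordinary object, not a class,
# so issubclass() rejects it with a TypeError.
unresolved_result = check_subclass(ForwardRef("Node"))

# Names are passed explicitly so resolution does not depend on how the
# module is executed.
namespace = {"Node": Node, "Optional": Optional, "List": List}
resolved = typing.get_type_hints(Node, localns=namespace)["children"]

# Optional[List[Node]] -> List[Node] -> Node
inner = typing.get_args(typing.get_args(resolved)[0])[0]
resolved_result = check_subclass(inner)

if __name__ == "__main__":
    print(repr(raw_annotation))
    print(unresolved_result)
    print(resolved_result)
```

If this is indeed the mechanism, the check only succeeds once the forward reference has been resolved to the real class, which would be consistent with the error disappearing for non-recursive parameter types.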

Versions

Version:             2.14.12
API version:         0.8.4
Python version:      3.11.5
Git commit:          5a2482d0
Built:               Wed, Dec 20, 2023 4:58 PM
OS/Arch:             darwin/x86_64
Profile:             default
Server type:         server

Additional context

Sorry for the large json string...

I originally hit the error when calling deployment.apply(). I'm trying to get better validation and parameter handling for the JSON string, as it is large and easy to get wrong.

I removed the deploy code as the issue occurs with or without it.

Changing the parameter type to a non-recursive type, or directly to BaseModel, makes the problem disappear.

NoahKusaba commented 8 months ago

I found a fix. Update your pydantic version to pydantic==1.10.8

kzeitz commented 7 months ago

I'll give that a try. Thanks!

urimandujano commented 7 months ago

Hey @kzeitz, I think you've found a gap in our pydantic v2 support logic. Based on your comment and original bug report, it looks like the issue is specifically when pydantic v2 is installed, is that right? While downgrading to pydantic v1 is the quickest fix, this is something that we want working on v2 too. We'll work on a fix for this.

kzeitz commented 7 months ago

Yes... you are correct. I was using pydantic 2.5.2 I believe.

I'm pleased that you plan on fixing it. I'll see if downgrading works for me in the meantime. I appreciate your attention to this.
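
Since the behaviour appears to depend on which pydantic major version is installed, it may help to confirm that before and after downgrading. A minimal standard-library sketch follows; the helper names `major_version` and `installed_pydantic_major` are hypothetical and not part of Prefect or pydantic:

```python
from importlib import metadata
from typing import Optional


def major_version(version: str) -> int:
    """Return the leading integer of a version string such as '2.5.2'."""
    return int(version.split(".", 1)[0])


def installed_pydantic_major() -> Optional[int]:
    """Return the major version of the installed pydantic, or None if absent."""
    try:
        return major_version(metadata.version("pydantic"))
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    print("pydantic major version:", installed_pydantic_major())
```

Reading the distribution metadata avoids importing pydantic itself, so the check works even in an environment where the import would fail.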

Andrew-S-Rosen commented 7 months ago

I'm surprised this isn't also an error due to #7502. Just a heads-up that there could be some relation.

serinamarie commented 7 months ago

Unrelated, @kzeitz, but I believe your reproduction has opened a secret scanning alert (https://github.com/PrefectHQ/prefect/security/secret-scanning/2) since your Databricks token was exposed.

Edit: Revoked secret and closed alert.

kzeitz commented 7 months ago

Thanks...