PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
16k stars 1.57k forks

Incomplete Model for StateCreate #15591

Open WolfgangWenzel opened 2 days ago

WolfgangWenzel commented 2 days ago

Bug summary

First: congrats on an ingenious product! We have been evaluating workflow systems forever, and this one seems great.

However, if I trigger a run via client.create_flow_run_from_deployment, this calls DeploymentFlowRunCreate in line 628 of orchestration.py, which calls state.to_state_create() in line 631.

The code for this is at line 310 of objects.py, which is supposed to return a StateCreate object, but StateCreate is not fully defined in actions.py. This is because ResultRecordMetadata is not imported there: the import is guarded by TYPE_CHECKING (a constant from the standard typing module, so not something to change), which is False at runtime.

In any case, importing ResultRecordMetadata at that point results in a circular import.

A partial fix to this is to add

 from prefect.results import ResultRecordMetadata
 StateCreate.model_rebuild()

directly before StateCreate in objects.py, but I am not sure this fixes all problems.
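The mechanism described above can be sketched with the standard library alone. This is a hedged illustration, not Prefect's or Pydantic's actual code: StateCreateSketch is a hypothetical stand-in for StateCreate, fractions.Fraction stands in for ResultRecordMetadata, and typing.get_type_hints plays the role that model_rebuild() plays for a Pydantic model. It shows a name imported only under TYPE_CHECKING being unavailable at runtime, and the annotation resolving once that name is supplied.

```python
from typing import TYPE_CHECKING, Any, Dict, Optional, get_type_hints

if TYPE_CHECKING:
    # Seen by static type checkers only; this import is skipped at runtime.
    # Fraction is a stand-in for ResultRecordMetadata (assumption for the demo).
    from fractions import Fraction


class StateCreateSketch:
    """Hypothetical stand-in for StateCreate with an unresolved forward reference."""

    # The annotation is a string, so defining the class succeeds even though
    # Fraction does not exist in this module at runtime.
    data: "Optional[Fraction]" = None


def annotations_resolve(cls: type, namespace: Dict[str, Any]) -> bool:
    """Return True if every annotation on cls can be evaluated in namespace."""
    try:
        get_type_hints(cls, globalns=dict(namespace))
    except NameError:
        return False
    return True


# Names available when the class was defined: Fraction is missing.
runtime_names: Dict[str, Any] = {"Optional": Optional}
resolved_before = annotations_resolve(StateCreateSketch, runtime_names)

# The analogue of the partial fix: import the real type late, then resolve again.
from fractions import Fraction as RuntimeFraction

runtime_names["Fraction"] = RuntimeFraction
resolved_after = annotations_resolve(StateCreateSketch, runtime_names)

print(resolved_before, resolved_after)
```

In this sketch the first resolution fails and the second succeeds, which mirrors why a late import followed by StateCreate.model_rebuild() can complete the model without creating the circular import at module load time.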

Cheers

Version info (prefect version output)

Version:             3.0.3
API version:         0.8.4
Python version:      3.9.6
Git commit:          d9c51bc3
Built:               Fri, Sep 20, 2024 8:52 AM
OS/Arch:             win32/AMD64
Profile:             local
Server type:         server
Pydantic version:    2.9.1
Integrations:
  prefect-aws:       0.5.0
  prefect-shell:     0.3.0

Additional context

No response

malachibazar commented 14 hours ago

I'm running into this issue as well. I'm using run_deployment.

zzstoatzz commented 14 hours ago

hi @WolfgangWenzel and @malachibazar - thank you for raising this!

do either of you have an MRE we can use to readily reproduce this? also the actual traceback would be helpful if possible

malachibazar commented 11 hours ago

@zzstoatzz I'm having the issue if I deploy a flow in one file and then use run_deployment in a separate file to trigger the deployed flow.

initial_sync.py

from prefect import flow, serve

@flow(log_prints=True)
def initial_sync(uuid: str):
    print(f'uuid: {uuid}')

if __name__ == "__main__":
    # Build a deployment from the flow, then serve it so it can be triggered.
    initial_sync_deployment = initial_sync.to_deployment(
        name="initial-sync",
    )

    serve(initial_sync_deployment)

trigger_deployment.py

from prefect.deployments import run_deployment

if __name__ == "__main__":

    uuid = 'some-random-string'

    run_deployment(
        name="initial-sync/initial-sync",
        parameters={"uuid": uuid},
        timeout=0,
        as_subflow=False,
        flow_run_name=f"Initial Sync ({uuid})",
    )

I've found a partial workaround that avoids the error: if I import initial_sync into trigger_deployment.py, the error goes away.

zzstoatzz commented 4 hours ago

hi @malachibazar - thanks for that! I have reproduced this using your example - will take a look

ikseek commented 2 hours ago

Just doing import prefect.main before calling run_deployment is enough to work around this error.
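For reference, a minimal sketch of that workaround applied to the trigger_deployment.py example from earlier in the thread. The function name trigger_initial_sync is hypothetical, and the imports are placed inside the function only to make the ordering explicit; the run_deployment arguments are copied from the reproduction above. This assumes Prefect 3.0.3 as reported, and it is a stopgap, not a fix for the underlying model definition.

```python
def trigger_initial_sync(uuid: str):
    """Trigger the deployed flow, importing prefect.main first as a workaround."""
    # Workaround reported in this thread: importing prefect.main before
    # run_deployment is called avoids the incomplete StateCreate model error.
    import prefect.main  # noqa: F401

    from prefect.deployments import run_deployment

    return run_deployment(
        name="initial-sync/initial-sync",
        parameters={"uuid": uuid},
        timeout=0,
        as_subflow=False,
        flow_run_name=f"Initial Sync ({uuid})",
    )
```

Calling trigger_initial_sync('some-random-string') from a script, with the initial-sync deployment being served, should behave like the original trigger_deployment.py without requiring an import of the flow module itself.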