PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Dataclass `__post_init__` gets invoked again during task call #9747

Open rosscleung opened 1 year ago

rosscleung commented 1 year ago


Bug summary

I have a dataclass, a task, and a flow.

Inside `__post_init__()`, the dataclass checks one of its own fields and, if that field is `None`, initializes it to a derived value. If `__post_init__()` is called a second time, the field is no longer `None`, so the other branch runs and the field is set to a different value.

A lead software engineer at Prefect confirmed this behavior. When objects are passed down the DAG, their parameters get "resolved" (any futures inside them are replaced with their results), and that resolution step is why `__post_init__()` was called again. Because this happens silently and can catch people off guard, I'm still posting this issue with reproducible code below.
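The effect can be reproduced without Prefect. The sketch below assumes, consistent with the log output further down (the second `__post_init__` call sees `baggings` already set), that resolution rebuilds a dataclass by calling its constructor with the instance's current field values. `rebuild` and `Example` are hypothetical names standing in for Prefect's internal traversal and the issue's `testDC`; this is not Prefect's actual code.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class Example:
    frodo: int
    baggings: Optional[int] = None

    def __post_init__(self) -> None:
        if self.baggings is None:
            self.baggings = self.frodo * 2
        else:
            # Reached only when the constructor receives an explicit value,
            # e.g. when an existing instance is rebuilt from its own fields.
            self.baggings = 420


def rebuild(obj):
    """Hypothetical stand-in for argument resolution: re-create a dataclass
    instance by passing its current field values back to the constructor."""
    values = {f.name: getattr(obj, f.name) for f in fields(obj)}
    return type(obj)(**values)


original = Example(1)
rebuilt = rebuild(original)
print(original)  # Example(frodo=1, baggings=2)
print(rebuilt)   # Example(frodo=1, baggings=420)
```

The rebuilt copy reproduces the `baggings=420` value seen in the task log, while the original instance is left untouched.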

TL;DR: Be careful of the "state" of your objects as they get passed around the DAG!
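One defensive pattern (a general suggestion, not something Prefect prescribes) is to make `__post_init__()` idempotent: derive a default only when a field is missing and never overwrite a value that is already set. The sketch below uses the standard library's `dataclasses.replace()`, which also builds a new instance through `__init__`, to show that a second `__post_init__()` call then leaves the object unchanged. `Idempotent` is a hypothetical class name.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass
class Idempotent:
    frodo: int
    baggings: Optional[int] = None

    def __post_init__(self) -> None:
        # Only derive a default; never overwrite a value that is already set,
        # so running the constructor again with the same fields is harmless.
        if self.baggings is None:
            self.baggings = self.frodo * 2


first = Idempotent(1)
# dataclasses.replace() creates a new instance via __init__, so
# __post_init__ runs again, much like the second call in the log below.
second = replace(first)
print(second)           # Idempotent(frodo=1, baggings=2)
print(first == second)  # True
```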

Reproduction

import prefect
from prefect import flow, task
from dataclasses import dataclass
from typing import Optional

@dataclass
class testDC:
    frodo: int
    baggings: Optional[int] = None

    def __post_init__(self):
        print("post_init triggered")
        if self.baggings is None:
            print("self.baggings is None triggered")
            self.baggings = self.frodo * 2
        else:
            print("self.baggings should not be checked again. This should never be triggered.")
            self.baggings = 420

# Simple task that takes in an instance of the test dataclass above and prints it. Does no manipulation.
@task(log_prints=True)
def testTask(dc):
    print(dc)

# Simple flow that takes in a parameter, instantiates the test dataclass, and passes the dataclass to the task.
@flow(log_prints=True)
def testFlow(a: int):

    foo = testDC(a)
    testTask(foo)

if __name__ == "__main__":

    # Run the flow with a simple parameter
    testFlow(a=1)
    print(prefect.__version__)

Error

12:32:41.624 | INFO    | prefect.engine - Created flow run 'russet-doberman' for flow 'testFlow'
12:32:41.775 | INFO    | Flow run 'russet-doberman' - post_init triggered
12:32:41.777 | INFO    | Flow run 'russet-doberman' - self.baggings is None triggered
12:32:41.811 | INFO    | Flow run 'russet-doberman' - Created task run 'testTask-0' for task 'testTask'
12:32:41.812 | INFO    | Flow run 'russet-doberman' - Executing 'testTask-0' immediately...
12:32:41.817 | INFO    | Flow run 'russet-doberman' - post_init triggered
12:32:41.818 | INFO    | Flow run 'russet-doberman' - self.baggings should not be checked again. This should never be triggered.
12:32:41.879 | INFO    | Task run 'testTask-0' - testDC(frodo=1, baggings=420)
12:32:41.923 | INFO    | Task run 'testTask-0' - Finished in state Completed()
12:32:41.969 | INFO    | Flow run 'russet-doberman' - Finished in state Completed('All states completed.')
2.8.6

Versions

2.8.6

Additional context

Thank you Zanie for the quick response on Prefect Slack!

zanieb commented 1 year ago

Work around this by wrapping the argument in `quote`:

from prefect.utilities.annotations import quote

testTask(quote(foo))
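As we understand it, `quote` marks an argument so that Prefect does not traverse into it while resolving task inputs, and the wrapper is removed before the task receives the value. The toy model below illustrates only that idea: `Quoted`, `resolve_argument`, and `Counted` are hypothetical names for this sketch and do not reflect Prefect's implementation.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, Optional


@dataclass
class Counted:
    frodo: int
    baggings: Optional[int] = None
    calls = 0  # class attribute (no annotation), not a dataclass field

    def __post_init__(self) -> None:
        Counted.calls += 1
        if self.baggings is None:
            self.baggings = self.frodo * 2


@dataclass(frozen=True)
class Quoted:
    """Hypothetical marker meaning 'pass this value through untouched'."""
    value: Any


def resolve_argument(expr: Any) -> Any:
    """Toy resolver: rebuilds dataclasses from their fields (which re-runs
    __post_init__) unless the value is wrapped in Quoted."""
    if isinstance(expr, Quoted):
        return expr.value  # unwrap without traversing into the value
    if is_dataclass(expr) and not isinstance(expr, type):
        values = {f.name: resolve_argument(getattr(expr, f.name)) for f in fields(expr)}
        return type(expr)(**values)
    return expr


foo = Counted(1)                         # __post_init__ call #1
plain = resolve_argument(foo)            # rebuilt: __post_init__ call #2
quoted = resolve_argument(Quoted(foo))   # passed through: no new call
print(Counted.calls)  # 2
print(quoted is foo)  # True
```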

See also https://github.com/PrefectHQ/prefect/pull/8763 / https://github.com/PrefectHQ/prefect/issues/8542

We should add documentation about this.