PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

`prefect build` does not produce deterministic results #5776

Closed c10k-cc closed 2 years ago

c10k-cc commented 2 years ago

Description

In some cases, the prefect build command does not produce deterministic output. As a consequence, prefect register bumps the flow version even when the flow code has not changed (as mentioned in #5777).

Expected Behavior

prefect build should produce an identical output for a particular input flow

Actual Behavior

prefect build can produce different output on two consecutive runs

Reproduction

Take this simple example:

from prefect import task, Flow, Parameter

@task()
def task1(p1, p2):
    return p1, p2

with Flow('flow1') as flow:
    param1 = Parameter('param1')
    t1 = task1(param1, param1)

Save it as flow.py. Keep in mind that this issue only occurs when the same parameter is passed to a particular task multiple times. Run this command:

prefect build -p flow.py -o flow1.json

And then again:

prefect build -p flow.py -o flow2.json

Now, compare the outputs:

diff flow1.json flow2.json

As you can see, the outputs differ! The same can be reproduced with the -m <module> flag as well.

NOTE: In the above example there are 2 possible outputs, so two consecutive runs have roughly a 50% chance of producing different output. Run it a few times to reproduce the results.
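To check this without eyeballing diffs, a small helper can hash the output of repeated builds and count how many distinct results appear. This is only a sketch: `count_distinct_outputs` and `build_once` are hypothetical names, and the `prefect build -p ... -o ...` invocation simply mirrors the commands above.

```python
import hashlib
import subprocess
import tempfile
from collections import Counter
from pathlib import Path
from typing import Callable


def count_distinct_outputs(produce: Callable[[], bytes], runs: int = 10) -> Counter:
    """Call `produce` repeatedly and count how often each output digest occurs."""
    digests: Counter = Counter()
    for _ in range(runs):
        digests[hashlib.sha256(produce()).hexdigest()] += 1
    return digests


def build_once(flow_path: str = "flow.py") -> bytes:
    """Run `prefect build` once (same flags as above) and return the JSON bytes.

    Assumes the `prefect` CLI is on PATH and `flow_path` exists.
    """
    with tempfile.TemporaryDirectory() as tmp:
        out = Path(tmp) / "flow.json"
        subprocess.run(
            ["prefect", "build", "-p", flow_path, "-o", str(out)],
            check=True,
            capture_output=True,
        )
        return out.read_bytes()
```

Calling `count_distinct_outputs(build_once)` in the directory containing flow.py should return a counter with more than one key whenever the build is not deterministic.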

Proposed solution

The output JSON object needs to be sorted consistently during serialization; the unstable ordering is the cause of this non-deterministic behavior. Please take a look below in this thread to see 2 different file outputs. One naive solution is to sort all lists in the output object before it is written out. But this needs more investigation...
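The naive approach could be sketched roughly as follows. This is not Prefect's implementation: `canonicalize` and `dumps_deterministic` are hypothetical helpers that recursively sort every list by the JSON text of its elements. Note that this would also reorder lists whose order is meaningful, so it is better suited to comparing or hashing outputs than to producing the registered payload.

```python
import json
from typing import Any


def canonicalize(obj: Any) -> Any:
    """Return a copy of a JSON-like object with every list in a stable order."""
    if isinstance(obj, dict):
        return {key: canonicalize(value) for key, value in obj.items()}
    if isinstance(obj, list):
        items = [canonicalize(item) for item in obj]
        # Sort by canonical JSON text so dicts and mixed types are comparable.
        return sorted(items, key=lambda item: json.dumps(item, sort_keys=True))
    return obj


def dumps_deterministic(obj: Any) -> str:
    """Serialize with sorted lists and sorted dict keys."""
    return json.dumps(canonicalize(obj), sort_keys=True)


# Two edge lists that differ only in order, loosely modelled on the outputs in this thread.
edges_a = {"edges": [{"key": "p1", "slug": "param1"}, {"key": "p2", "slug": "param1"}]}
edges_b = {"edges": [{"key": "p2", "slug": "param1"}, {"key": "p1", "slug": "param1"}]}
assert dumps_deterministic(edges_a) == dumps_deterministic(edges_b)
```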

Environment

Versions (irrelevant for this issue):

c10k-cc commented 2 years ago

File outputs

flow1.json output (formatted)
```json
{
  "flows": [
    {
      "__version__": "1.2.1",
      "edges": [
        {
          "__version__": "1.2.1",
          "downstream_task": {
            "__version__": "1.2.1",
            "slug": "task1-1"
          },
          "flattened": false,
          "key": "p1",
          "mapped": false,
          "upstream_task": {
            "__version__": "1.2.1",
            "slug": "param1"
          }
        },
        {
          "__version__": "1.2.1",
          "downstream_task": {
            "__version__": "1.2.1",
            "slug": "task1-1"
          },
          "flattened": false,
          "key": "p2",
          "mapped": false,
          "upstream_task": {
            "__version__": "1.2.1",
            "slug": "param1"
          }
        }
      ],
      "name": "myflow",
      "parameters": [
        {
          "__version__": "1.2.1",
          "default": null,
          "name": "param1",
          "outputs": "typing.Any",
          "required": true,
          "slug": "param1",
          "tags": [],
          "type": "prefect.core.parameter.Parameter"
        }
      ],
      "reference_tasks": [],
      "run_config": {
        "__version__": "1.2.1",
        "env": null,
        "labels": [],
        "type": "UniversalRun"
      },
      "schedule": null,
      "storage": {
        "__version__": "1.2.1",
        "flows": {
          "myflow": "algotrading.prefect.myflow"
        },
        "module": "algotrading.prefect.myflow",
        "secrets": [],
        "type": "Module"
      },
      "tasks": [
        {
          "__version__": "1.2.1",
          "auto_generated": false,
          "cache_for": null,
          "cache_key": null,
          "cache_validator": {
            "fn": "prefect.engine.cache_validators.never_use",
            "kwargs": {}
          },
          "inputs": {},
          "max_retries": 0,
          "name": "param1",
          "outputs": "typing.Any",
          "retry_delay": null,
          "skip_on_upstream_skip": true,
          "slug": "param1",
          "tags": [],
          "timeout": null,
          "trigger": {
            "fn": "prefect.triggers.all_successful",
            "kwargs": {}
          },
          "type": "prefect.core.parameter.Parameter"
        },
        {
          "__version__": "1.2.1",
          "auto_generated": false,
          "cache_for": null,
          "cache_key": null,
          "cache_validator": {
            "fn": "prefect.engine.cache_validators.never_use",
            "kwargs": {}
          },
          "inputs": {
            "p1": {
              "required": true,
              "type": "typing.Any"
            },
            "p2": {
              "required": true,
              "type": "typing.Any"
            }
          },
          "max_retries": 0,
          "name": "task1",
          "outputs": "typing.Any",
          "retry_delay": null,
          "skip_on_upstream_skip": true,
          "slug": "task1-1",
          "tags": [],
          "timeout": null,
          "trigger": {
            "fn": "prefect.triggers.all_successful",
            "kwargs": {}
          },
          "type": "prefect.tasks.core.function.FunctionTask"
        }
      ],
      "type": "prefect.core.flow.Flow"
    }
  ],
  "version": 1
}
```
flow2.json output (formatted)
```json
{
  "flows": [
    {
      "__version__": "1.2.1",
      "edges": [
        {
          "__version__": "1.2.1",
          "downstream_task": {
            "__version__": "1.2.1",
            "slug": "task1-1"
          },
          "flattened": false,
          "key": "p2",
          "mapped": false,
          "upstream_task": {
            "__version__": "1.2.1",
            "slug": "param1"
          }
        },
        {
          "__version__": "1.2.1",
          "downstream_task": {
            "__version__": "1.2.1",
            "slug": "task1-1"
          },
          "flattened": false,
          "key": "p1",
          "mapped": false,
          "upstream_task": {
            "__version__": "1.2.1",
            "slug": "param1"
          }
        }
      ],
      "name": "myflow",
      "parameters": [
        {
          "__version__": "1.2.1",
          "default": null,
          "name": "param1",
          "outputs": "typing.Any",
          "required": true,
          "slug": "param1",
          "tags": [],
          "type": "prefect.core.parameter.Parameter"
        }
      ],
      "reference_tasks": [],
      "run_config": {
        "__version__": "1.2.1",
        "env": null,
        "labels": [],
        "type": "UniversalRun"
      },
      "schedule": null,
      "storage": {
        "__version__": "1.2.1",
        "flows": {
          "myflow": "algotrading.prefect.myflow"
        },
        "module": "algotrading.prefect.myflow",
        "secrets": [],
        "type": "Module"
      },
      "tasks": [
        {
          "__version__": "1.2.1",
          "auto_generated": false,
          "cache_for": null,
          "cache_key": null,
          "cache_validator": {
            "fn": "prefect.engine.cache_validators.never_use",
            "kwargs": {}
          },
          "inputs": {},
          "max_retries": 0,
          "name": "param1",
          "outputs": "typing.Any",
          "retry_delay": null,
          "skip_on_upstream_skip": true,
          "slug": "param1",
          "tags": [],
          "timeout": null,
          "trigger": {
            "fn": "prefect.triggers.all_successful",
            "kwargs": {}
          },
          "type": "prefect.core.parameter.Parameter"
        },
        {
          "__version__": "1.2.1",
          "auto_generated": false,
          "cache_for": null,
          "cache_key": null,
          "cache_validator": {
            "fn": "prefect.engine.cache_validators.never_use",
            "kwargs": {}
          },
          "inputs": {
            "p1": {
              "required": true,
              "type": "typing.Any"
            },
            "p2": {
              "required": true,
              "type": "typing.Any"
            }
          },
          "max_retries": 0,
          "name": "task1",
          "outputs": "typing.Any",
          "retry_delay": null,
          "skip_on_upstream_skip": true,
          "slug": "task1-1",
          "tags": [],
          "timeout": null,
          "trigger": {
            "fn": "prefect.triggers.all_successful",
            "kwargs": {}
          },
          "type": "prefect.tasks.core.function.FunctionTask"
        }
      ],
      "type": "prefect.core.flow.Flow"
    }
  ],
  "version": 1
}
```
"diff flow1.json flow2.json" output
```
13c13
< "key": "p1",
---
> "key": "p2",
27c27
< "key": "p2",
---
>
```

zanieb commented 2 years ago

Thanks for the issue! Flow.serialized_hash does some handling aiming for deterministic sorting of keys that may be relevant.

tirkarthi commented 2 years ago

Edit: I just noticed that edges and tasks are retrieved in sorted order, so please ignore this comment as irrelevant.

https://github.com/PrefectHQ/prefect/blob/f6d79b7f90dc6e271ea6427184a710229f8586b6/src/prefect/serialization/flow.py#L88-L89

self.edges is a set, so its iteration order might differ from one run to the next. Copying it in sorted order could make self.edges deterministic. The same issue might apply to self.tasks, which is also a set, so both could be sorted before serialization.

Below is the output of print statements for self.edges and self.tasks during different runs; the reordering is more prevalent with more edges and tasks. The changes can be observed with the watch command: watch python -m prefect build -p /tmp/flow.py

The rest of the fields are dicts, and dict iteration order is deterministic (insertion order); this is guaranteed since Python 3.7 and was an implementation detail of CPython 3.6.

from prefect import task, Flow, Parameter

@task()
def task1(p1, p2, p3):
    return p1, p2, p3

with Flow("flow1") as flow:
    param1 = Parameter("param1")
    param2 = Parameter("param2")
    param3 = Parameter("param3")
    t1 = task1(param1, param2, param3)
(.env) ➜  prefect git:(master) ✗ python -m prefect build -p /tmp/flow.py 
Collecting flows...
Processing '/tmp/flow.py':
  Building `Local` storage...
  Building 'flow1'...{<Edge(key=p2, mapped=False, flattened=False): param2 to task1>, <Edge(key=p1, mapped=False, flattened=False): param1 to task1>, <Edge(key=p3, mapped=False, flattened=False): param3 to task1>}
{<Task: task1>, <Parameter: param2>, <Parameter: param3>, <Parameter: param1>}
 Done
Writing output to 'flows.json'
========================== 1 built ==========================
(.env) ➜  prefect git:(master) ✗ python -m prefect build -p /tmp/flow.py 
Collecting flows...
Processing '/tmp/flow.py':
  Building `Local` storage...
  Building 'flow1'...{<Edge(key=p3, mapped=False, flattened=False): param3 to task1>, <Edge(key=p2, mapped=False, flattened=False): param2 to task1>, <Edge(key=p1, mapped=False, flattened=False): param1 to task1>}
{<Task: task1>, <Parameter: param2>, <Parameter: param3>, <Parameter: param1>}
 Done
Writing output to 'flows.json'
========================== 1 built ==========================
diff --git a/src/prefect/core/flow.py b/src/prefect/core/flow.py
index e785465ca..6d7f70168 100644
--- a/src/prefect/core/flow.py
+++ b/src/prefect/core/flow.py
@@ -242,6 +242,8 @@ class Flow:
         new.constants = self.constants.copy()
         new.tasks = self.tasks.copy()
         new.edges = self.edges.copy()
+        print(new.edges)
+        print(new.tasks)
         new.slugs = self.slugs.copy()
         new.set_reference_tasks(self._reference_tasks)
         return new
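The effect behind the reordering above can be illustrated outside of Prefect. Set iteration order depends on element hashes, and CPython randomizes string hashes per process unless PYTHONHASHSEED is fixed. The sketch below is illustrative only: it uses plain strings rather than Prefect's Edge and Task objects (whether their hashes are string-derived is an assumption here), and `run_child` is a hypothetical helper. It starts fresh interpreters with different hash seeds and compares the raw set order with the sorted order.

```python
import os
import subprocess
import sys
from typing import Tuple

# Code executed in each child interpreter: print the raw and the sorted order.
CHILD = (
    "items = {'param1', 'param2', 'param3', 'task1'}\n"
    "print(','.join(items))\n"
    "print(','.join(sorted(items)))\n"
)


def run_child(seed: int) -> Tuple[str, str]:
    """Run CHILD in a fresh interpreter with the given string-hash seed."""
    env = dict(os.environ, PYTHONHASHSEED=str(seed))
    result = subprocess.run(
        [sys.executable, "-c", CHILD],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    raw, ordered = result.stdout.splitlines()
    return raw, ordered


results = [run_child(seed) for seed in range(1, 6)]
raw_orders = {raw for raw, _ in results}
sorted_orders = {ordered for _, ordered in results}

# The raw order usually varies between seeds; the sorted order never does.
print("raw iteration orders seen:", len(raw_orders))
print("sorted orders seen:", len(sorted_orders))
```

Sorting before serialization removes the dependence on hash values, which is the idea behind copying the sets in sorted order.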
tirkarthi commented 2 years ago

I have raised https://github.com/PrefectHQ/prefect/pull/5785 as a possible fix. Additional fields can be included if it makes it more robust.