PrefectHQ / prefect

Prefect is a workflow orchestration tool empowering developers to build, observe, and react to data pipelines
https://prefect.io
Apache License 2.0
15.29k stars 1.5k forks source link

Add support for generator flows #14061

Closed jlowin closed 1 week ago

jlowin commented 1 week ago

Mirror of #13820 but allowing generator flows. A generator flow permits interesting asynchronous composition in which a flow can yield results to consumers while remaining in a running state.

Here is a generator flow being consumed and interspaced with extra-flow logic (print('---'))


from prefect import flow, task

@task
def consume(x):
    return x

@flow
def generator():
    for i in range(10):
        yield consume(i)

gen = generator()
for g in generator():
    print(g)
    print('---')
10:37:13.608 | INFO    | prefect.engine - Created flow run 'resilient-malamute' for flow 'generator'
10:37:13.609 | WARNING | prefect.utilities.urls - No URL found for the Prefect UI, and no default base path provided.
10:37:13.711 | INFO    | prefect.engine - Created task run 'consume-0' for task 'consume'
10:37:13.780 | INFO    | Task run 'consume-0' - Finished in state Completed()
0
---
10:37:13.809 | INFO    | prefect.engine - Created task run 'consume-1' for task 'consume'
10:37:13.843 | INFO    | Task run 'consume-1' - Finished in state Completed()
1
---
10:37:13.851 | INFO    | prefect.engine - Created task run 'consume-2' for task 'consume'
10:37:13.875 | INFO    | Task run 'consume-2' - Finished in state Completed()
2
---
10:37:13.885 | INFO    | prefect.engine - Created task run 'consume-3' for task 'consume'
10:37:13.924 | INFO    | Task run 'consume-3' - Finished in state Completed()
3
---
10:37:13.932 | INFO    | prefect.engine - Created task run 'consume-4' for task 'consume'
10:37:13.956 | INFO    | Task run 'consume-4' - Finished in state Completed()
4
---
10:37:13.964 | INFO    | prefect.engine - Created task run 'consume-5' for task 'consume'
10:37:14.005 | INFO    | Task run 'consume-5' - Finished in state Completed()
5
---
10:37:14.017 | INFO    | prefect.engine - Created task run 'consume-6' for task 'consume'
10:37:14.052 | INFO    | Task run 'consume-6' - Finished in state Completed()
6
---
10:37:14.063 | INFO    | prefect.engine - Created task run 'consume-7' for task 'consume'
10:37:14.103 | INFO    | Task run 'consume-7' - Finished in state Completed()
7
---
10:37:14.114 | INFO    | prefect.engine - Created task run 'consume-8' for task 'consume'
10:37:14.149 | INFO    | Task run 'consume-8' - Finished in state Completed()
8
---
10:37:14.160 | INFO    | prefect.engine - Created task run 'consume-9' for task 'consume'
10:37:14.192 | INFO    | Task run 'consume-9' - Finished in state Completed()
9
---
10:37:14.204 | INFO    | Flow run 'resilient-malamute' - Finished in state Completed()
jlowin commented 1 week ago

The only test failing here is https://github.com/PrefectHQ/prefect/issues/13963 which I can't replicate locally so I'm going to merge