PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
15.35k stars 1.5k forks source link

Add ability to wait on or get results of groups of futures #14234

Closed desertaxle closed 2 weeks ago

desertaxle commented 2 weeks ago

Adds some syntactic sugar to get the results or wait on futures returned from .map. The implementation is very naive and will need some iterative improvements, but this makes it feel a big more natural to work with groups of futures.

With this PR, .map will return a PrefectFutureList, which walk and quacks like list, but also exposes .wait_for_all and .results methods. These methods can be used to wait for all futures in the PrefectFutureList or get their results.

Very open to push back on this interface, so please review with a critical eye.

Closes https://github.com/PrefectHQ/prefect/issues/14017

Example

Map a task and wait for all the returned futures:

from prefect import flow, task

@task
async def say_hello(name):
    print f"Hello {name}!"

@flow
def my_flow():
    futures = say_hello.map(["world", "Alex", "Sam", "Ned"])

    futures.wait()

if __name__ == "__main__":
    my_flow()

Map a task and get all the results:

from prefect import flow, task

@task
async def say_hello(name):
    return f"Hello {name}!"

@flow
def my_flow():
    futures = say_hello.map(["world", "Alex", "Sam", "Ned"])

    print(futures.result())

if __name__ == "__main__":
    my_flow()

Checklist

cicdw commented 2 weeks ago

Should probably be in a separate PR, but I think we'd also benefit from a wait(*all_my_futures) utility for waiting on arbitrary groups of futures (not necessarily mapped). That being said, I like this interface for mapped futures!

desertaxle commented 2 weeks ago

I think we'd also benefit from a wait(*all_my_futures) utility for waiting on arbitrary groups of futures

Agreed! I'd like us to have wait and as_completed utilities that mirror those in concurrent.futures in the stdlib. I'll add issues to make sure we remember.