PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Ability to use unmapped on wait_for #14727

Open tjgalvin opened 1 month ago

tjgalvin commented 1 month ago

First check

Describe the current behavior

The .map interface of a Prefect task is useful for submitting a series of operations across an iterable of arguments for execution. The return value of this map method is a list of PrefectFutures, one for each of the individual .submit results.

If a secondary Prefect task has no direct data dependencies on the first, we can use its wait_for argument to tell the orchestrator not to start it until all mapped runs of the first task have completed. However, there is no way to use the map interface while also specifying that each mapped run only has to wait for the corresponding future from task a to finish.
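A rough way to picture the current behaviour outside Prefect is with Python's standard concurrent.futures, used here purely as a stand-in for Prefect's futures and task runner. The names task_a, task_b and run_with_wait_for_all are illustrative only, not Prefect API. In this sketch every task_b call is handed the whole list of task_a futures and blocks until all of them have finished, i.e. a barrier, which is the all-or-nothing waiting described above.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait


def task_a(word: str) -> str:
    # Stand-in for the upstream mapped task (side effects would happen here).
    return word.upper()


def task_b(number: int, upstream: list[Future]) -> bool:
    # Block until every upstream future has finished (a barrier),
    # mirroring wait_for=<all futures returned by task_a.map(...)>.
    wait(upstream)
    # Report whether all upstream work was complete when task_b proceeded.
    return all(f.done() for f in upstream)


def run_with_wait_for_all() -> list[bool]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        a_futures = [pool.submit(task_a, w) for w in "Break this string".split()]
        # Each task_b run receives the *whole* list of upstream futures.
        b_futures = [pool.submit(task_b, n, a_futures) for n in [1, 2, 3]]
        return [f.result() for f in b_futures]


print(run_with_wait_for_all())  # [True, True, True]
```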

Task b may have no data relation to task a, yet task a may have side effects that task b has to be aware of, so task b must wait for task a to finish. As an example, I have a pipeline that uses the .map interface almost exclusively. While I am processing the data set (a nested series of tables on disk called a Measurement Set, a format HPCs hate), I cannot operate on that data. But I want to zip/tar it as soon as possible to avoid file-quota issues. I can either modify the return and input types of my functions to pass around this extra information, or I can keep the functions as I would otherwise write them and leverage Prefect's dependency management.

To put this in code:

futures = task_a.map("Break this string".split())

# Pair each task_b submission with the single task_a future it depends on
for some_number, future in zip([1, 2, 3], futures):
    task_b.submit(some_number, wait_for=[future])

Although the above works, it is a rather unattractive mixture of Prefect methods. I also suspect that there are performance penalties when interacting with the API once the inputs to the mapped function are sufficiently large.
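The loop workaround can be folded into a helper so the call site looks map-like again. This is a hypothetical sketch, not Prefect API: map_with_paired_wait_for is an invented name, and concurrent.futures again stands in for Prefect's futures. Run i of the downstream function waits only on upstream future i, mirroring what the loop does with one wait_for per submit.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Sequence


def map_with_paired_wait_for(
    pool: ThreadPoolExecutor,
    fn: Callable[[Any], Any],
    args: Sequence[Any],
    upstream: Sequence[Future],
) -> list[Future]:
    """Submit fn(args[i]) so that it starts only after upstream[i] has finished."""
    if len(args) != len(upstream):
        raise ValueError("args and upstream must have the same length")

    def run_after(arg: Any, dependency: Future) -> Any:
        # Block on the single paired upstream future, not the whole list.
        # .result() re-raises an upstream exception, so fn never runs
        # after a failed dependency.
        dependency.result()
        return fn(arg)

    return [pool.submit(run_after, a, dep) for a, dep in zip(args, upstream)]


with ThreadPoolExecutor(max_workers=4) as pool:
    # Upstream "mapped" work, one future per word.
    futures = [pool.submit(str.upper, w) for w in "Break this string".split()]
    # Downstream run i waits only on futures[i].
    results = map_with_paired_wait_for(pool, lambda n: n * 10, [1, 2, 3], futures)
    print([f.result() for f in results])  # [10, 20, 30]
```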

Describe the proposed behavior

Have an operation similar to unmapped that works for wait_for.

futures = task_a.map("Break this string".split())

# Proposed: mapped run i of task_b waits only on futures[i]
task_b.map([1, 2, 3], wait_for=unmapped(futures))

This would also make it easier to build up a more complex set of wait_for dependencies without any extra indexing or logic within a for loop.

futures = task_a.map("Break this string".split())
more_futures = new_task.map("Some other string".split())

task_b.map([1, 2, 3], wait_for=(unmapped(futures), unmapped(more_futures)))
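Under an index-based reading of the proposal, combining several upstream maps amounts to a transposition: mapped run i waits on element i of every upstream list. The sketch below is hypothetical (paired_wait_for is not a Prefect function); strings stand in for futures, and it assumes every upstream list has exactly one entry per mapped run, raising an error rather than guessing when lengths differ. Note that Prefect's existing unmapped marks an argument that is passed unchanged to every mapped run, so the per-index pairing sketched here is closer to ordinary mapping than to today's unmapped.

```python
from typing import Sequence, TypeVar

T = TypeVar("T")


def paired_wait_for(n_runs: int, *upstream_lists: Sequence[T]) -> list[list[T]]:
    """Return one wait_for list per mapped run: run i gets item i of each upstream list."""
    for upstream in upstream_lists:
        if len(upstream) != n_runs:
            # Index-based pairing is ambiguous when lengths differ, so fail loudly.
            raise ValueError(f"expected {n_runs} upstream items, got {len(upstream)}")
    if not upstream_lists:
        # No upstream dependencies: every run gets an empty wait_for.
        return [[] for _ in range(n_runs)]
    # zip(*...) transposes the upstream lists; each tuple is one run's dependencies.
    return [list(deps) for deps in zip(*upstream_lists)]


# Strings stand in for futures so the pairing is easy to see.
futures = ["a0", "a1", "a2"]
more_futures = ["b0", "b1", "b2"]
print(paired_wait_for(3, futures, more_futures))
# [['a0', 'b0'], ['a1', 'b1'], ['a2', 'b2']]
```

Each inner list could then be passed as the wait_for of the matching task_b.submit call.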

Example Use

No response

Additional context

I try to use the .map interface as much as possible in my pipelines. It makes the code much more readable and avoids unnecessary layers of indentation.

I find myself in this situation fairly often:

from glob import glob

telescope_mss = glob("*.ms")
image_data = image_ms_data.map(ms=telescope_mss)
# Every zip_data run waits on all image_ms_data runs, not just its own
zipped = zip_data.map(ms=telescope_mss, wait_for=image_data)

In the above situation I cannot zip the measurement sets (these nasty radio-telescope formats that are folders within folders, with many small files and many big files that HPCs absolutely hate) until the imaging is finished. Although I could pass the image_data future into the input of zip_data, I would then have to either modify the return of image_ms_data (which is the image, not the data) and update what zip_data expects, or add a dummy argument to create the dependency that way. This is a little smelly to me, as realistically zip_data should be completely agnostic to what it is given to zip.

This is just one example, but this type of use case pops up from time to time for me. As I try to convince the powers that be that we have a workable, maintainable solution, I'd love to keep my usage of Prefect as consistent and as readable as possible.

desertaxle commented 1 month ago

Thanks for the enhancement request @tjgalvin! If you're chaining .map calls, would you expect this new functionality to wait for upstream completion on an index basis (i.e. mapped execution 0 waits for upstream future 0, mapped execution 1 waits for upstream future 1, etc.)? Or are there other criteria you'd expect to use when waiting for upstream completion in a .map call?