PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Explain or implement nested parallelism #2786

Closed shaunc closed 4 years ago

shaunc commented 4 years ago

Current behavior

Please describe how the feature works today

I am unable to figure out how to use map to parallelize nested loops, requiring two levels of fan in/fan out. I'm not sure if this is a documentation or an implementation issue.

For example (pseudo-code):

array_d2 = task_1()
vec = task_2()
u = zeros(array_d2.shape[0])
for i, row in enumerate(array_d2):
    s = task_3(vec, row)
    t = zeros(array_d2.shape[1])
    for j, cell in enumerate(row):
        t[j] = task_4(vec, s, cell)
    u[i] = task_5(vec, s, t)
v = task_6(u)

I guess that I would start task_3, task_4 and task_5 mapped; I would pass vec wrapped in unmapped to task_3, and both vec and s in unmapped to task_4. Should I pass both vec and t unmapped to task_5? It seems magical to use unmapped for both reduce and broadcast.

Related stack overflow issue

Proposed behavior

Please describe your proposed change to the current behavior

Perhaps for broadcast, use unmapped; for (partial) reduce (as with t in task_5), use reduced. The idea is that you read off the parallelism from any arguments that are neither unmapped nor reduced. However, the reduced fan-in has to trace the flow back to the fan-out to see how it relates to the marginal mapped arguments. For more complex situations, like 2D convolution, more explicit axis and slice specifications could be devised, but I'm not sure what the syntax should look like.

Alternatively, document the current interface, however it deals with this, to make clear how to go about it. Even making clear how a low-level interface works (if there is one) would be useful.

Thanks!

Example

Please give an example of how the enhancement would be useful

I am translating parallel DAGs specified in another system to use Prefect as a backend executor. Nested parallelism obviously comes up quite often in data science flows.

jlowin commented 4 years ago

Hi @shaunc - thanks very much for opening this issue. At the moment, dynamic parallelism in Prefect is performed in an iterated one-dimensional fashion (parallel pipelines of constant cardinality). We have an open issue for a flat_map operator that would allow dynamic fan-out / fan-in of the pipelines. An extension of this would support your use case natively, as your second loop appears to depend entirely on the outputs of the first loop.

Until flat_map is introduced, most users solve this with a reduce step followed by a second loop. This is, admittedly, suboptimal, as it slows computation and (potentially unnecessarily) gathers the intermediate steps together. The goal of the flat_map operator would be not only to allow dynamic fanning, but also to take advantage of the new depth-first-execution feature (releasing this week, maybe tomorrow?) that doesn't wait for all mapped tasks to finish before going on to the next task; the parallel pipelines complete as fast as possible.
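As a rough illustration of the "reduce step followed by a second loop" workaround, here is a minimal plain-Python sketch. It does not use the Prefect API: ThreadPoolExecutor.map stands in for a one-dimensional mapped task, and the bodies of task_3, task_4 and task_5 are hypothetical placeholders chosen only so the example runs. The point is the shape of the computation: each map is one-dimensional, and between maps the results are gathered and re-flattened, with the row index carried along so that the per-row value s can be looked up for each cell.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical placeholder bodies for the tasks in the pseudo-code above.
def task_3(vec, row):
    return sum(vec) + sum(row)      # per-row aggregate "s"


def task_4(vec, s, cell):
    return cell * s                 # per-cell value that needs its row's "s"


def task_5(vec, s, t):
    return sum(t) - s               # per-row result built from that row's cells


def run_iterated_map_reduce(array_d2, vec):
    with ThreadPoolExecutor() as pool:
        # Map 1 (one item per row); vec is broadcast to every call.
        s_per_row = list(pool.map(lambda row: task_3(vec, row), array_d2))

        # Reduce, then fan out again: one item per cell, tagged with its row index.
        flat = [(i, cell) for i, row in enumerate(array_d2) for cell in row]

        # Map 2 (one item per cell); the row's "s" is looked up via the index.
        t_flat = list(
            pool.map(lambda item: task_4(vec, s_per_row[item[0]], item[1]), flat)
        )

        # Reduce: regroup the per-cell results by row.
        t_per_row = [[] for _ in array_d2]
        for (i, _cell), value in zip(flat, t_flat):
            t_per_row[i].append(value)

        # Map 3 (one item per row again).
        u = list(
            pool.map(
                lambda i: task_5(vec, s_per_row[i], t_per_row[i]),
                range(len(array_d2)),
            )
        )
    return u
```

The gather between Map 1 and Map 2 is the cost described above: no task_4 call starts until every task_3 call has finished, even though each cell only needs the s of its own row.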

That said, I recognize that flat_map isn't exactly the semantic you're looking for, even if it supports the same behavior, and we'll want to ensure that Prefect exposes utilities that make it easy to get the behavior you want.

shaunc commented 4 years ago

@jlowin -- thank you for your response. Hmm... flat_map does some of what is requested here. In particular, it allows the change of arity of the mapping. But I don't see how it helps with passing "partially mapped" or "partially reduced" results, as is possible in nested loops.

For example, I can see how it could be used to run task_4 with the correct arity over cells; and presumably vec could be passed in using unmapped. But what about s -- which might contain a per-row aggregate over columns?

Again, with task_5, t could contain -- say -- a normalized version of the column given the info from s but task_5 still is parallel with the arity of the rows, so this is for the current row and isn't entirely reduced.

flat_map changes the arity, but doesn't seem to let me specify how arguments should be broadcast/reduced. If I had three nested loops in the example, that would cause even more problems, of course.

A related question (which might allow a workaround): if a task uses a dask dataframe, will that allow dask to parallelize the task itself (for some tasks at least)? If I can "smuggle" the higher arity through to dask, that might be one way to go about this. NB -- for advanced cases, you might want to allow interaction with dask (through the context?).

Another thought: another way to tackle this would be to create "edges" dynamically: for the inner loop I could use "LOOP" to "build" a bunch of dummy tasks corresponding to the arity, and dynamically add edges from these to new instances of tasks in the inner loop body. The dummy tasks would be created sequentially, but (since they are dummy tasks) would execute quite quickly; then the dynamic edges would let prefect execute the loop body instances as resources are available.

jlowin commented 4 years ago

Your question about Dask is certainly a good one - you can easily reach out to the parent Dask cluster, and submit any work from the Prefect task. We have contemplated adding a client to context, but once in the cluster all you really need to do is instantiate your own worker_client() inside task.run() and I believe it will work as expected.

As a larger design point -- perhaps you are seeking to have Prefect orchestrate your work at too granular a level. In general, we encourage making Prefect tasks as small as your unit of work, but not smaller. If the entire loop can be efficiently performed in one of your existing / preferred tools, like Dask, then your "unit of work" might be the nested loops themselves, rather than each item thereof. In this case, you'd build a Prefect task that accepted all required inputs and executed the loop (possibly using Dask directly for parallelism). Prefect would govern the execution of the loop complex as a whole; Dask would take care of the loop itself.
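A minimal sketch of that coarser "unit of work", under stated assumptions: the function below would be the body of a single task, a standard-library ThreadPoolExecutor stands in for Dask, and the arithmetic is a hypothetical placeholder for task_3 through task_6. No Prefect or Dask calls are used here.

```python
from concurrent.futures import ThreadPoolExecutor


def process_row(vec, row):
    """Body of the outer loop, executed as one piece of work per row."""
    s = sum(vec) + sum(row)             # placeholder for task_3
    t = [cell * s for cell in row]      # placeholder for the inner task_4 loop
    return sum(t) - s                   # placeholder for task_5


def nested_loop_unit(array_d2, vec):
    """One coarse-grained unit of work covering the whole nested loop.

    The orchestrator would only see this function; the parallelism over
    rows is handled internally (here by a thread pool, as a stand-in).
    """
    with ThreadPoolExecutor() as pool:
        u = list(pool.map(lambda row: process_row(vec, row), array_d2))
    return sum(u)                       # placeholder for task_6
```

The trade-off is the one discussed in this thread: state is tracked for the loop as a whole, not for each row or cell.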

shaunc commented 4 years ago

@jlowin Thank you for your quick response. We are designing yet another tool to orchestrate dataOps/machine learning tools. We wanted to use Prefect rather than Dask directly because of Prefect's robust handling of state. The plan was to have tasks specified in very granular detail for debugging / quick development cycling, but then once the flow worked on sample data, create another flow that grouped the tasks together into larger chunks for performance reasons. We have our own wrappers (using Dask dataframes) that allow us to specify the tasks in a way that allows us to translate them in either manner.

The issue here is that there seem to be other reasons besides performance that force us to use Dask rather than Prefect, which is unfortunate, as for small cases we'd rather have Prefect to rely on.

I guess that always reducing and then starting a new parallel map still doesn't violate our design goals. The hope, however, was that even if Prefect was operating at its most granular on "small" flows, they could actually be rather large small flows :), as scaling up at least modestly often turns up new issues that "toy" samples don't exhibit.

jlowin commented 4 years ago

Got it! So maybe your "unit of work" is, in fact, this small, in which case you may need to use the slightly more "naive" approach of iterated reduce / map steps. Prefect's ability to robustly manage states is due, in part, to our ability to know the shape of the DAG at compile time, with limited extensions at runtime (such as the cardinality of mapped tasks). That's why it's hard for Prefect to easily extend into more dynamic settings - each dimension of dynamism requires some new form of DAG introspection in order to plan the state management. It's definitely not impossible - witness map and flat_map :) - but it's more involved than just passing the information to Dask. In the medium term, we will be adopting a backend that allows fully dynamic tasks that will be a perfect match for your use case -- Prefect will govern each item and Dask will execute it, but we're not quite there yet. In the meantime, happy to support whichever approach makes the most sense for your work and definitely let us know the design patterns that emerge - we'll use them to improve this area of Prefect.

shaunc commented 4 years ago

Ok ... and thanks for the insight on your roadmap. Please feel free to close this issue as it seems overly large to be actionable for the moment. (Though you can keep it open if it serves as a motivation to future work, as it seems it speaks to the direction you are going.)

In any case, would be eager to see which issues/milestones/discussions I should follow to track progress (aside from "flat_map" itself).

lauralorenz commented 4 years ago

Hi all! Closing this issue for now as this is a bit bigger/nebulous than what we will be working on in the near future, and linked this discussion to the stackoverflow question in the meantime. For now I think the issue @jlowin linked about flat_map (https://github.com/PrefectHQ/prefect/issues/2091) will be the soonest to land.

shaunc commented 4 years ago

Thanks!