dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

Efficient cloning of taskgraphs on top of new input partitions? #11435

Open lgray opened 1 month ago

lgray commented 1 month ago

Hi!

I'm a high energy particle physicist using dask, dask-awkward, and dask-histogram to compute complex analyses over billions of rows with many variations of systematic uncertainties to create rich statistical models of our collider data.

The task graphs that we generate are often rather complex and can easily reach thousands of nodes before optimization for a single dataset, and we then construct the task graph for multiple datasets. This can be time-consuming (tens of minutes for ~200 datasets!), resulting in a sluggish user experience that is largely due to repeated work. The structure of the task graph likely only needs to be calculated 2-3 times at most, depending on whether the dataset is signal simulation, background simulation, or experimental data.

Would it be possible to instead calculate the structure, which is the expensive part, once and then re-key the graph for new input partitions (re-keying all dependents of the inputs as well)? This should be much faster, since it's essentially walking the task graph, calculating some hashes, and creating the appropriate number of new partitions. This approach may even work for tree-reduction or repartitioning layers (in modes where you don't need to know the partition structure a priori), since these are often calculated only when the low-level graph is finally materialized.
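
For concreteness, here is a rough sketch of what I have in mind, operating on a materialized low-level graph and assuming the number of input partitions stays the same. The `rekey_graph` helper and its arguments are hypothetical, not an existing dask API:

```python
# Hypothetical sketch: clone a materialized low-level graph onto new input
# partitions by swapping the input tasks and renaming every key.  Nothing
# here is an existing dask API beyond `tokenize`.
from dask.base import tokenize


def _is_task(obj):
    # In a low-level dask graph, a "task" is a tuple whose first element is callable.
    return isinstance(obj, tuple) and bool(obj) and callable(obj[0])


def _substitute(obj, keymap):
    # Recursively rename key references that appear inside a task specification.
    if _is_task(obj):
        return (obj[0],) + tuple(_substitute(a, keymap) for a in obj[1:])
    if isinstance(obj, list):
        return [_substitute(a, keymap) for a in obj]
    try:
        return keymap.get(obj, obj)
    except TypeError:
        return obj  # unhashable literal, leave untouched


def rekey_graph(dsk, new_inputs):
    """Clone ``dsk`` on top of new input partitions.

    ``new_inputs`` maps existing input keys to replacement tasks (e.g. reads
    of a different dataset's files).  Every key in the clone is suffixed with
    a token derived from the new inputs so it cannot collide with the original.
    """
    token = tokenize(new_inputs)

    def rename(key):
        if isinstance(key, tuple):  # ("name", index) style keys
            return (f"{key[0]}-{token}",) + key[1:]
        return f"{key}-{token}"     # plain string keys

    keymap = {k: rename(k) for k in dsk}
    out = {}
    for key, task in dsk.items():
        if key in new_inputs:
            out[keymap[key]] = new_inputs[key]            # swap in the new input task
        else:
            out[keymap[key]] = _substitute(task, keymap)  # re-key the dependents
    return out
```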

Has this operation already been implemented somewhere (I couldn't find anything)? Is this a reasonable thing to do? Are there any necessary components missing in dask for implementing a strategy like this?

Thanks in advance!

mrocklin commented 1 month ago

My first suggestion would be to profile graph construction. That might give you a better understanding of what's slowing things down and give you ideas on where to focus effort. It could be that what you suggest is the best course of action, but if you haven't yet gone through a round of profiling and optimization I wouldn't be surprised if there was a 10x cost reduction in there somewhere.
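
For example, the graph-construction phase alone can be profiled with the standard library, without ever calling compute. Here `build_analysis_graph` and `datasets` are placeholders for your own code:

```python
# Sketch: profile only graph construction, not execution.
import cProfile
import pstats

with cProfile.Profile() as prof:
    collections = build_analysis_graph(datasets)  # builds dask collections, no .compute()

pstats.Stats(prof).sort_stats("cumulative").print_stats(25)  # top 25 call sites by cumulative time
```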

lgray commented 1 month ago

@mrocklin Thanks for the reply - we have already done one pass of extensive profile-guided optimization, which brought this process down from > 1 hour to the ~10 minutes quoted above.

mrocklin commented 1 month ago

I think the next question would then be "what is the slow part?"

At the end of the day, making layers of a Dask graph is just constructing Python dictionaries. How to make a Python dictionary quickly isn't something that we really control. It has much more to do with the code in the libraries you mention than with anything in the core dask library itself. If you think you can make a dictionary more quickly by creating a template first, then that makes sense to pursue. This isn't something that we have much say in.
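
As an illustration (a toy graph, not taken from any of the libraries mentioned), a materialized low-level graph is literally a dict mapping keys to tasks, and the scheduler just walks it:

```python
# Toy example: a materialized low-level dask graph is a plain dict of key -> task.
from dask.threaded import get


def read_partition(i):
    return list(range(i * 3, i * 3 + 3))  # stand-in for real I/O


def increment(part):
    return [x + 1 for x in part]


def total(a, b):
    return sum(a) + sum(b)


dsk = {
    ("read", 0): (read_partition, 0),
    ("read", 1): (read_partition, 1),
    ("inc", 0): (increment, ("read", 0)),
    ("inc", 1): (increment, ("read", 1)),
    "total": (total, ("inc", 0), ("inc", 1)),
}

print(get(dsk, "total"))  # 21
```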

If there is some function in the Dask library that is particularly slow then we can look at that function. My guess though is that you'll find that most of the issue here is downstream of the core dask library.

lgray commented 1 month ago

I can pull up the profiles in a bit for you to see (I'm not at the right computer), but generally it boils down to the fact that we are doing a very large number of small operations, so building that dictionary and predicting the output types (a la dask-array) takes quite some time. More succinctly: our task graphs are just big, and that's awkward to deal with.

Largely, it's the output-type prediction that takes a significant amount of time, which is what gave me the idea for the proposal above and my interest in corresponding tools. We've already optimized the type tracing/prediction to the point of selectively removing correctness guarantees when we know they are met for other reasons. However, since we don't necessarily need to build the task graph again for the same dataflow, it seemed prudent to instead re-key a graph that we know will be the same for a given dataset.

Joining all the datasets that share the same dataflow creates its own problems, since it's easy to end up with a few billion individual tasks even after graph optimization.

Revisiting the joining just now brings up another possible idea: if we only did the joining as a "logical" step and then "unjoined" the task graph at the end for actual submission, that could be a way around it. I can imagine it being a bit clunky for users, but that's more an engineering problem than a fundamental one.

rjzamora commented 1 month ago

Does this issue/discussion fit under the umbrella of dask-expr and/or array-expr? I haven't carefully digested this discussion, but it seems like it could be solved by generating/caching a graph template at the expression level.

lgray commented 2 weeks ago

Hi! Sorry for the delay, I was on a much-needed vacation for a bit.

> Does this issue/discussion fit under the umbrella of dask-expr and/or array-expr? I haven't carefully digested this discussion, but it seems like it could be solved by generating/caching a graph template at the expression level.

I'm not sure if dask(-array)-expr and dask-awkward are miscible with each other (or if they could be made to be). However, if it can cache a task graph and apply it to some other input dataset, this is definitely a thread to pull on, even if we can only use some part of it. Perhaps there's an interface to the caching mechanism that we can extract and generalize?
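
If such an interface existed, the usage I'd hope for is roughly the following. Everything here is hypothetical: `build_graph_template`, its return value, the dataset attributes, and `rekey_graph` (sketched earlier in this thread) are placeholders, not existing APIs:

```python
# Hypothetical sketch of a user-level template cache: build the expensive graph
# once per dataset category, then re-key the cached template onto each dataset's
# own input partitions.
_template_cache = {}


def graph_for(dataset):
    category = dataset.category  # e.g. "data", "signal MC", "background MC"
    if category not in _template_cache:
        # Slow path: full graph construction (including typetracing); runs 2-3 times total.
        _template_cache[category] = build_graph_template(dataset)
    template = _template_cache[category]
    # Fast path: swap this dataset's input tasks into the cached structure.
    new_inputs = {key: dataset.input_task(key) for key in template.input_keys}
    return rekey_graph(template.graph, new_inputs)
```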

> I think the next question would then be "what is the slow part?"

Yes, we've profiled this to a reasonable degree at this point, and even after that a lot of time is spent in the "typetracing" mechanism that predicts the output types of various array transformations in awkward-array.

It's been a while since I made the profiles; I'll regenerate them and post them to this thread so we can continue the discussion.

@martindurant @jpivarski