dask-contrib / dask-awkward

Native Dask collection for awkward arrays, and the library to use it.
https://dask-awkward.readthedocs.io
BSD 3-Clause "New" or "Revised" License
61 stars 19 forks source link

Task graph building becomes uncomfortably slow for large physics analyses #446

Open lgray opened 10 months ago

lgray commented 10 months ago

When the unoptimized task graph size starts to approach a few thousand layers (which is typical in end-stage physics analysis), generating the task graph can take a few seconds. Typical analyses have a O(100) datasets, so this means that a user can easily incur 10 minutes of overhead before their jobs even start (and if the compute time of the full analysis itself is ~20 minutes this is a large slowdown).

I suppose one thing that could be done if we can't squeeze the building time down any further is to parallelize the building with dask.Delayed and collect the graphs back at the client before submitting them, but we should see if we can make the building process more efficient first.

Profile of the task-graph building of two datasets attached using awkward@main, coffea@master. (macOS/arm64) apply_to_fileset.prof.zip

@agoose77 @douglasdavis

lgray commented 10 months ago

FYI The code that produced the profile above is here: https://github.com/cmstas/ewkcoffea/blob/coffea2023/analysis/wwz/wwz4l.py#L33

and you can run it yourself by doing: install:

pip install git+https://github.com/scikit-hep/awkward.git@main
pip install git+https://github.com/CoffeaTeam/coffea.git@master
pip install xgboost
pip install mt2
git clone https://github.com/TopEFT/topcoffea.git -b coffea2023
pushd topcoffea && pip install -e . && popd
git clone https://github.com/cmstas/ewkcoffea.git -b coffea2023
pushd ewkcoffea && pip install -e . && popd

running:

cd ewkcoffea/analysis/wwz
source run_wrapper.sh
martindurant commented 10 months ago

I slack conversations, a few different factors were mentioned, so I would like to be clear here which of them we are talking about.

I would say that creating multiple objects or converting to dask.array in parallel is possible. Running the graph for the purposes of optimisation might parallelise, but amending the graph probably does not.

lgray commented 10 months ago

conversion of dak array to dask.array has been removed - it was an artifact of simpler times :-)

lgray commented 10 months ago

and in particular the profile above tends to only your first bullet point - optimizing the graph for columns is quite fast (dak.necessary_columns is 10x slower for big graphs but that's another, less important issue).

lgray commented 10 months ago

Though if we can get building of the task graph itself faster then the next item on the list will be the speed of optimization.

There's also some related issues of task graphs with ML algorithms in them or large corrections being multiple MB in size and that seems to make dask unhappy.

We could go to using scatter and futures but that's a big leap in programming paradigm for some folks. For sure it is something we can roll out, but it's not what we taught people to do for five years so far :D.

martindurant commented 10 months ago

large corrections being multiple MB in size and that seems to make dask unhappy

Some computes are just hard, it's ok is dask complains so long as it - eventually - works. We need to set reasonable expectations.

lgray commented 10 months ago

Yeah that I can be happy with for now, there are other bigger issues to tackle in terms of UX.

lgray commented 10 months ago

In particular the multiple 10s of megabytes is just metadata so we could scatter it to the cluster. It's more a matter of showing people how to do it in their analyses.

martindurant commented 10 months ago

You don't need scatter() for that, you just need to have that multi-MB value be a dask key in the graph as opposed to a literal value. So wrapping it in ak.from_awkward (1 partition) would be enough, I think .

lgray commented 10 months ago

huh, interesting - I'll try that!

lgray commented 10 months ago

Hmm - the first encounter with that doesn't work out so well since the metadata is an object (a functor), and awkward array does not like that.

martindurant commented 10 months ago

Maybe as any delayed(), then (this works on objects, not only functions).

lgray commented 10 months ago

At present that yields: dask_awkward.utils.DaskAwkwardNotImplemented: Use of Array with other Dask collections is currently unsupported.

martindurant commented 10 months ago

@douglasdavis , any ideas? ;)

lgray commented 10 months ago

I have found a dirty dirty hack. No it doesn't quite work all the way through. I was almost able to hide it in a dask_awkward.Scalar.

lgray commented 10 months ago

Hope springs anew - if I can hide it in an attrs of an awkward array I think I can get it done. However, this exposed a bug!

douglasdavis commented 10 months ago

We can definitely make delayed objects work nicely with binary/map_parititions dak.Array operations. that exception is pretty strict and there's room for relaxing it a bit

lgray commented 10 months ago

Being able to pass in a single delayed object as an argument would be perfect in my case.

lgray commented 10 months ago

Anyway this particular direction is vastly beside the major source of slow down, which appears to be awkward operations on zero length arrays and typetracers. I understand those take time but seems to add up quickly!

martindurant commented 10 months ago

awkward operations on zero length arrays and typetracers

Whether or not they are slow is debatable, but we have MANY of them, so it adds up

lgray commented 10 months ago

If there's performance to squeeze out, we should find it! Telling everyone to make their analyses simpler won't go over very well. :S

douglasdavis commented 10 months ago

When I wrote that exception (some time ago), the problem with mixing dask-awkward collection objects with "anything else" was metadata determination. We always want to be able to keep around an awkward typetracer. If we are mixing a dask-awkward collection object with, for example, a "black box" Delayed object:

@delayed
def a_delayed_array():
    return ak.Array([1, 2])

array = ak.Array([[1, 2, 3], [4], [5, 6, 7], [8]])
dak_array = dak.from_awkward(array, npartitions=2)
result = dak_array * a_delayed_array()

We're going to get an exception trying to figure out what result._meta should be, because a_delayed_array() does not have a metadata that can be used with dak_array._meta

In this case we know result._meta should be the same as dak_array._meta, so we can be explicit with:

result = map_partitions(operator.mul, dak_array, a_delayed_array(), meta=dak_array._meta, output_divisions=1)

But this is of course a bit heavy handed and we don't like suggesting people use map_partitions in cases that are not at least quite nontrivial.

Right now that last code snippet won't work because of the exception mention in @lgray's https://github.com/dask-contrib/dask-awkward/issues/446#issuecomment-1887750609 - we can remove the exception to allow that flavor of map_partitions call at least, but the simple actually use * case will require more thought.

lgray commented 10 months ago

Yeah I just need it internal to coffea for a map_partitions call where I can pre-calculate the meta by hand quite easily.

I think you can even leave in that exception if someone doesn't supply a meta by hand.

douglasdavis commented 10 months ago

Alright that seems like a good starting point for a PR

lgray commented 10 months ago

I was almost able to get this to work by hiding the correction I need to apply in the attrs of a throw-away awkward array, but the partitioning bites me in the ass and it won't compute for large datasets.

So a PR is definitely need so we can have a single delayed object get sent around and applied to multiple calls.

lgray commented 10 months ago

So close!

douglasdavis commented 10 months ago

Alright https://github.com/dask-contrib/dask-awkward/pull/449 allows map_partitions calls (where meta= is explicitly passed by the caller) to have Delayed objects in the arguments.