CoffeaTeam / coffea

Basic tools and wrappers for enabling not-too-alien syntax when running columnar Collider HEP analysis.
https://coffea-hep.readthedocs.io
BSD 3-Clause "New" or "Revised" License

DAG optimization in recent coffea slows down workflow performance #1139

Open JinZhou5042 opened 4 months ago

JinZhou5042 commented 4 months ago

Yesterday, @cmoore24-24 and I encountered noticeable performance discrepancies while running the same physics application (test_skimmer.py) with TaskVine. After some process of elimination, we believe this stems from recent updates to Coffea, specifically changes to DAG generation that increased the number of tasks but also introduced some long-running outliers. These outliers are the main reason for the prolonged overall runtime of the application.

This observation was drawn from three representative tests; aside from the Coffea version, all other environment settings are identical.

The three versions of Coffea in our experiments are:

Coffea 2024.2.2 This version of Coffea generates 163 tasks in total; the task graph is below:

This figure shows how tasks are distributed over time; the total execution time is 666.4s.

image

As seen in the CDF of task execution time, 90.18% of tasks finish within 91s and the longest one takes 394.09s.

image

Looking into the CSV file, there are 7 categories, each consisting of a number of files: category_info_coffea2024.2.2.csv
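
For reference, this is roughly how we inspect the graphs on the dask side (a sketch only; result is a placeholder for the lazy collection that test_skimmer.py builds, and the low-level key count need not match the number of TaskVine tasks exactly, since the executor may group keys):

import dask

def describe_graph(result, filename=None):
    # A dask collection's __dask_graph__() is a HighLevelGraph, which behaves
    # like a mapping from low-level keys to tasks.
    graph = dict(result.__dask_graph__())
    print(f"{len(graph)} low-level tasks")
    if filename is not None:
        # Requires graphviz; handy for comparing structure between coffea versions.
        dask.visualize(result, filename=filename, optimize_graph=True)
    return graph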

Coffea 2024.4.0 Moving to a later version of Coffea, Connor suggested that this version has the magic combination of task-graph changes (we see 222 tasks here) where presumably the small tasks complete more quickly, yet the runtime is still about 10 minutes, meaning the long tasks aren't suffering much. In other words, there is a balance here that was changed along the way.

The task graph suggests that the structure has changed drastically, and we suspect a DAG optimization: test_skimmer_coffea2024 4 0

And the execution details: 652.1s in total, pretty much the same as the first one:

image

90.09% of tasks finish within 65.42s and the longest one takes 395.26s. Notably, most tasks run faster than in the previous version, but due to the increased number of tasks and no performance gain on the longest tasks, the total duration is essentially unchanged.

image

We also see an increased number of categories with this version (10 vs. 7). category_info_coffea2024.4.0.csv

Coffea 2024.6.1 This version seems to have a mysterious change to the DAG: it has the same number of tasks (222) as Coffea 2024.4.0, but the longest task runs much slower (554.99s vs. 395.26s), which results in a worse overall runtime.

The task graph: test_skimmer_coffea2024 6 1

Execution details (200s longer):

image

Execution CDF (90.54% tasks finish in 44.34s):

image

The same number of categories as Coffea 2024.4.0 (10): category_info_coffea2024.6.1.csv

From the comparisons, it's clear that Coffea 2024.4.0 optimized the DAG by introducing a wider variety of tasks and dividing the graph into a finer granularity, which increased the overall concurrency and improved the runtime for most tasks.

Coffea 2024.6.1 built on this by further shortening the runtime for the majority of tasks. However, it also lengthened the runtime for a few long tasks, which extended the critical path, thus not only failing to improve the overall application performance but actually degrading it by about 20%.

In our experience, optimizing long tasks is crucial for reducing the DAG's completion time. This includes strategies such as scheduling them in advance using existing information, replicating them to minimize recovery delays, or gathering useful data about long tasks during the graph-building phase.
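
As a minimal sketch of the first strategy (assuming a scheduler that honors dask's priority annotations, e.g. dask.distributed; whether the TaskVine executor respects them is a separate question), known-long steps can be tagged so they start as early as possible:

import time
import dask

@dask.delayed
def long_step(x):
    time.sleep(5)  # stands in for a long-running outlier task
    return x

@dask.delayed
def short_step(x):
    return x + 1

# Raise the priority of the known-long task so the scheduler starts it first,
# shortening the critical path instead of leaving it for the end.
with dask.annotate(priority=10):
    heavy = long_step(0)

light = [short_step(i) for i in range(100)]
results = dask.compute(heavy, *light)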

We believe these optimizations likely lead to significant performance enhancements in many scenarios involving graph operations. However, we also want to identify under what circumstances these optimizations might degrade performance and would appreciate collaboration to resolve this issue.

tagging @dthain because this came up in a discussion with him

JinZhou5042 commented 4 months ago

For each of the examples I conducted 3+ experiments, so the results are consistent and can be reproduced.

cmoore24-24 commented 4 months ago

Maybe also worth pointing out: this behavior seemingly started with Coffea 2024.4.1. While testing this application with different versions of Coffea and keeping other packages in the environment the same, the longer-running large tasks appeared when upgrading from 2024.4.0 to 2024.4.1, with no other changes made to the environment (so awkward, dask_awkward, dask, and so on did not change).

lgray commented 4 months ago

Thank you for this study, it is incredibly useful to dig into these sorts of things. There are many moving parts in generating taskgraphs and it's hard to monitor all of it. Finding tasks like this which are sensitive to changes is super useful! We can make integration tests out of it.

Before I make comments, a question: did you use the same version of dask (and dask-distributed) in all of these test cases? If not, it would be really useful to try keeping dask constant while you change coffea versions so we can bisect the problem a little. Same for dask-awkward, as much as you are able (I know we re-pin versions from time to time).

Finally, one thing that happened recently is that "data touching" in awkward array had some regressions in the most recent versions (starting from awkward 2.6.5), which could cause some workflows to load much more data than they need.

Please check dak.necessary_columns for what you're calculating in each case to make sure you're not being affected by this issue.
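
Something like this is enough (a rough sketch; skimmed is a placeholder for whatever lazy result you build in test_skimmer.py before computing):

import dask_awkward as dak

def report_columns(collection):
    # necessary_columns maps each input layer to the set of columns the graph
    # will actually read; unexpectedly large sets point at the data-touching bug.
    for layer, cols in dak.necessary_columns(collection).items():
        print(layer, sorted(cols))

# report_columns(skimmed)  # call this on the lazy result, before .compute()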

dthain commented 4 months ago

@JinZhou5042 are you able to dump the conda package solution for both cases? That would be helpful to have in general.

JinZhou5042 commented 4 months ago

Besides ndcctools, the only other packages required are fastjet and coffea. Both ndcctools and fastjet are up-to-date, so all package discrepancies are due to different versions installed with the command pip install coffea==xxx.

Below are the package lists generated by conda list --export for each environment:

packages_coffea2024.2.2.txt packages_coffea2024.4.0.txt packages_coffea2024.6.1.txt

And these are the dask-related packages:

coffea2024.2.2

(tmp_coffea2024.2.2) [jzhou24@condorfe ~]$ conda list dask
# packages in environment at /afs/crc.nd.edu/user/j/jzhou24/miniconda3/envs/tmp_coffea2024.2.2:
#
# Name                    Version                   Build  Channel
dask                      2024.1.1                 pypi_0    pypi
dask-awkward              2024.7.0                 pypi_0    pypi
dask-histogram            2024.3.0                 pypi_0    pypi

coffea2024.4.0

(tmp_coffea2024.4.0) [jzhou24@condorfe ~]$ conda list dask
# packages in environment at /afs/crc.nd.edu/user/j/jzhou24/miniconda3/envs/tmp_coffea2024.4.0:
#
# Name                    Version                   Build  Channel
dask                      2024.7.1                 pypi_0    pypi
dask-awkward              2024.7.0                 pypi_0    pypi
dask-histogram            2024.3.0                 pypi_0    pypi

coffea2024.6.1

(tmp_coffea2024.6.1) [jzhou24@condorfe ~]$ conda list dask
# packages in environment at /afs/crc.nd.edu/user/j/jzhou24/miniconda3/envs/tmp_coffea2024.6.1:
#
# Name                    Version                   Build  Channel
dask                      2024.7.1                 pypi_0    pypi
dask-awkward              2024.7.0                 pypi_0    pypi
dask-histogram            2024.3.0                 pypi_0    pypi

I didn't find dask-distributed in my environment.

JinZhou5042 commented 4 months ago

@cmoore24-24 I don't know much about dak.necessary_columns; do we need to set it up and re-run some experiments?

JinZhou5042 commented 4 months ago

You might also be interested in the total data transferred in each of the cases:

coffea2024.2.2

image

coffea2024.4.0

image

coffea2024.6.1

image

There is only 1 worker; the x axis represents time and the y axis represents the increase in disk usage in MB. As seen, the latter two have more data transferred than the first one, but they are pretty much the same as each other.

lgray commented 4 months ago

Interesting, in this most recent version it's picked up a synchronization point.

Can you also dump for me the versions of awkward and uproot that you're using in each case?

JinZhou5042 commented 4 months ago

coffea2024.2.2

(tmp_coffea2024.2.2) [jzhou24@condorfe ~]$ conda list awkward
# packages in environment at /afs/crc.nd.edu/user/j/jzhou24/miniconda3/envs/tmp_coffea2024.2.2:
#
# Name                    Version                   Build  Channel
awkward                   2.6.6                    pypi_0    pypi
awkward-cpp               35                       pypi_0    pypi
dask-awkward              2024.7.0                 pypi_0    pypi

(tmp_coffea2024.2.2) [jzhou24@condorfe ~]$ conda list uproot
# packages in environment at /afs/crc.nd.edu/user/j/jzhou24/miniconda3/envs/tmp_coffea2024.2.2:
#
# Name                    Version                   Build  Channel
uproot                    5.3.10                   pypi_0    pypi

coffea2024.4.0

(tmp_coffea2024.4.0) [jzhou24@condorfe ~]$ conda list awkward
# packages in environment at /afs/crc.nd.edu/user/j/jzhou24/miniconda3/envs/tmp_coffea2024.4.0:
#
# Name                    Version                   Build  Channel
awkward                   2.6.6                    pypi_0    pypi
awkward-cpp               35                       pypi_0    pypi
dask-awkward              2024.7.0                 pypi_0    pypi

(tmp_coffea2024.4.0) [jzhou24@condorfe ~]$ conda list uproot
# packages in environment at /afs/crc.nd.edu/user/j/jzhou24/miniconda3/envs/tmp_coffea2024.4.0:
#
# Name                    Version                   Build  Channel
uproot                    5.3.10                   pypi_0    pypi

coffea2024.6.1

(tmp_coffea2024.6.1) [jzhou24@condorfe ~]$ conda list awkward
# packages in environment at /afs/crc.nd.edu/user/j/jzhou24/miniconda3/envs/tmp_coffea2024.6.1:
#
# Name                    Version                   Build  Channel
awkward                   2.6.6                    pypi_0    pypi
awkward-cpp               35                       pypi_0    pypi
dask-awkward              2024.7.0                 pypi_0    pypi

(tmp_coffea2024.6.1) [jzhou24@condorfe ~]$ conda list uproot
# packages in environment at /afs/crc.nd.edu/user/j/jzhou24/miniconda3/envs/tmp_coffea2024.6.1:
#
# Name                    Version                   Build  Channel
uproot                    5.3.10                   pypi_0    pypi

cmoore24-24 commented 4 months ago

Hi @JinZhou5042 and @lgray, I've done a couple of checks on the calculations being done in test_skimmer.py, and dak.necessary_columns isn't returning anything more than expected. So I don't believe that this is affecting us in this case.

lgray commented 4 months ago

And, indeed, the versions from conda are all the same so you're consistently using the one with the data-touching bug. :-)

So it all comes down to coffea... That's interesting!

I'll look into what's going on between versions there. I know in a few versions there wasn't much change in how coffea was assembling some calls to dask, but since you're moving month to month there may be some drift. I'll take a look at the PRs that went in.

If you feel like git bisecting these changes (if you can figure out a script to automate it), that would be an interesting view on this as well.

JinZhou5042 commented 4 months ago

I'm spending the day testing the application with many version combinations of coffea and dask. The issue might lie on the coffea side, but I'm not sure yet; I will update with some results later...

JinZhou5042 commented 4 months ago

Here's the table showing different versions of coffea, dask, and uproot, along with the corresponding number of submitted tasks and the total execution time:

image

I previously confused the concepts of restructuring the graph and optimizing the graph. It appears that in one of the dask-awkward versions between 2024.2.0 and 2024.7.0, the logic for generating the task graph was improved, introducing more tasks and categories, which slightly enhanced performance. However, as @cmoore24-24 pointed out, a commit between coffea 2024.4.0 and 2024.4.1 altered the logic of task arrangement, which notably slowed down the application's performance.

My next step would be trying git bisect to target that commit.
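
Roughly, the plan is to drive git bisect run with a small check script against an editable install of coffea (a sketch only; the 700 s threshold and the name check_runtime.py are placeholders):

# check_runtime.py -- used as: git bisect run python check_runtime.py
# Exit 0 marks a commit good, 1 marks it bad, 125 tells bisect to skip it.
import subprocess
import sys
import time

RUNTIME_BUDGET_S = 700  # a bit above the known-good ~660 s runs

start = time.time()
proc = subprocess.run([sys.executable, "test_skimmer.py"])
elapsed = time.time() - start

if proc.returncode != 0:
    sys.exit(125)  # the application itself failed; skip this commit
sys.exit(0 if elapsed < RUNTIME_BUDGET_S else 1)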

JinZhou5042 commented 4 months ago

It seems 609c912b is the culprit...

image

Don't know if this matters

image

lgray commented 4 months ago

the nopython=True shouldn't matter at all!

lgray commented 4 months ago

@nsmith- https://github.com/CoffeaTeam/coffea/commit/609c912b is somehow causing a slowdown in the resulting taskgraph!

Any thoughts in this direction would be extremely helpful.

nsmith- commented 4 months ago

Would it be easy to run the same analysis task using a different data source (e.g. xrootd from FNAL EOS, or something else)? The reason I ask is that, for me, this particular commit was meant to improve performance, as part of https://github.com/scikit-hep/uproot5/issues/1157
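
Something along these lines would do (a sketch only; the xrootd URL and the schema class are placeholders for whatever the skimmer actually uses):

from coffea.nanoevents import NanoEventsFactory, NanoAODSchema

# Placeholder xrootd path; substitute a real file on FNAL EOS or another site.
fname = "root://cmseos.fnal.gov//store/user/placeholder/nano_1.root"

events = NanoEventsFactory.from_root(
    {fname: "Events"},
    schemaclass=NanoAODSchema,
).events()

# ...then run the same skimming computation on `events` as in test_skimmer.py.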

cmoore24-24 commented 4 months ago

Hi @nsmith-

I've put together a reproducer that has the moving parts of the analysis, so it should be a faithful, abridged version of the analysis that Jin has been posting results for. The actual code to run is here: https://gist.github.com/cmoore24-24/9d1e49c118272f05d12107cc7e0da993 And a second file has functions that are imported into the above: https://gist.github.com/cmoore24-24/71ed4c2785c9ea56fdb4796ec251516d

The reproducer includes the postprocessing step, which has the same step size as Jin's. The benchmark here is the Hbb sample, found in the lpcpfnano EOS, here: /store/group/lpcpfnano/rkansal/v2_3/2017/GluGluHToBB/GluGluHToBB_Pt-200ToInf_M-125_TuneCP5_MINLO_13TeV-powheg-pythia8/GluGluHToBB/220926_153209/0000/*

I hope this is what you had in mind, let me know if you run into issues.