dask-contrib / dask-awkward

Native Dask collection for awkward arrays, and the library to use it.
https://dask-awkward.readthedocs.io
BSD 3-Clause "New" or "Revised" License

feat: Only return parquet metadata if intending to write #549

Closed · martindurant closed this 3 days ago

codecov-commenter commented 1 month ago

:warning: Please install the Codecov app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

Attention: Patch coverage is 75.00000% with 3 lines in your changes missing coverage. Please review.

Project coverage is 92.93%. Comparing base (8cb8994) to head (64b3649). Report is 148 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/dask_awkward/lib/io/parquet.py | 75.00% | 3 Missing :warning: |

:exclamation: Your organization needs to install the Codecov GitHub app to enable full functionality.

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main     #549      +/-   ##
==========================================
- Coverage   93.06%   92.93%   -0.14%
==========================================
  Files          23       22       -1
  Lines        3290     3395     +105
==========================================
+ Hits         3062     3155      +93
- Misses        228      240      +12
```

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.



martindurant commented 2 weeks ago

@pfackeldey : added fire_and_forget flag to to_parquet. Give it a try? You must already have your dask client instantiated.
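
For anyone else wanting to try it, a minimal sketch of what that might look like; only the fire_and_forget= keyword comes from this PR, while the paths and the compute= keyword are illustrative assumptions:

```python
# Minimal sketch, not the PR's own code. Only fire_and_forget= comes from
# this PR; the paths and the compute=False keyword are assumptions.
import dask
import dask_awkward as dak
from distributed import Client

# the client must exist before the graph is built, since the writer
# picks it up as the current client
client = Client()

arr = dak.from_parquet("input/")        # hypothetical input
to_compute = dak.to_parquet(
    arr, "output/",                     # hypothetical output
    compute=False, fire_and_forget=True,
)
dask.compute(to_compute)  # returns without waiting for the writes to land
```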

pfackeldey commented 2 weeks ago

Thank you for adding this option @martindurant so quickly! I'm redirecting this test to @ikrommyd as he has an analysis setup that can test this :+1:

lgray commented 2 weeks ago

@martindurant could we also get an option for the tree reduce?

martindurant commented 2 weeks ago

> tree reduce?

Reducing N*None -> None in each reduction? I suppose so. I'll get back to you in about an hour. Naturally, all these approaches are mutually exclusive, and purely experimental for now.

martindurant commented 2 weeks ago

@lgray : tree= option seems to work
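
For reference, a sketch of trying it out, assuming tree= is a keyword of to_parquet as the comment above suggests; the paths and the compute= keyword are again illustrative:

```python
# Same caveats as the earlier sketch: paths and compute= are assumed,
# only the tree= keyword comes from this PR.
import dask
import dask_awkward as dak
from distributed import Client

client = Client()

arr = dak.from_parquet("input/")        # hypothetical input
to_compute = dak.to_parquet(
    arr, "output/",                     # hypothetical output
    compute=False, tree=True,
)
dask.compute(to_compute)  # the final metadata step runs as a tree reduction
```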

ikrommyd commented 2 weeks ago

I will try those out. Takes some time because I’m trying a large enough sample to see those issues and also because I’m monitoring the dashboard as it’s running.

ikrommyd commented 2 weeks ago

> @pfackeldey : added fire_and_forget flag to to_parquet. Give it a try? You must already have your dask client instantiated.

So we need the client to be already up when we will dak.to_parquet (when building the graph)?

martindurant commented 2 weeks ago

Correct. This was the quick and easy way to do it.


ikrommyd commented 1 week ago

So first report from trying out the new fire_and_forget and tree options:

1) The tree option seems to work very well. The workflow succeeded on the first try without errors. The workers still had unmanaged memory (old) in the GB range, but the to-parquet tasks weren't accumulating on them; they were being forgotten rather than staying in memory. When the merge into the final write-parquet task after the tree reduction was about to happen, I saw no memory spike on 1-2 workers, which is what would happen if they had to accumulate all the to-parquet tasks. Even better, when one worker died for unrelated reasons, not many tasks had to be redone thanks to the tree reduction: the computation reached the end, couldn't gather the results from the dead worker, went back and re-did only ~100 tasks (out of many thousands), and then finished.

2) The fire_and_forget option seems to work (I see files being written to disk), but there is a problem: dask.compute(to_compute) doesn't hold the interpreter. The script proceeds to its end, and when the Python interpreter exits it kills the client and the scheduler. I don't know if there is a way to prevent this or if I'm doing something wrong. Until the client got killed, though, I could see tasks being computed in the dashboard and files being written to disk. There is some sense of progress tracking in the dashboard as well: the to-parquet tasks show as 0/X completed, since they aren't tracked with this option, but X keeps shrinking as tasks finish, so you do have a sense of how many writing tasks remain.

lgray commented 1 week ago

Cool - this is useful information re: tree reduction. It would seem we should try to use it in as many remaining places as possible where we otherwise have N:1 input-to-output partitions (like collecting finalized arrays or similar things).

Histograms are already a tree reduction, but those face different issues. Used in a few more places here, though, it could bring us the robustness we appear to be missing.

This also brings up the issue - why the heck are distributed tasks taking up so much memory!? There's an additional class that represents a task in distributed which is surely eating up some space if tasks are hanging around.

I guess we should think carefully about lifecycles.

martindurant commented 1 week ago

> why the heck are distributed tasks taking up so much memory

Quite. I suggested that perhaps a worker plugin could figure out what's being allocated as tasks go through their lifecycles, perhaps on single-threaded workers. The usual tools like object-growth and reference-cycle finders would be the first line of attack. I'm not certain that the worker plugin system (the transition method) has enough granularity, but it's an easy place to start. https://distributed.dask.org/en/stable/plugins.html#worker-plugins
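
To make that concrete, a rough sketch of the kind of plugin that could hook into the transition method; the psutil-based RSS probe and the event topic name are my own choices, not anything prescribed by distributed or this PR:

```python
# Rough sketch only: log the worker's RSS at every task transition so
# memory growth can be correlated with task lifecycles. The probe and
# the log format are illustrative choices.
import os

import psutil
from distributed import WorkerPlugin


class MemoryByTransition(WorkerPlugin):
    def setup(self, worker):
        self.worker = worker
        self.proc = psutil.Process(os.getpid())

    def transition(self, key, start, finish, **kwargs):
        rss_mb = self.proc.memory_info().rss / 2**20
        self.worker.log_event(
            "memory-by-transition",
            {"key": key, "start": start, "finish": finish, "rss_mb": rss_mb},
        )


# register on a running cluster with:
# client.register_worker_plugin(MemoryByTransition())
```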

lgray commented 1 week ago

We need to be careful with fire_and_forget, since it depends on whatever is executing the tasks being interface-similar to distributed. We already have options that are not, so some logic that checks what's being used to execute the graph and errors out if it isn't distributed is probably useful.

martindurant commented 1 week ago

> it depends on whatever is executing tasks being interface-similar to distributed.

At least we would fail early at get_client, but your point is valid. As implemented in this PR, it is only for trialing and getting the kind of information @ikrommyd supplied, of course.
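
For concreteness, a sketch of that "fail early" behaviour (my own wording, not code from this PR): distributed.get_client() raises if there is no active client, so the write path can refuse to fire-and-forget under a non-distributed scheduler.

```python
# Sketch only, not code from this PR: refuse fire_and_forget when no
# distributed client is available. get_client() raises ValueError if
# nothing is found, which is the early failure mentioned above.
from distributed import get_client


def _require_distributed_client():
    try:
        return get_client()
    except ValueError as err:
        raise RuntimeError(
            "fire_and_forget=True requires an active distributed Client"
        ) from err
```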

ikrommyd commented 1 week ago

I've just tried the fire_and_forget option as well. I stopped the interpreter from going past the dask.compute() call by adding an input("Press enter when the computation finishes") and monitored the dashboard. All went fine, just like in the tree-reduction case. Towards the end two workers died, and the tasks those workers had in memory were simply redone by other workers spawned at the end to do just that. I got exactly the same number of parquet files with tree reduction and fire-and-forget, and no memory problems (the unmanaged memory (old) of the workers was still in the GB range for fire-and-forget as well).

martindurant commented 1 week ago

In the weekly awkward meeting, we decided that tree reduction should become the only implementation for write-parquet (it amounts to the same layout in the case of few partitions). The fire-and-forget route will be removed from this PR and can maybe be resurrected in a separate one for those who want it. Aside from being distributed-specific, it comes with the problem of not knowing when your process has finished.
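
As a reminder of why the tree layout behaves better at scale, a purely conceptual sketch (not dask-awkward's implementation) of reducing N per-partition write results down to one without any single task ever gathering all of them:

```python
# Conceptual sketch only: combine the per-partition write results
# (metadata, or just None) pairwise, layer by layer, so no single task
# ever holds all N pieces at once.
from dask import delayed


def combine(*pieces):
    # in the write-parquet case this would merge (or discard) the
    # per-partition metadata
    return None


def tree_reduce(parts, split_every=2):
    layer = list(parts)
    while len(layer) > 1:
        layer = [
            delayed(combine)(*layer[i : i + split_every])
            for i in range(0, len(layer), split_every)
        ]
    return layer[0]


# usage: final = tree_reduce(per_partition_results); final.compute()
```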

ikrommyd commented 4 days ago

Would be nice to add the same feature here: https://github.com/scikit-hep/uproot5/blob/734700ef1f822338b03a7573df484909b317b2c2/src/uproot/writing/_dask_write.py

martindurant commented 4 days ago

@ikrommyd - certainly, but that would be a separate PR of course. It does seem like the metadata in that case is simply discarded anyway.

ikrommyd commented 4 days ago

> @ikrommyd - certainly, but that would be a separate PR of course. It does seem like the metadata in that case is simply discarded anyway.

Oh yeah, I just mentioned it here for documentation purposes since there was a discussion above.

martindurant commented 3 days ago

I don't suppose there's any testing we should be doing here, except that the existing parquet tests continue to work?

lgray commented 3 days ago

Yeah, it's hard to reach the scale in CI needed to test the actual performance impact of this PR.