scikit-hep / uproot5

ROOT I/O in pure Python and NumPy.
https://uproot.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Memory problem in parallel processing while using uproot 4, it wasn't the case in uproot 3 #277

Closed shahidzk1 closed 3 years ago

shahidzk1 commented 3 years ago

I have a ROOT file, available on Google Drive at this link. When I converted it to arrays in uproot 3 using parallel processing, it took less time and memory. The code I was using was something like:

from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(8)

branches = pd.DataFrame.from_dict(uproot.open(''+file_with_path+'')[''+tree_name+''].arrays(namedecode='utf-8', executor=executor))

But now it consumes all my memory in uproot 4. Maybe I am not doing it properly. Could you please have a look at it? Also, it is not as fast as it used to be.

from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(8)

input_tree = uproot.open('/path/10k_events_PFSimplePlainTree.root:PlainTree', decompression_executor=executor)

branches = input_tree.arrays(library='pd', decompression_executor=executor)

@jpivarski and I discussed this in the issue at this link, and he suggested that it may be just 10% more memory, but it is more than 10% for me. Maybe 60-80% more.

jpivarski commented 3 years ago

Same clarification as on StackOverflow: I said that for all I know, it could be a small difference, like 10%. I didn't say that it is or should be 10%. The statement was about my lack of knowledge.

tamasgal commented 3 years ago

I guess a GitHub discussion would have been a better choice than StackOverflow, since this is closer to an actual discussion than to a problem with a definite answer :wink:

I had a quick look and tried the code above (with 8 executors). On my machine™ it takes 2m 12s and ~18.29GB of peak memory. Interestingly I get the same numbers when I run with a single executor.

The minimum (uncompressed) data size of the resulting pandas.DataFrame is ~5GB:

>>> sum(dt.itemsize for dt in branches.dtypes) * branches.shape[0] / 1024**3
4.912072330713272

You should in principle be able to do the decompression and DataFrame creation with less than double the memory (~10GB) in the worst case, assuming that each branch's data is stored independently, so that there is no overlapping data occupying the memory in each executor. Btw. you get this number when you simply use .arrays() (which takes 10s):

import sys
import resource  # Unix only
import uproot

def peak_memory_usage():
    """Return peak memory usage in MB"""
    mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux...
    factor_mb = 1 / 1024
    if sys.platform == "darwin":
        # ...and in bytes on macOS
        factor_mb = 1 / (1024 * 1024)
    return mem * factor_mb

f = uproot.open('10k_events_PFSimplePlainTree.root')
f["PlainTree"].arrays()
print(peak_memory_usage())

Also please note that Python is garbage collected and sometimes it's a bit lazy about cleaning things up, so your mileage may vary and peak memory usage might give weird results. Do you have an actual measurement of the peak memory of your uproot3 code, rather than a guessed percentage? That would certainly be interesting...
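One limitation of the ru_maxrss figure is that it is a high-water mark for the whole process and never goes down, so it cannot show whether memory was released afterwards. As a rough sketch (not something used in this thread), the standard-library tracemalloc module reports both the current and the peak size of allocations made through Python's allocator. The 8 MB of bytearrays below is a made-up stand-in for real data, and allocations made by C libraries that bypass Python's allocator would not be counted.

```python
import gc
import tracemalloc

tracemalloc.start()

# Allocate roughly 8 MB in 1 MB pieces (a stand-in for real arrays),
# then drop every reference to them.
blocks = [bytearray(1024 * 1024) for _ in range(8)]
del blocks
gc.collect()  # only needed for reference cycles; harmless here

# current = bytes still allocated, peak = largest value seen since start()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f"current: {current / 1024**2:.1f} MB, peak: {peak / 1024**2:.1f} MB")
```

Comparing the two numbers before and after a suspicious step gives a hint whether the memory is still referenced or was only a transient peak.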

I don't know much about the internals of uproot's arrays(library="pd") mechanics, but if you want to go crazy, you could preallocate the underlying numpy array and fill it in place using different threads to be as memory efficient as one can be. The overhead would then be the cache which is used to grab a chunk of compressed data, uncompress it and write it directly to the target memory location. That means you could get away with 5GB + cache overhead, given that you don't read the same data multiple times. That, however, might be much more complicated than it sounds ;) I think that all the tools are there, but you'll probably not find a copy-and-pasteable example out there.
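To make the preallocate-and-fill idea a bit more concrete, here is a minimal sketch using only the standard library. It is not uproot code: the zlib-compressed chunks are a hypothetical stand-in for compressed baskets, a bytearray stands in for the numpy array, and all chunks are assumed to have the same known uncompressed size.

```python
import zlib
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for compressed baskets: equal-sized byte chunks.
chunk_size = 1024
raw_chunks = [bytes([i % 256]) * chunk_size for i in range(16)]
compressed = [zlib.compress(chunk) for chunk in raw_chunks]

# Preallocate the full output once; every thread writes into its own slice.
output = bytearray(chunk_size * len(compressed))
view = memoryview(output)

def fill(index):
    """Decompress one chunk and copy it to its final position."""
    start = index * chunk_size
    data = zlib.decompress(compressed[index])  # temporary, one chunk in size
    view[start:start + len(data)] = data
    return len(data)

with ThreadPoolExecutor(4) as executor:
    written = sum(executor.map(fill, range(len(compressed))))

print(written, "bytes written into the preallocated buffer")
```

The only extra memory alive at any time is one temporary chunk per worker, which corresponds to the "cache overhead" mentioned above. The slices are disjoint, so the threads never write to the same bytes.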

Anyways...

If memory is an issue, I'd definitely cite Jim, who already pointed out that parallel I/O usually trades memory for speed. This could explain the higher memory footprint (although it's still a bit of a mystery why the footprint is the same for 1 or 8 executors). Regarding the speed: you might not gain much from parallel threads due to a GIL fight, which might even be responsible for a slowdown in some cases. See for example the wonderful work of David Beazley (http://www.dabeaz.com/GIL/), which is from 2010 but still valid.

My current conclusion is: don't use parallel executors at all and simply go with .arrays(library="pd"), which takes 11GB of memory and 10 seconds.

I am still curious about the footprint of the uproot3 parallel approach.

shahidzk1 commented 3 years ago

> Same clarification as on StackOverflow: I said that for all I know, it could be a small difference, like 10%. I didn't say that it is or should be 10%. The statement was about my lack of knowledge.

I am sorry for misquoting you; it is due to my English. It is my 3rd language and sometimes I confuse "may" and "should". I have edited my posts again.

jpivarski commented 3 years ago

> You should in principle be able to do the decompression and DataFrame creation with less than double the memory (~10GB) in the worst case, assuming that each branch's data is stored independently, so that there is no overlapping data occupying the memory in each executor.

There's another doubling because all of the little arrays from the TBaskets have to be concatenated into a big array representing the whole TBranch. That particular doubling would not scale with the number of executors, since each task is responsible for a disjoint subset of the TBaskets and the single output array is common to all. So that's 1× 5GB for the individual TBaskets, 1× 5GB for the resulting array, and 1× 5GB for the Pandas DataFrame = 15GB. That's pretty close to 18GB.
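Written out as arithmetic, using only the two numbers already quoted in this thread (the ~4.91 per-copy estimate and the ~18.29GB observed peak), the accounting looks roughly like this. The two figures are treated as being in the same unit, which is an assumption, and the three-copies breakdown is the one from the paragraph above.

```python
# Size of one full copy of the data, from the estimate earlier in the thread.
one_copy_gb = 4.912072330713272

# Copies assumed to be alive at the same time, per the accounting above.
copies = {
    "arrays from the individual TBaskets": 1,
    "concatenated array per TBranch": 1,
    "pandas DataFrame": 1,
}

expected_peak_gb = one_copy_gb * sum(copies.values())

# Peak reported earlier in the thread for the library="pd" run.
observed_peak_gb = 18.29
unexplained_gb = observed_peak_gb - expected_peak_gb

print(f"expected about {expected_peak_gb:.1f} GB, "
      f"unexplained about {unexplained_gb:.1f} GB")
```

The remainder is what would have to be covered by compressed raw data, interpreter overhead, and anything not yet garbage collected.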

> Also please note that Python is garbage collected and sometimes it's a bit lazy about cleaning things up, so your mileage may vary and peak memory usage might give weird results.

If you have the memory available, the garbage collector won't bother cleaning it up, so you could get different results on a machine with a lower ceiling. It's not smart enough to delay tasks until previous ones are done and their garbage gets collected.

> I don't know much about the internals of uproot's arrays(library="pd") mechanics

I took a quick look at what library="pd" does differently (since it's such a big difference in this case). Assuming that the data are simple types, it just wraps each TBranch's fully concatenated array as a Series,

https://github.com/scikit-hep/uproot4/blob/446da9f5e9417a8aa2dc36958b80b9ee195f713b/uproot/interpretation/library.py#L899-L900

and puts those Series (called arrays here) into a new DataFrame using the standard constructor.

https://github.com/scikit-hep/uproot4/blob/446da9f5e9417a8aa2dc36958b80b9ee195f713b/uproot/interpretation/library.py#L932-L933

While writing this comment, I got sucked into it and investigated. (There are a few hours between this sentence and the previous one.) The pandas.DataFrame constructor makes a copy of all the Series it is given, but the total memory usage can be minimized by appending one column at a time:

https://github.com/scikit-hep/uproot4/blob/38ee68bfd4f0c7d90fdf5d12a2d894fb42f629ad/uproot/interpretation/library.py#L765-L778

This is the first time I've ever used gc.collect() in production code, but without it, Python doesn't notice that it can garbage collect before running out of memory. It could be because Pandas's memory allocation is unconnected to the Python garbage collector. I would normally recommend strongly against such a thing, but this is a very limited use and only associated with making flat Pandas DataFrames. (Last month, I wrote my first goto in decades, so anything's possible.)

There were a few places where TBaskets (with associated raw data), arrays from TBaskets, and arrays before computing expressions could be deleted, which lowers the overall memory use before approaching Pandas, and these are the data that need to be collected with gc.collect().
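For readers who cannot follow the link, the column-at-a-time idea can be sketched as follows. This is an illustration written for this thread, not the code from the linked commit: the function name frame_from_columns and the example arrays are hypothetical, and how much memory is actually saved depends on the pandas version and its copy behaviour.

```python
import gc

import numpy as np
import pandas as pd


def frame_from_columns(columns):
    """Build a DataFrame one column at a time, emptying `columns` as it goes.

    `columns` maps names to 1-D arrays of equal length. Each entry is removed
    from the dict as soon as it has been added to the output frame.
    """
    out = None
    for name in list(columns):
        series = pd.Series(columns.pop(name), name=name)
        if out is None:
            out = series.to_frame()
        else:
            out[name] = series
        del series
        gc.collect()  # explicit collection, as discussed above
    return out if out is not None else pd.DataFrame()


# Hypothetical stand-in for the arrays read from a tree.
arrays = {
    "px": np.arange(5, dtype=np.float32),
    "py": np.ones(5, dtype=np.float32),
    "n": np.arange(5, dtype=np.int64),
}
df = frame_from_columns(arrays)
print(df.dtypes)
```

The intent is that, at any moment, at most one column exists both as a source array and inside the DataFrame, instead of every column being duplicated at once by the constructor.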

None of this had anything to do with parallelizing execution. The parallel processing was completely done and we were back on a single thread before any unnecessary data could be deleted. Parallelizing the decompression does make it considerably faster, so this file is in the regime of spending most of its time in the GIL-released decompression routines.

jpivarski commented 3 years ago

The PR that trims memory usage is #281, so if that works for you, this issue can be closed.

tamasgal commented 3 years ago

OK that's a nice wrap-up!

> If you have the memory available, the garbage collector won't bother cleaning it up, so you could get different results on a machine with a lower ceiling. It's not smart enough to delay tasks until previous ones are done and their garbage gets collected.

Exactly. Thanks for the better explanation. I have run into this quite often in the past: users got stuck debugging supposed memory leaks, when simply limiting the (V)RAM of the process made everything "fine", so it was a non-problem. It is definitely a (let's call it) Python feature which can confuse and mislead people.

You made my day with gc.collect() and your goto 😆

Now it's time for @shahidzk1 to try #281. Unfortunately I cannot redo the test on the same machine currently because I am running a processing chain and need every possible memory address...