scikit-hep / uproot3

ROOT I/O in pure Python and NumPy.
BSD 3-Clause "New" or "Revised" License

Memory usage by daskframe #497

Open wiso opened 4 years ago

wiso commented 4 years ago

I need to open several TTrees from different files and merge them. They are quite big, so I am using daskframe, something like this:

all_dfs = []
for info, fn in catalogue.items():
    df = uproot.daskframe(fn, "CollectionTree", branches)
    df = df.rename(columns=lambda x: x.replace('HGamEventInfoAuxDyn.', '').replace('HGamTruthEventInfoAuxDyn.', 'truth_'))
    df['isPassed'] = df['isPassed'].astype(bool)
    df['mc'] = info[0]
    df['prod'] = info[1]
    all_dfs.append(df)

I was convinced that this would be very fast: I guess you just need to know how many events you have in each TTree and the type of each branch.

On the contrary, this is very slow and uses a lot of memory. See the plot.

uproot: 3.10.12

[plot2: memory-usage plot]

jpivarski commented 4 years ago

I don't know what dask.frame does when it merges DataFrames, but it looks like it's evaluating them. It depends on whether they wanted "merge" to be a lazy operation (pushing the work of actually merging down to the next operation, which can multiply the number of times that it happens) or they wanted "merge" to be eager (which means that columns are loaded that might not be used later). They both have advantages and disadvantages.
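The "pushing the work down can multiply the number of times it happens" pitfall can be sketched in plain Python. This is a hypothetical illustration with made-up `load` and `lazy_concat` helpers, not Dask's actual scheduler (which does cache shared tasks within a single graph execution):

```python
# Sketch: a lazy "merge" returns a thunk, so every downstream consumer
# re-runs the upstream loads instead of reusing one materialized result.
calls = []

def load():
    calls.append("load")          # stand-in for an actual file read
    return [1, 2, 3]

def lazy_concat(loaders):
    # Defer the merge: nothing is read until the result is demanded.
    return lambda: [x for ld in loaders for x in ld()]

merged = lazy_concat([load, load])
first = merged()                  # first consumer: two loads
second = merged()                 # second consumer: two more loads
assert calls.count("load") == 4   # each file was read twice
```

An eager merge would have done the two loads once, at the cost of possibly reading columns that are never used later.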

As we've accumulated experience with lazy data types, I'm becoming less enamored with them. Laziness often trades having to think up-front about what's in memory and when for having to think about it later, after disaster strikes. Even if the delayed-read and caching algorithms are perfect for a given workflow, the performance is still no better than a carefully designed eager workflow, so all it buys is not having to worry about the details in the cases when it works.

The laziness algorithm (when to load what, given that you want to compute a particular thing) is in the computation framework—in this case Dask. (I've had to think about it in the context of PartitionedArrays and VirtualArrays in Awkward Array, another computation framework, and there are some hard problems there.) Uproot just initiates a read when Dask asks for one.

wiso commented 4 years ago

@jpivarski: forget the merging. The plot is during the loop I have posted.

jpivarski commented 4 years ago

Well, any sane implementation of rename would remain lazy, but

df['isPassed'] = df['isPassed'].astype(bool)

forces an eager read. I wonder if the memory is all due to that column.

If it's the case that any other columns are being read, then that's an Uproot error.
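The lazy-rename / eager-astype distinction can be sketched in plain Python. The `LazyColumn` class and `fake_read` function here are hypothetical stand-ins, not uproot's or Dask's actual machinery:

```python
# Sketch: renaming a lazily loaded column only touches metadata, while a
# type conversion must materialize the data for that one column.
reads = []

def fake_read(branch):
    reads.append(branch)              # stand-in for reading baskets from disk
    return [0, 1, 1]

class LazyColumn:
    def __init__(self, branch):
        self.branch = branch
        self._data = None
    def materialize(self):
        if self._data is None:       # read at most once
            self._data = fake_read(self.branch)
        return self._data

cols = {name: LazyColumn(name) for name in ("pt", "eta", "isPassed")}

cols["passed"] = cols.pop("isPassed")   # rename: no read happens
assert reads == []

as_bool = [bool(v) for v in cols["passed"].materialize()]  # astype-like step
assert reads == ["isPassed"]            # only that one column was read
```

If the observed memory corresponds to more than the `isPassed` column, something else is being materialized.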

I suppose it's also possible that the time and memory usage is due to reading the TFile and TTree metadata, which includes reading all the TStreamerInfo and generating Python classes for each type, for each TFile. I've been fixing exactly that in Uproot4 (since it requires deep modifications).

wiso commented 4 years ago

I tried to remove everything except for

for info, fn in catalogue.items():
    df = uproot.daskframe(fn, "CollectionTree", branches)
    all_dfs.append(df)

Still I see an increase in memory (actually, Jupyter was killed).

[plot4: memory-usage plot]

jpivarski commented 4 years ago

How many files is it, how many branches, and how many classes? These things would add up because we need to open each file to get the metadata to Dask (the number of events and columns).

wiso commented 4 years ago

~20 files (actually the plot shows only ~3 files) and eight branches. These are ATLAS mxAOD files, so I am not sure how to answer "how many classes": they can be read by plain ROOT, and you just see a set of branches.

jpivarski commented 4 years ago

I meant the classes defined in the TStreamerInfo, which aren't lazily read in Uproot3. The other parameters do sound small, though.

ademus4 commented 4 years ago

I also see very high memory usage from daskframe. If I read a file with an on-disk size of 2.7G, htop tells me I'm using 14.1G (RES), more than I expect considering compression (a factor of ~2, I would assume). Usually with dask I get minimal memory usage until I call "compute" on my dataframe.
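The back-of-envelope arithmetic behind that expectation, using the numbers from the report above (the factor-of-2 decompression ratio is the reporter's assumption):

```python
# Expected in-memory size vs. what htop actually reported.
on_disk_gb = 2.7
compression_factor = 2.0                  # assumed decompression factor
expected_gb = on_disk_gb * compression_factor
observed_gb = 14.1                        # RES reported by htop

assert abs(expected_gb - 5.4) < 1e-9
assert observed_gb / expected_gb > 2.5    # >2.5x even the decompressed size
```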