dask-contrib / dask-awkward

Native Dask collection for awkward arrays, and the library to use it.
https://dask-awkward.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Performance issue with `repartition` #509

Open yimuchen opened 5 months ago

yimuchen commented 5 months ago

I was testing a file-skimming workflow, and to account for the possibility that the rate of events of interest is very low in the skimming scheme, I attempted to use array.repartition to reduce the number of files that would be generated, as all file writing methods that I know of create 1 file per partition.

I've provided code to generate a set of dummy data that roughly matches the data schema (jagged arrays with very mismatched collection sizes) and to perform a simple skim operation. What is observed is that when repartition is specified, the memory is pinned at ~5-7GB regardless of the partitioning scheme defined through uproot. A suggestion to use dask.array.persist makes the array.repartition step take a very long time and use just as much memory.

This is how I am attempting to skim the files in question:

import uproot
import dask
from dask.distributed import Client

client = Client(processes=False, n_workers=1, threads_per_worker=1)

def make_skimmed_events(events):
    # Place your selection logic here
    skimmed = events[events.nJets > 10]
    skimmed["myCustom"] = 137 * 9.8
    return skimmed

events = uproot.dask({f"dummy_{idx}.root": "Events" for idx in range(0, 2)}, step_size=2_000) # This only helps save memory if no repartition is used.

print("Calculate skimm")
skimmed = make_skimmed_events(events)

# Trying persist
print("Calculating persisted")
persisted = skimmed.persist()
print("Calculating repartition")
parted = persisted.repartition(rows_per_partition=10_000)

# Or trying direct repartition doesn't work
# parted = skimmed.repartition(rows_per_partition=5_000)
print("Calculating running uproot write")
writer = uproot.dask_write(
    parted,
    destination="skimtest/",
    prefix="mytest/skimmed",
    compute=False,
)
print("Calculating the graphs")
dask.visualize(writer, filename="Skim_test_puredask.pdf")
dask.visualize(writer, filename="Skim_test_opt_puredask.pdf", optimize_graph=True)
print("Executing the final task")
dask.compute(writer, optimize_graph=False)

The data in question can be generated using this script (each file will be about 2.5GB in size):

import awkward as ak
import numpy as np
import uproot

for name_idx in range(0, 10):
    n_events = 100_000

    n_jets = np.random.poisson(lam=5, size=n_events)
    n_part = np.random.poisson(lam=300, size=n_events)
    n_obj = np.random.poisson(lam=75, size=n_events)

    jets_arr = ak.zip(
        {
            field: ak.unflatten(np.random.random(size=ak.sum(n_jets)), n_jets)
            for field in ["a", "b", "c", "d", "e", "f", "g", "j", "i"]
        }
    )
    part_arr = ak.zip(
        {
            field: ak.unflatten(np.random.random(size=ak.sum(n_part)), n_part)
            for field in ["a", "b", "c", "d", "e", "f", "g", "j", "i"]
        }
    )
    obj_arr = ak.zip(
        {
            field: ak.unflatten(np.random.random(size=ak.sum(n_obj)), n_obj)
            for field in ["a", "b", "c", "d", "e", "f", "g", "j", "i"]
        }
    )

    with uproot.recreate(f"dummy_{name_idx}.root") as f:
        f["Events"] = {
            "Jets": jets_arr,
            "Particles": part_arr,
            "Object": obj_arr,
        }
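A rough back-of-envelope check of the file size quoted above can be sketched as follows. It assumes 8-byte floats, takes the Poisson means in the script as the average multiplicities, and ignores offsets and compression, so it is only an estimate; the helper name estimate_bytes is hypothetical.

```python
# Average entries per event, taken from the Poisson means in the script above.
MEAN_ENTRIES = {"Jets": 5, "Particles": 300, "Object": 75}
N_FIELDS = 9          # fields per collection in the script above
BYTES_PER_VALUE = 8   # assumption: values are stored as float64


def estimate_bytes(n_events: int) -> int:
    """Estimate the uncompressed payload size for n_events events."""
    return n_events * sum(MEAN_ENTRIES.values()) * N_FIELDS * BYTES_PER_VALUE


if __name__ == "__main__":
    per_file = estimate_bytes(100_000)   # one generated file
    per_step = estimate_bytes(2_000)     # one step_size=2_000 chunk
    print(f"per file: {per_file / 2**30:.2f} GiB")
    print(f"per 2000-event step: {per_step / 1e6:.1f} MB")
```

Under these assumptions the estimate lands close to the quoted figure of about 2.5GB per file.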

lgray commented 5 months ago

@martindurant when you have time - some thought here would be nice.

If we do this in two steps, i.e. write out files with no repartitioning and then repartition those files, the memory issues vanish.

This is odd since the repartition shouldn't care about data that we've cut out, but somehow it's acting like it needs all the data before cuts in the tasks doing the partition aggregation.

This really sounds like some data lifecycle problem.

martindurant commented 5 months ago

> memory is pinned at ~5-7GB

It seems that the in-memory size of each (unfiltered) partition really is at least a few GB. This must be completely loaded; in fact, each output partition will need inputs from multiple files. Only then do you do the filtering. As far as I know, it's not possible to filter during the process of streaming data from a file. A read-and-filter operation would be great! But this basic process is the reason that parquet partitions (for an example I am more familiar with) are usually <<100MB.

In addition, this whole workflow is entirely disk bound, since filtering is very fast. That means that any parallel tasks are trying to read various parts of various files and write too, all over the same bus. I don't really expect dask to be able to do anything for this case. (I realise that this is not the real workflow you want to run)

Nevertheless, this reminds me, that I think we considered at some point a type of repartition that is exactly every N inputs -> 1 output; that at least simplifies each task.

martindurant commented 5 months ago

> this whole workflow is entirely disk bound

I might be wrong - the dask profile shows decompression is taking 93% of time

martindurant commented 5 months ago

btw: for me, the output only has 2805 rows, so it's a single partition.

yimuchen commented 5 months ago

I wasn't able to replicate this example [1] with just dask/uproot, but it looks like exposing the dask_awkward structure before performing a repartition does help with the memory issue. I also tried to use uproot.dask(step_size="100MB") [2] to force each unfiltered partition to be small, but it still takes upwards of 6GB of memory in single-threaded execution, meaning that the program is still loading multiple unfiltered partitions into memory. I was assuming that repartition would only be called after determining the size of the filtered partition, meaning that we would have at most 1 unfiltered partition in memory at any one time. Or is this too naive a picture of what repartition is trying to do?

[1] https://github.com/CoffeaTeam/coffea/discussions/1100
[2] https://uproot.readthedocs.io/en/latest/uproot._dask.dask.html

lgray commented 5 months ago

I guess I'm not entirely clear as to why the repartition on the filtered data needs to have any knowledge at all about the filtered data... and why the filtered data is hanging around that long in the dask worker. At the point it isn't needed any more it should just be dropped, and there's nothing in this workflow that needs to know about the original partitions for the repartition step, iiuc.

martindurant commented 5 months ago

When you say filtered.repartition(rows_per_partition=...), dask-awkward needs to know the number of rows per input partition, in order to know which of them belong in a given output partition. This means loading the whole of the filter predicate, partition by partition. It then loads the (whole of) each input partition and filters them. In the given example, all of the input partitions end up in the same output partition.
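To make the dependency on row counts concrete, here is a minimal plain-Python sketch of the bookkeeping described above. It is not dask-awkward's implementation; the function name assign_outputs and the example row counts are hypothetical. The point is only that the mapping from input partitions to output partitions cannot be written down until the post-filter row count of every input partition is known.

```python
from itertools import accumulate
from typing import List, Sequence


def assign_outputs(rows_per_input: Sequence[int], rows_per_partition: int) -> List[List[int]]:
    """For each output partition, list the input partitions that feed it."""
    # Cumulative offsets: input i covers global rows [starts[i], starts[i + 1]).
    starts = [0, *accumulate(rows_per_input)]
    total = starts[-1]
    outputs = []
    for lo in range(0, total, rows_per_partition):
        hi = min(lo + rows_per_partition, total)
        outputs.append(
            [
                i
                for i, n_rows in enumerate(rows_per_input)
                # Keep non-empty inputs whose row range overlaps [lo, hi).
                if n_rows > 0 and starts[i] < hi and starts[i + 1] > lo
            ]
        )
    return outputs


if __name__ == "__main__":
    # Hypothetical post-filter row counts for four input partitions.
    filtered_rows = [300, 250, 280, 270]
    print(assign_outputs(filtered_rows, rows_per_partition=10_000))
    print(assign_outputs(filtered_rows, rows_per_partition=500))
```

With a target far larger than the total number of surviving rows, every input lands in the single output partition, which matches the situation described for the example in this thread.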

> uproot.dask(step_size="100MB")

I don't know what this does.

yimuchen commented 5 months ago

I see... after running the same process with a parquet file, I'm seeing this behavior as well, so this is a limit of dask, not some strange interaction with uproot.

So if I'm understanding this restriction correctly, this is mainly because repartitions must be evaluated during the generation of the task graph, so it needs to have all inputs available. Would it be possible to have a different method of obtaining the repartition scheme, where we aggregate filtered results as they arrive? Or is this fundamentally at odds with the paradigm of dask?

The step_size="100MB" attempts to limit each partition extracted by uproot to be no larger than 100MB.
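For illustration, the "aggregate filtered results as they arrive" idea raised in this comment could look roughly like the plain-Python sketch below. It is not a dask or dask-awkward API, and the names skim_and_regroup, keep and rows_per_output are hypothetical. Partitions are consumed one at a time, filtered, and buffered until enough rows have accumulated to emit an output chunk, so only one unfiltered partition is held at any moment.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

Row = TypeVar("Row")


def skim_and_regroup(
    partitions: Iterable[Iterable[Row]],
    keep: Callable[[Row], bool],
    rows_per_output: int,
) -> Iterator[List[Row]]:
    """Filter partitions one by one and yield fixed-size output chunks."""
    buffer: List[Row] = []
    for part in partitions:
        # Only the surviving rows of the current partition are retained.
        buffer.extend(row for row in part if keep(row))
        # Flush as many full output chunks as the buffer allows.
        while len(buffer) >= rows_per_output:
            yield buffer[:rows_per_output]
            buffer = buffer[rows_per_output:]
    if buffer:
        # Final, possibly smaller, chunk.
        yield buffer


if __name__ == "__main__":
    parts = [range(0, 10), range(10, 20), range(20, 30)]
    for chunk in skim_and_regroup(parts, lambda x: x % 2 == 0, rows_per_output=6):
        print(chunk)
```

The sequential loop is what makes this simple; expressing the same thing as a static task graph is harder because the number of outputs is not known until the filter has run.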

lgray commented 5 months ago

Right, but once it is done loading the input partitions and filtering them, it doesn't need to keep the original input data in memory.

What's really weird is that if I put a .persist() as a firebreak before making the repartition, it still keeps all the memory for reading in the unfiltered data, and that unfiltered data should be completely meaningless to the repartitioning at that point.

martindurant commented 5 months ago

Given the new options in #250, can we try again?

yimuchen commented 5 months ago

Hi @martindurant, just tried out on the latest master branch.

The current implementation does not play nicely with uproot or dak.to_parquet, in particular with this line in uproot, as npartitions is now set to 0 after a repartition(n_to_one=1xx) operation (any value assigned to n_to_one seems to give this result).

I'm guessing this can be a "simple" fix in uproot by changing npartitions to max([npartitions, 1]), but I want to check what is meant by npartitions=0, and where this simple fix may break things down the line.

If I add this additional check to uproot, we do get much more reasonable memory usage (a few hundred MB rather than a few GB).

yimuchen commented 5 months ago

If the assignment of zfill is purely aesthetic [1,2], I think this fix should be sufficient? Though it would break the output file parity with and without repartitioning if we use a trivial repartition like n_to_one=1.

[1] https://github.com/scikit-hep/uproot5/blob/main/src/uproot/writing/_dask_write.py#L36
[2] https://github.com/dask-contrib/dask-awkward/blob/main/src/dask_awkward/lib/io/parquet.py#L502
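A minimal sketch of the clamp being discussed, assuming the zero-padding width of the output file index is derived from a base-10 logarithm of the partition count (the referenced lines are not reproduced here, so this is an assumption). The helper names zfill_width and part_name and the file name pattern are hypothetical.

```python
import math


def zfill_width(npartitions: int) -> int:
    """Digits used to zero-pad partition indices in output file names."""
    # Clamp to 1 so that npartitions == 0 never reaches math.log,
    # which would raise "ValueError: math domain error".
    return math.ceil(math.log(max(npartitions, 1), 10))


def part_name(prefix: str, index: int, npartitions: int) -> str:
    """Build a zero-padded output name; purely cosmetic padding."""
    return f"{prefix}-part{str(index).zfill(zfill_width(npartitions))}"


if __name__ == "__main__":
    print(part_name("skimmed", 3, 0))    # no padding when the count is unknown
    print(part_name("skimmed", 3, 52))   # two digits for 52 partitions
```

If the padding is only cosmetic, the clamp changes file names but not file contents, which is the parity concern raised above for a trivial n_to_one=1 repartition.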

martindurant commented 5 months ago

> The current implementation does not play nicely with uproot or dak.to_parquet

Since parquet does not have that line, what is the problem there?

martindurant commented 5 months ago

(remembering that in your previous version, the number of output partitions was actually 1)

yimuchen commented 5 months ago

>> The current implementation does not play nicely with uproot or dak.to_parquet
>
> Since parquet does not have that line, what is the problem there?

to_parquet actually also calls a similar line here [1]. I think this is purely an aesthetic choice, so that the index in the file names is zero-padded (00x, 0xy, xyz) if, for example, on the order of 1000-9000 files are being written. At the time the function is called, arr.npartitions evaluates to 0.

[1] https://github.com/dask-contrib/dask-awkward/blob/main/src/dask_awkward/lib/io/parquet.py#L491

martindurant commented 5 months ago

OK, so the actual problem is, that the output of the repartition shows as having 0 partitions?

yimuchen commented 5 months ago

Yes, arrays after a repartition(n_to_one=...) have npartitions=0. I have tried adding an additional eager_compute_divisions(), but that still gives a partition count of 0. I also just noticed that no output is being generated (I was accidentally looking at the old files), though some computation is being performed.

yimuchen commented 5 months ago

I just had time to test a bit more. In my test, the original array will be separated into 52 partitions.

martindurant commented 5 months ago

https://github.com/dask-contrib/dask-awkward/pull/517 should fix those issues for you (tests included). In your case, you wanted n_to_one=52 which is the same as npartitions=1.
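The equivalence between n_to_one=52 and npartitions=1 for a 52-partition input can be checked with a small plain-Python sketch of N-to-one grouping. This is not the dask-awkward implementation; the function name group_n_to_one is hypothetical. Unlike a row-count-based repartition, this grouping depends only on the number of input partitions, not on how many rows each one holds.

```python
from typing import List


def group_n_to_one(npartitions: int, n_to_one: int) -> List[range]:
    """Group consecutive input partition indices, n_to_one per output."""
    return [
        range(start, min(start + n_to_one, npartitions))
        for start in range(0, npartitions, n_to_one)
    ]


if __name__ == "__main__":
    # 52 inputs merged 52-to-one gives a single output partition.
    print(len(group_n_to_one(52, 52)))
    # 52 inputs merged 20-to-one gives three outputs, the last one smaller.
    print([len(group) for group in group_n_to_one(52, 20)])
```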

yimuchen commented 5 months ago

This new implementation seems to have problems when loading from files:

import dask_awkward as dak

array = dak.from_lists([[[1, 2, 3], [], [4, 5]]] * 100)
array.to_parquet("test.parquet")

array2 = dak.from_parquet("test.parquet")
array2.repartition(n_to_one=10)  # Fails on this line

The full error message is:

  File "/srv/dask-awkward/src/dask_awkward/lib/core.py", line 1006, in repartition
    new_layer_raw, new_divisions = simple_repartition_layer(
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/dask-awkward/src/dask_awkward/lib/structure.py", line 1424, in simple_repartition_layer
    (arr.name, part) for part in range(new_divisions[-2], new_divisions[-1])
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'NoneType' object cannot be interpreted as an integer
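One plausible reading of this traceback, sketched in plain Python: if the division boundaries of an array read lazily from file are unknown and represented as None (an assumption about the state of the array here, not something shown in the traceback itself), then passing them to range() produces exactly this TypeError. The variable names below are illustrative only.

```python
# Assumption: unknown divisions are a tuple of None placeholders,
# one more entry than the number of partitions.
unknown_divisions = (None,) * 11

try:
    # Same access pattern as the failing line in the traceback above.
    list(range(unknown_divisions[-2], unknown_divisions[-1]))
except TypeError as err:
    message = str(err)
    print(message)

# With known integer boundaries the same expression works.
known_divisions = tuple(range(0, 110, 10))
last_span = list(range(known_divisions[-2], known_divisions[-1]))
```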

martindurant commented 5 months ago

Can you try with https://github.com/dask-contrib/dask-awkward/pull/518 ?

yimuchen commented 5 months ago

This now seems to be working! Without any modifications to uproot required.

I think the last piece that I wanted to confirm is that there seems to be a massive discrepancy in memory usage if repartition is called as the second-to-last step, as opposed to being the last step before running a write call. An extra "conversion" function is required for uproot writing to avoid doubly nested structures appearing in the awkward array (see the uproot_writeable function here [1]). If this is applied after the repartition method, we get back our rather massive memory footprint.

Is this a behavior we should expect? I wanted to have a record to help future users avoid some hidden gotchas, in case things change in the future. (I've attached the calculated task graphs and recorded memory footprint with the conversion placed before and after the repartition call in this thread.)

Attachments: re2conv_opt_new.pdf, conv2re_opt_new.pdf, memusage_conv2re_new.csv, memusage_re2conv_new.csv

[1] https://github.com/CoffeaTeam/coffea/discussions/1100

lgray commented 5 months ago

@yimuchen uproot_writeable should be able to operate correctly using only the _meta (the typetracer) of the dask_awkward array, you don't need to map_partitions it or apply all those ak.to_packed to the actual data to normalize the forms.

You may be able to just drop it?

yimuchen commented 5 months ago

I tried without the ak.to_packed, and I still run into the same memory issue. I think this is not strictly an issue with repartition (since I also saw something similar before the repartition fix). Should I mark this issue as closed and open a new issue if this becomes a recurring problem?

lgray commented 5 months ago

I'm fine with tracking it down a bit more here so we understand. Memory problems are pretty crucial for us so we should make sure it isn't somehow related.

yimuchen commented 3 months ago

Pardon me, I was caught up with other items, but I managed to get a script that better illustrates the issue. This script can be run with just the master branch of dask_awkward and uproot, and the dummy input will be automatically generated.

import dask_awkward as dak
import numpy as np
import uproot
from dask.distributed import Client

import awkward as ak

def make_events(rng: np.random.Generator, n_events: int):
    # Event-level variables
    events = ak.zip({f"prop_{idx}": rng.random(size=n_events) for idx in range(10)})

    # Large collection with many entries per event
    n_entries = rng.poisson(lam=300, size=n_events)
    large_col = ak.zip(
        {f"prop_{idx}": rng.random(size=ak.sum(n_entries)) for idx in range(10)}
    )
    events["large_col"] = ak.unflatten(large_col, n_entries)

    # Small collections with a handful of entries per event
    n_entries = rng.poisson(lam=10, size=n_events)
    small_col = ak.zip(
        {f"prop_{idx}": rng.random(size=ak.sum(n_entries)) for idx in range(10)}
    )
    events["small_col"] = ak.unflatten(small_col, n_entries)
    return events

def make_skimmed_events(events):
    return events[events.prop_0 < 0.1]  # Random 10% file reduction

if __name__ == "__main__":
    # Creating single thread client to better monitor performance
    client = Client(processes=False, n_workers=1, threads_per_worker=1)

    rng = np.random.default_rng(seed=123456)
    for file_idx in range(10):
        print("Making file", file_idx, "...")
        events = make_events(rng, 10_000)  # Each file will take ~200MB
        ak.to_parquet(events, f"unskimmed_{file_idx}.parquet")
        with uproot.recreate(f"unskimmed_{file_idx}.root") as f:
            f["Event"] = {k: events[k] for k in events.fields}

    print("Skimming with parquet file inputs")
    events = dak.from_parquet("unskimmed_*.parquet")
    events = make_skimmed_events(events)
    events = events.repartition(
        n_to_one=20
    )  # Given our basic estimate, everything should fit in one file
    dak.to_parquet(events, "skimmed.parquet")

    print("Skimming with root file inputs")
    events = uproot.dask("unskimmed_*.root")
    events = events.repartition(n_to_one=20)
    events = make_skimmed_events(events)
    uproot.dask_write(events, "skimmed.root")

Using memory_profiler [1] to map the memory usage, we get the following:

[Image: test, memory usage plot]

Basically, we still get a very large spike in memory usage, despite attempting to merge the results. This is true both for parquet and uproot file writing. Peaks of this magnitude appear regardless of the filtering efficiency.

If I disable the repartition line, I get a much more reasonable memory usage, but with much more fragmented outputs.

[Image: test_no_repart, memory usage plot]

Let me know if any more testing could provide more insight.

[1] https://pypi.org/project/memory-profiler/
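The plots in this comment come from memory_profiler. As a dependency-free cross-check, the standard-library tracemalloc module can report the peak of Python-tracked allocations around a single call. This is a swapped-in technique, not what produced the plots above; the helper name peak_allocated_bytes is hypothetical, and tracemalloc only sees allocations made through Python's allocators, so it may undercount memory held by compiled libraries.

```python
import tracemalloc
from typing import Any, Callable, Tuple


def peak_allocated_bytes(func: Callable[[], Any]) -> Tuple[Any, int]:
    """Run func and return (result, peak traced allocation in bytes)."""
    tracemalloc.start()
    try:
        result = func()
        # get_traced_memory returns (current, peak) since start().
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak


if __name__ == "__main__":
    # Stand-in workload: allocate a 50 MB buffer.
    _, peak = peak_allocated_bytes(lambda: bytearray(50_000_000))
    print(f"peak traced allocation: {peak / 1e6:.0f} MB")
```

In the script above, the callable would wrap the final compute or write step, so that the repartitioned and non-repartitioned variants can be compared on equal terms.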

yimuchen commented 3 months ago

Another discovery that might help pin down what is causing this memory consumption is that if we attempt to strip the event before repartitioning:

events = events[["small_col"]] # New line to strip down save content
events = events.repartition(n_to_one=20)

This reduces the memory usage for dak.to_parquet [1], but not for uproot.dask_write [2], so the differences between these 2 implementations may be an indication of what part of memory is not being released?

[1] https://github.com/dask-contrib/dask-awkward/blob/main/src/dask_awkward/lib/io/parquet.py#L511
[2] https://github.com/scikit-hep/uproot5/blob/main/src/uproot/writing/_dask_write.py#L45