iris-hep / idap-200gbps-atlas

benchmarking throughput with PHYSLITE

Memory use and unmanaged memory in Dask #55

Open alexander-held opened 6 months ago

alexander-held commented 6 months ago

We need to understand why so much unmanaged memory is observed and whether the overall memory use is as expected; naively it seems rather high. Reproducer: https://gist.github.com/alexander-held/d23dda86bf82b83b0b9f6159b8091f07.

The Dask Active Memory Manager (https://www.coiled.io/blog/introducing-the-dask-active-memory-manager) is presumably worth trying out.
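
A minimal sketch of what that could look like, assuming a local distributed.Client so the scheduler is created in the same process and picks up the config; the malloc_trim call is the Linux-only manual trimming trick from the Dask memory documentation, not something specific to this repo:

import ctypes

import dask
import distributed

# enable the Active Memory Manager on the scheduler (with its default
# ReduceReplicas policy); the config has to be set before the scheduler
# is created, which for a local Client is this process
dask.config.set({"distributed.scheduler.active-memory-manager.start": True})

client = distributed.Client()


def trim_memory() -> int:
    # Linux/glibc only: ask the allocator to hand free arenas back to the OS,
    # which can shrink the "unmanaged" memory shown on the dashboard
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)


# run on all workers, e.g. after a round of tasks has finished
client.run(trim_memory)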

alexander-held commented 6 months ago

For comparison, here is a simple function using uproot.open that is a "black box" to Dask and gets distributed via client.map. This looks very solid: very little unmanaged memory after some initial start-up, followed by stable execution.

import uproot
import awkward as ak

# `fileset` (mapping process -> {"files": [...]}) and `client` (a distributed.Client)
# are defined earlier, as in the reproducer linked above

# create list of files to run over
all_files = []
for process in fileset:
    all_files += fileset[process]["files"]

# define work to be done
def uproot_open_materialize(fname):
    BRANCH_LIST = [
        "AnalysisJetsAuxDyn.pt", "AnalysisJetsAuxDyn.eta", "AnalysisJetsAuxDyn.phi", "AnalysisJetsAuxDyn.m",
        "AnalysisElectronsAuxDyn.pt", "AnalysisElectronsAuxDyn.eta", "AnalysisElectronsAuxDyn.phi",
        "AnalysisElectronsAuxDyn.m", "AnalysisMuonsAuxDyn.pt", "AnalysisMuonsAuxDyn.eta",
        "AnalysisMuonsAuxDyn.phi", "AnalysisJetsAuxDyn.EnergyPerSampling", "AnalysisJetsAuxDyn.SumPtTrkPt500",
        "AnalysisJetsAuxDyn.TrackWidthPt1000", "PrimaryVerticesAuxDyn.z", "PrimaryVerticesAuxDyn.x",
        "PrimaryVerticesAuxDyn.y", "AnalysisJetsAuxDyn.NumTrkPt500", "AnalysisJetsAuxDyn.NumTrkPt1000",
        "AnalysisJetsAuxDyn.SumPtChargedPFOPt500", "AnalysisJetsAuxDyn.Timing",
        "AnalysisJetsAuxDyn.JetConstitScaleMomentum_eta", "AnalysisJetsAuxDyn.ActiveArea4vec_eta",
        "AnalysisJetsAuxDyn.DetectorEta", "AnalysisJetsAuxDyn.JetConstitScaleMomentum_phi",
        "AnalysisJetsAuxDyn.ActiveArea4vec_phi", "AnalysisJetsAuxDyn.JetConstitScaleMomentum_m",
        "AnalysisJetsAuxDyn.JetConstitScaleMomentum_pt", "AnalysisJetsAuxDyn.EMFrac",
        "AnalysisJetsAuxDyn.Width", "AnalysisJetsAuxDyn.ActiveArea4vec_m", "AnalysisJetsAuxDyn.ActiveArea4vec_pt",
        "AnalysisJetsAuxDyn.DFCommonJets_QGTagger_TracksWidth", "AnalysisJetsAuxDyn.PSFrac",
        "AnalysisJetsAuxDyn.JVFCorr", "AnalysisJetsAuxDyn.DFCommonJets_QGTagger_TracksC1",
        "AnalysisJetsAuxDyn.DFCommonJets_fJvt", "AnalysisJetsAuxDyn.DFCommonJets_QGTagger_NTracks",
        "AnalysisJetsAuxDyn.GhostMuonSegmentCount", "AnalysisMuonsAuxDyn.muonSegmentLinks",
        "AnalysisMuonsAuxDyn.msOnlyExtrapolatedMuonSpectrometerTrackParticleLink",
        "AnalysisMuonsAuxDyn.extrapolatedMuonSpectrometerTrackParticleLink",
        "AnalysisMuonsAuxDyn.inDetTrackParticleLink", "AnalysisMuonsAuxDyn.muonSpectrometerTrackParticleLink",
        "AnalysisMuonsAuxDyn.momentumBalanceSignificance", "AnalysisMuonsAuxDyn.topoetcone20_CloseByCorr",
        "AnalysisMuonsAuxDyn.scatteringCurvatureSignificance", "AnalysisMuonsAuxDyn.scatteringNeighbourSignificance",
        "AnalysisMuonsAuxDyn.neflowisol20_CloseByCorr", "AnalysisMuonsAuxDyn.topoetcone20",
        "AnalysisMuonsAuxDyn.topoetcone30", "AnalysisMuonsAuxDyn.topoetcone40", "AnalysisMuonsAuxDyn.neflowisol20",
        "AnalysisMuonsAuxDyn.segmentDeltaEta", "AnalysisMuonsAuxDyn.DFCommonJetDr",
        "AnalysisMuonsAuxDyn.combinedTrackParticleLink", "AnalysisMuonsAuxDyn.InnerDetectorPt",
        "AnalysisMuonsAuxDyn.MuonSpectrometerPt", "AnalysisMuonsAuxDyn.clusterLink",
        "AnalysisMuonsAuxDyn.spectrometerFieldIntegral", "AnalysisElectronsAuxDyn.ambiguityLink",
        "AnalysisMuonsAuxDyn.EnergyLoss", "AnalysisJetsAuxDyn.NNJvtPass", "AnalysisElectronsAuxDyn.topoetcone20_CloseByCorr",
        "AnalysisElectronsAuxDyn.topoetcone20ptCorrection", "AnalysisElectronsAuxDyn.topoetcone20",
        "AnalysisMuonsAuxDyn.ptvarcone30_Nonprompt_All_MaxWeightTTVA_pt500_CloseByCorr",
        "AnalysisElectronsAuxDyn.DFCommonElectronsECIDSResult", "AnalysisElectronsAuxDyn.neflowisol20",
        "AnalysisMuonsAuxDyn.ptvarcone30_Nonprompt_All_MaxWeightTTVA_pt500", "AnalysisMuonsAuxDyn.ptcone40",
        "AnalysisMuonsAuxDyn.ptvarcone30_Nonprompt_All_MaxWeightTTVA_pt1000_CloseByCorr",
        "AnalysisMuonsAuxDyn.ptvarcone30_Nonprompt_All_MaxWeightTTVA_pt1000", "AnalysisMuonsAuxDyn.ptvarcone40",
        "AnalysisElectronsAuxDyn.f1", "AnalysisMuonsAuxDyn.ptcone20_Nonprompt_All_MaxWeightTTVA_pt500",
        "PrimaryVerticesAuxDyn.vertexType", "AnalysisMuonsAuxDyn.ptvarcone30", "AnalysisMuonsAuxDyn.ptcone30",
        "AnalysisMuonsAuxDyn.ptcone20_Nonprompt_All_MaxWeightTTVA_pt1000",
        "AnalysisElectronsAuxDyn.ptvarcone30_Nonprompt_All_MaxWeightTTVALooseCone_pt500", "AnalysisMuonsAuxDyn.CaloLRLikelihood"
    ]

    filter_name = lambda x: x in BRANCH_LIST

    size_uncompressed = 0
    try:
        with uproot.open(fname, filter_name=filter_name) as f:
            for b in BRANCH_LIST:
                print(b)
                # materialize the branch (result is discarded, we only track data volume)
                f["CollectionTree"][b].array()
                size_uncompressed += f["CollectionTree"][b].uncompressed_bytes

            # bytes actually requested from storage for this file
            size_read = f.file.source.num_requested_bytes

        return {"read": size_read, "uncompressed": size_uncompressed, "failed": []}
    except OSError:
        return {"read": 0, "uncompressed": 0, "failed": [fname]}

print(f"running with {len(all_files)} files")
scattered_data = client.scatter([f for f in all_files])  # instead of submitting (possibly big) object directly
futures = client.map(uproot_open_materialize, scattered_data)
out = ak.Array([r for r in client.gather(iter(futures))])

print(f"total read: {sum(out['read']) / 1000**2:.2f} MB")
print(f"uncompressed: {sum(out['uncompressed']) / 1000**2:.2f} MB")

failed = ak.flatten(out["failed"])
print(f"{len(failed)} file(s) failed:")
for fname in failed:
    print(f"  {fname}")
lgray commented 6 months ago

A new piece of information: when we generate data and only use dask.array and dask-awkward:

import dask
import distributed
import dask.array as da
import dask_awkward as dak
import numpy as np

if __name__ == "__main__":

    with distributed.Client() as _:

        N_files = 10_000
        events_per_file = 100_128

        # one flat chunk of random numbers per "file"
        events = da.random.normal(size=N_files * events_per_file, chunks=events_per_file)

        dak_events = dak.from_dask_array(events)

        # per-file counts 0..447 sum to 100_128, i.e. exactly events_per_file
        unflatten = dak.from_dask_array(da.tile(da.arange(448), N_files))

        jagged_events = dak.unflatten(dak_events, unflatten)

        nonzeros_axis1 = dak.count_nonzero(jagged_events, axis=1)

        nonzeros_reduction = dak.count_nonzero(nonzeros_axis1)

        computed = nonzeros_reduction.compute()

        print(computed)

The memory usage is pretty flat and even well into the run on my laptop it really hasn't budged at all:

[screenshot: Dask dashboard memory plot]

It would be good to make this a few orders of magnitude bigger and run it on a cluster.
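
A sketch of what running it a few orders of magnitude bigger on a cluster could look like, using dask-jobqueue; the HTCondorCluster choice, the per-job resource requests, and the job count are placeholders to adapt to the actual site:

import distributed
import dask.array as da
import dask_awkward as dak
from dask_jobqueue import HTCondorCluster

# per-job resource requests are placeholders
cluster = HTCondorCluster(cores=4, memory="8 GB", disk="4 GB")
cluster.scale(jobs=100)  # request 100 worker jobs

with distributed.Client(cluster) as client:
    # same synthetic pipeline as above, just with more "files"
    N_files = 1_000_000
    events_per_file = 100_128

    events = da.random.normal(size=N_files * events_per_file, chunks=events_per_file)
    jagged = dak.unflatten(
        dak.from_dask_array(events),
        dak.from_dask_array(da.tile(da.arange(448), N_files)),
    )
    print(dak.count_nonzero(dak.count_nonzero(jagged, axis=1)).compute())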