CoffeaTeam / coffea

Basic tools and wrappers for enabling not-too-alien syntax when running columnar Collider HEP analysis.
https://coffea-hep.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Large dask graph size/memory usage when opening multiple correctionlib.CorrectionSet #1131

Open kawaho opened 4 months ago

kawaho commented 4 months ago

I am trying to use correctionlib within coffea+dask to apply a set of corrections: jet smearing, pileup reweighting, muon ID/iso/trigger scale factors, and electron reco/ID scale factors (a pretty standard workflow). The following code is a minimal example:

import awkward as ak
import dask_awkward as dak

from coffea import processor
from correctionlib import CorrectionSet

class MyProcessor(processor.ProcessorABC):
    def __init__(self, **kwargs):
      pass

    def process(self, events):

      # jet smearing
      jerc = CorrectionSet.from_file('/cvmfs/cms.cern.ch/rsync/cms-nanoAOD/jsonpog-integration/POG/JME/2018_UL/jet_jerc.json.gz')
      jer_smear = CorrectionSet.from_file('/cvmfs/cms.cern.ch/rsync/cms-nanoAOD/jsonpog-integration/POG/JME/jer_smear.json.gz')['JERSmear']
      jets = events.Jet
      jets["event_rho"] = events.fixedGridRhoFastjetAll

      jets['jer'] = jerc['Summer19UL18_JRV2_MC_PtResolution_AK4PFchs'].evaluate(jets.eta, jets.pt, jets.event_rho)  # pT resolution
      jets['jersf'] = jerc['Summer19UL18_JRV2_MC_ScaleFactor_AK4PFchs'].evaluate(jets.eta, 'nom')  # resolution data/MC scale factor

      jersmear_factor = jer_smear.evaluate(jets.pt, jets.eta, jets.matched_gen.pt, jets.event_rho, events.event, jets.jer, jets.jersf)

      events['Jet', 'pt'] = jets.pt*jersmear_factor
      events['Jet', 'mass'] = jets.mass*jersmear_factor

      #PU Weights
      pu_sf = CorrectionSet.from_file("/cvmfs/cms.cern.ch/rsync/cms-nanoAOD/jsonpog-integration/POG/LUM/2018_UL/puWeights.json.gz")
      puWeight = pu_sf["Collisions18_UltraLegacy_goldenJSON"].evaluate(events.Pileup.nTrueInt, "nominal")
      weight = events.genWeight*puWeight

      #Mu ID/ISO SF
      m_sf = CorrectionSet.from_file("/cvmfs/cms.cern.ch/rsync/cms-nanoAOD/jsonpog-integration/POG/MUO/2018_UL/muon_Z.json.gz")
      M_collections = ak.pad_none(events.Muon[(events.Muon.pt > 26) & (abs(events.Muon.eta) < 2.4)], 1)[:,0]
      MuID_SF = m_sf["NUM_TightID_DEN_TrackerMuons"].evaluate(abs(M_collections.eta), M_collections.pt, "nominal")
      MuISO_SF = m_sf["NUM_TightRelIso_DEN_TightIDandIPCut"].evaluate(abs(M_collections.eta), M_collections.pt, "nominal")
      weight = weight*MuID_SF*MuISO_SF

      #SingleMu Trigger SF
      Trig_SF = m_sf['NUM_IsoMu24_DEN_CutBasedIdTight_and_PFIsoTight'].evaluate(abs(M_collections.eta), M_collections.pt, "nominal")
      weight = weight*Trig_SF

      #Electron SF
      e_sf = CorrectionSet.from_file("/cvmfs/cms.cern.ch/rsync/cms-nanoAOD/jsonpog-integration/POG/EGM/2018_UL/electron.json.gz")
      E_collections = ak.pad_none(events.Electron[(events.Electron.pt > 20) & (abs(events.Electron.eta) < 2.5)], 1)[:,0]
      EleReco_SF = e_sf["UL-Electron-ID-SF"].evaluate('2018', "sf", "RecoAbove20", E_collections.eta, E_collections.pt)
      EleID_SF = e_sf["UL-Electron-ID-SF"].evaluate('2018',"sf", "wp90iso", E_collections.eta, E_collections.pt)
      weight = weight*EleReco_SF*EleID_SF

      events['Jet', 'weight'] = ak.fill_none(weight, 0)

      quantities = ak.zip(
          {
              'jet_pt': ak.flatten(events['Jet']).pt,
              'weight': ak.flatten(events['Jet']).weight,
          }
      )

      dak.to_parquet(
          quantities,
          "test_corr",
          compute=True,
      )

      return

    def postprocess(self, accumulator):
        pass

However, dask gives a warning about the large graph size:

UserWarning: Sending large graph of size 18.62 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.

and, more importantly, the dask dashboard shows unreasonable memory usage on the workers (labelled as "unmanaged (old)" memory in dask), for example with step_size=20_000:

[Screenshot: dask dashboard, 2024-07-11, showing per-worker memory usage]

Eventually, the jobs would fail because of the high memory usage.

If I run only the smearing or only the scale factors, the memory problem disappears, but the large-graph-size warning persists even when running the jet smearing alone.

coffea version: 2024.5.0, correctionlib version: 2.5.0

lgray commented 4 months ago

Oh, that's not a serious warning; you may ignore it. I've not seen any serious slowdown with graphs that are hundreds of MB in size.

lgray commented 4 months ago

The large memory usage you're observing is coming from another source.

kawaho commented 4 months ago

About the large memory usage: the number of workers with particularly high memory usage (the ones causing the failure) is exactly the number of correctionlib objects I am opening, which is 9 in this case. My understanding is that these workers are the ones handling the SF/smearing tasks. Naively, I would expect that with x workers there would be at least 9x such tasks, but the task count always remains 9 (even with something like 100 workers). I wonder if this can be handled better with some tricks, because a lot of memory is wasted.

lgray commented 3 months ago

@kawaho sorry for the long reply time.

This might be something we can fix by inlining the correctionlib objects in the graph so that they appear on each node that needs to access correctionlib. We'd have to play with it a bit.

Right now it opens the correctionlib CorrectionSet 9 times for the whole workflow and transports the needed parts to the workers that need a specific correction within the file. I'll see if this can be done a little bit more leanly (at the cost of speed, probably).
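
One way this could look from the user side, as a rough sketch in the same spirit, is to defer the file open to the workers so that only the path string travels in the graph, e.g. via dask_awkward.map_partitions. The example below is for the pileup weight only, reusing the path and correction name from the reproducer above; _pu_weight and PU_JSON are illustrative names, and the explicit meta is passed because correctionlib's compiled evaluate cannot be traced on dask-awkward's typetracer inputs:

import awkward as ak
import dask_awkward as dak
import numpy as np
from correctionlib import CorrectionSet

PU_JSON = "/cvmfs/cms.cern.ch/rsync/cms-nanoAOD/jsonpog-integration/POG/LUM/2018_UL/puWeights.json.gz"

def _pu_weight(n_true_int):
    # Runs on the worker: the CorrectionSet is opened locally from /cvmfs,
    # so the task graph only carries the path string, not the parsed corrections.
    cset = CorrectionSet.from_file(PU_JSON)
    sf = cset["Collisions18_UltraLegacy_goldenJSON"].evaluate(
        ak.to_numpy(n_true_int), "nominal"
    )
    return ak.Array(sf)

# inside process(): one float64 weight per event
puWeight = dak.map_partitions(
    _pu_weight,
    events.Pileup.nTrueInt,
    # output metadata (flat float64 array), since correctionlib cannot be traced automatically
    meta=ak.Array(
        ak.Array(np.empty(0, dtype=np.float64)).layout.to_typetracer(forget_length=True)
    ),
)

This trades the single client-side open for one open per partition on each worker, which is presumably the speed cost mentioned above.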

I think you can also get some mileage out of not opening the correction sets each time you call process, but rather once, when you construct the processor.
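
For reference, a minimal sketch of that last suggestion, with the paths copied from the reproducer above, assuming the opened CorrectionSet objects serialize along with the processor when it is shipped to the workers:

from coffea import processor
from correctionlib import CorrectionSet

class MyProcessor(processor.ProcessorABC):
    def __init__(self, **kwargs):
        # open every correction file once, at processor construction time
        base = "/cvmfs/cms.cern.ch/rsync/cms-nanoAOD/jsonpog-integration/POG"
        self.jerc = CorrectionSet.from_file(f"{base}/JME/2018_UL/jet_jerc.json.gz")
        self.jer_smear = CorrectionSet.from_file(f"{base}/JME/jer_smear.json.gz")["JERSmear"]
        self.pu_sf = CorrectionSet.from_file(f"{base}/LUM/2018_UL/puWeights.json.gz")
        self.m_sf = CorrectionSet.from_file(f"{base}/MUO/2018_UL/muon_Z.json.gz")
        self.e_sf = CorrectionSet.from_file(f"{base}/EGM/2018_UL/electron.json.gz")

    def process(self, events):
        # use self.jerc, self.jer_smear, self.pu_sf, self.m_sf, self.e_sf
        # exactly as in the original process() above
        ...

    def postprocess(self, accumulator):
        pass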