dask-contrib / dask-histogram

Histograms with task scheduling.
https://dask-histogram.readthedocs.io
BSD 3-Clause "New" or "Revised" License

histogram filling not reproducible except in "processes" scheduler? #100

Closed: lgray closed this issue 1 year ago

lgray commented 1 year ago

@iasonkrom @NJManganelli

@douglasdavis we've run into a problem where the bins filled for the histogram are mangled when running with the sync, threads, and distributed schedulers, but somehow not with processes (either that, or we haven't triggered it there yet).

The plot below shows the difference between the first run of filling a histogram and then resetting that histogram and filling again, with a varying number of threads. If things were OK this would be a flat line! [image: per-bin differences between repeated fills]

What's interesting is that the total number of events filled does not change but the per-bin contents do!

We're trying to put together a minimal repro, but this is clearly not acceptable behavior, so we're posting before we have the complete picture.
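
For concreteness, the comparison being plotted is along these lines (a sketch, not the exact script; h stands in for a dask histogram whose fill has already been queued, and the thread counts are illustrative):

import dask

# compute the same queued fill with different thread counts and diff the bins
(ref,) = dask.compute(h, scheduler="threads", num_workers=1)
for n in (2, 4, 8):
    (again,) = dask.compute(h, scheduler="threads", num_workers=n)
    print((again - ref).values())  # all zeros if filling were deterministic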

douglasdavis commented 1 year ago

Alright, thanks for bringing this to my attention. I'll keep an eye out for a reproducer and try to be useful in the debugging :)

NJManganelli commented 1 year ago

Yeah, I've stripped some of it away but still need to remove the rest of the coffea stuff; I'll post that (near-)minimal reproducer here.

lgray commented 1 year ago

I couldn't get anything to reproduce just using dask.array.random, even with a billion inputs. :-/
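
Roughly the kind of attempt that came up clean (a sketch under assumptions, not the exact script; the chunking and bin edges are illustrative):

import dask.array as da
import hist.dask as hda
import numpy as np

# fill directly from a large random dask array; no zip/combinations involved
dar = (200 * da.random.random(1_000_000_000, chunks=50_000_000)).astype(np.float32)
dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()
dx.fill(x=dar)
for _ in range(3):
    print(dx.compute(scheduler="threads").values(flow=True))  # stable run to run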

NJManganelli commented 1 year ago

So... non-trivial. The below is not a minimal reproducer yet, and it still needs to be shown that swapping the input file for something else publicly available (maybe the CMS Open Data files?) will still show the bug.

Iason should lay out how to install their environment and get into it. It may not be integral, but I don't know whether this will apply equally well to other environment setups (the pip freeze output is attached, though).

With Iason's package installed, and using a single file from the dataset being studied, I was able to reduce things to the below.

Things are finicky: several times it seemed that a small change 'eliminated' the bug, but it's unclear whether it really did or whether it was chance, because the script sometimes completes without showing discrepancies (as I discovered later on). Changes that seemed to make the bug disappear included eliminating some of the fills (normally 2 per histogram), eliminating the eta histograms entirely, and simplifying the filter_events function to be nearly trivial (restricting to at least 2 electrons and accepting all electrons with a trivially-true pass_selection)...

import os

import dask_awkward as dak
import awkward as ak
from coffea.lumi_tools import LumiMask

import hist
import matplotlib.pyplot as plt

from egamma_tnp.utils import get_nanoevents_file

def filter_events(events, pt):
    events = events[dak.num(events.Electron) >= 2]
    abs_eta = abs(events.Electron.eta)
    pass_eta_ebeegap = (abs_eta < 1.4442) | (abs_eta > 1.566)
    pass_tight_id = events.Electron.cutBased == 4
    pass_pt = events.Electron.pt > pt
    pass_eta = abs_eta <= 2.5
    pass_selection = pass_pt & pass_eta & pass_eta_ebeegap & pass_tight_id
    n_of_tags = dak.sum(pass_selection, axis=1)
    good_events = events[n_of_tags >= 2]
    good_locations = pass_selection[n_of_tags >= 2]

    return good_events, good_locations

def perform_tnp(events, pt, goldenjson):
    #if goldenjson:
    #    events = apply_lumimasking(events, goldenjson)
    good_events, good_locations = filter_events(events, pt)
    ele_for_tnp = good_events.Electron[good_locations]

    zcands1 = dak.combinations(ele_for_tnp, 2, fields=["tag", "probe"])
    zcands2 = dak.combinations(ele_for_tnp, 2, fields=["probe", "tag"])
    #p1, a1 = find_probes(zcands1.tag, zcands1.probe, good_events.TrigObj, pt)
    #p2, a2 = find_probes(zcands2.tag, zcands2.probe, good_events.TrigObj, pt)
    p1 = zcands1.probe#[zcands1.probe.eta > -1.3]
    a1 = zcands1.probe
    p2 = zcands2.probe#[zcands2.probe.eta > -1.3]
    a2 = zcands2.probe

    return p1, a1, p2, a2

def get_pt_and_eta_arrays(events, pt, goldenjson):
    p1, a1, p2, a2 = perform_tnp(events, pt, goldenjson)

    pt_pass1 = dak.flatten(p1.pt)
    pt_pass2 = dak.flatten(p2.pt)
    pt_all1 = dak.flatten(a1.pt)
    pt_all2 = dak.flatten(a2.pt)

    eta_pass1 = dak.flatten(p1.eta)
    eta_pass2 = dak.flatten(p2.eta)
    eta_all1 = dak.flatten(a1.eta)
    eta_all2 = dak.flatten(a2.eta)

    return (
        pt_pass1,
        pt_pass2,
        pt_all1,
        pt_all2,
        eta_pass1,
        eta_pass2,
        eta_all1,
        eta_all2,
    )

def get_tnp_histograms(events, pt, goldenjson):
    import json
    import os

    import hist
    from hist.dask import Hist

    ptbins = [5, 10, 15, 20, 22, 26, 28, 30, 32, 34, 36, 38, 40, 45, 50, 60, 80, 100, 150, 250, 400]
    etabins = [-2.5, -2.4, -2.3, -2.2, -2.1, -2.0, -1.9, -1.8, -1.7, -1.566, -1.4442, -1.3, -1.2, -1.1, -1.0, -0.9, -0.8, -0.7, -0.6, -0.5, -0.4, -0.3, -0.2, -0.1, 0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 1.1, 1.2, 1.3, 1.4442, 1.566, 1.7, 1.8, 1.9, 2.0, 2.1, 2.2, 2.3, 2.4, 2.5]
    (
        pt_pass1,
        pt_pass2,
        pt_all1,
        pt_all2,
        eta_pass1,
        eta_pass2,
        eta_all1,
        eta_all2,
    ) = get_pt_and_eta_arrays(events, pt, goldenjson)

    ptaxis = hist.axis.Variable(ptbins, name="pt")
    hpt_all = Hist(ptaxis)
    hpt_pass = Hist(ptaxis)

    etaaxis = hist.axis.Variable(etabins, name="eta")
    heta_all = Hist(etaaxis)
    heta_pass = Hist(etaaxis)

    hpt_pass.fill(pt_pass1)
    hpt_pass.fill(pt_pass2)
    #hpt_all.fill(pt_all1)
    #hpt_all.fill(pt_all2)
    heta_pass.fill(eta_pass1)
    heta_pass.fill(eta_pass2)
    #heta_all.fill(eta_all1)
    #heta_all.fill(eta_all2)

    #return hpt_pass, hpt_all, heta_pass, heta_all
    return hpt_pass, heta_pass

def get_and_compute_tnp_histograms(events, pt, goldenjson, scheduler, progress):
    import dask
    from dask.diagnostics import ProgressBar

    (
        hpt_pass,
        #hpt_all,
        eta_pass,
        #eta_all,
    ) = get_tnp_histograms(events, pt, goldenjson)

    if progress:
        pbar = ProgressBar()
        pbar.register()

    res = dask.compute(
        hpt_pass,
        #hpt_all,
        eta_pass,
        #eta_all,
        scheduler=scheduler,
    )

    if progress:
        pbar.unregister()

    return res

class TagNProbe:
    def __init__(
        self,
        names,
        trigger_pt,
        *,
        goldenjson=None,
        toquery=False,
        redirect=False,
        custom_redirector="root://cmsxrootd.fnal.gov/",
        invalid=False,
        preprocess=False,
        preprocess_args={},
    ):
        """Tag and Probe for HLT Trigger efficiency from NanoAOD.

        Parameters
        ----------
            names : str or list of str
                The dataset names to query that can contain wildcards or a list of file paths.
            trigger_pt : int or float
                The Pt threshold of the trigger.
            goldenjson : str, optional
                The golden json to use for luminosity masking. The default is None.
            toquery : bool, optional
                Whether to query DAS for the dataset names. The default is False.
            redirect : bool, optional
                Whether to add an xrootd redirector to the files. The default is False.
            custom_redirector : str, optional
                The xrootd redirector to add to the files. The default is "root://cmsxrootd.fnal.gov/".
                Only used if redirect is True.
            invalid : bool, optional
                Whether to include invalid files. The default is False.
                Only used if toquery is True.
            preprocess : bool, optional
                Whether to preprocess the files using coffea.dataset_tools.preprocess().
                The default is False.
            preprocess_args : dict, optional
                Extra arguments to pass to coffea.dataset_tools.preprocess(). The default is {}.
        """
        self.names = names
        self.pt = trigger_pt - 1
        self.goldenjson = goldenjson
        self.toquery = toquery
        self.redirect = redirect
        self.custom_redirector = custom_redirector
        self.invalid = invalid
        self.events = None
        self.preprocess = preprocess
        self.preprocess_args = preprocess_args
        self.file = get_nanoevents_file(
            self.names,
            toquery=self.toquery,
            redirect=self.redirect,
            custom_redirector=self.custom_redirector,
            invalid=self.invalid,
            preprocess=self.preprocess,
            preprocess_args=self.preprocess_args,
        )
        if goldenjson is not None and not os.path.isfile(goldenjson):
            raise ValueError(f"Golden JSON {goldenjson} does not exist.")

    def load_events(self, from_root_args={}):
        """Load the events from the names.

        Parameters
        ----------
            from_root_args : dict, optional
                Extra arguments to pass to coffea.nanoevents.NanoEventsFactory.from_root().
                The default is {}.
        """
        from coffea.nanoevents import NanoEventsFactory

        self.events = NanoEventsFactory.from_root(
            self.file,
            permit_dask=True,
            **from_root_args,
        ).events()

tag_n_probe = TagNProbe(
    ["../d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root"],
    32,
    #goldenjson="../json/Cert_Collisions2023_366442_370790_Golden.json",
    goldenjson=None,
    toquery=False,
    redirect=False,
    preprocess=False,
    preprocess_args={"maybe_step_size": 100_000},
)
print("Done preprocessing")

print("Starting to load events")
tag_n_probe.load_events(
    from_root_args={"uproot_options": {"timeout": 120}, "chunks_per_file": 10}
)

tmp1 = get_and_compute_tnp_histograms(tag_n_probe.events, 25.0, None, "threads", True)
tmp2 = get_and_compute_tnp_histograms(tag_n_probe.events, 25.0, None, "threads", True)
tmp3 = get_and_compute_tnp_histograms(tag_n_probe.events, 25.0, None, "threads", True)
tmp4 = get_and_compute_tnp_histograms(tag_n_probe.events, 25.0, None, "threads", True)

stk = hist.Stack.from_dict(
    {
     "threads_1": tmp1[0] - tmp1[0],
     "threads_2": tmp2[0] - tmp1[0],
     "threads_3": tmp3[0] - tmp2[0],
     "threads_4": tmp4[0] - tmp3[0],
    }
)
stk.plot()
plt.legend()

print((tmp1[0] - tmp2[0]).values())
print((tmp3[0] - tmp2[0]).values())

iason_packages.txt

ikrommyd commented 1 year ago

https://github.com/iasonkrom/egamma-tnp/blob/debug_dask_histogram/debug_dask_histogram.ipynb

I'm also working on this now that I've got some time. I've removed all the classes and kept only the basic functionality of my code in one notebook. No install of egamma-tnp is required, and the only requirements are what is imported (coffea, awkward, hist, etc.). I'll keep stripping down the code to make it as minimal as possible.

However, as @NJManganelli pointed out (and as can also be seen in the snippet), the bug is also there if you run on only one file. Oddly, I don't see it when running on two files locally, while I do see it on one file on a cluster. Makes me think... is it architecture-based? I have an Apple silicon Mac. Anyway, I'll try it on my old x86 Ubuntu laptop as well and get to the bottom of this.

I'll also try to find a file that is publicly accessible and not private CMS data. I will commit to that notebook every time I strip something down and keep updating here.

lgray commented 1 year ago

I think the first thing to do is to attempt a reproducer without nanoevents.

ikrommyd commented 1 year ago

OK, I've got something: it IS INDEED architecture-based. I'm getting the same effect on my Intel Ubuntu laptop, on LPC, and on LXPLUS, while the effect is not there on my M1 Mac.

If you look at the notebook's last cell, the arrays in the cell output should be the same, and the underflow bin should always be empty since I'm doing pass_pt = events.Electron.pt > pt, but that's not the case. Also note that the sum always stays the same.
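
In code, the expectation is something like this (a sketch; h_pt is a hypothetical name for the computed pt histogram):

# every filled value passes the pt > threshold cut, and the threshold sits
# well above the lowest bin edge, so nothing should land in the underflow bin
vals = h_pt.values(flow=True)
assert vals[0] == 0, "underflow bin should be empty"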

I will try to strip out as much as possible to find whether a specific awkward operation is causing the bug, and also try to reproduce it in plain uproot.

lgray commented 1 year ago

Oh fun - I've been doing this on my laptop, which is also an M1. I'll try my very simple tests on an x86 machine.

ikrommyd commented 1 year ago

At least we know it's not a scaling issue that happens only on clusters and/or over big datasets.

lgray commented 1 year ago

What's a full path on xrootd for the file you're using?

lgray commented 1 year ago

import hist.dask as hda
import dask.array as da
import dask_awkward as dak
import uproot
import pickle

from distributed import Client

if __name__ == "__main__":

    events = uproot.dask({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, steps_per_file=10)

    dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()

    dx.fill(x=dak.flatten(events.Electron_pt[events.Electron_pt > 32]))

    with Client() as client:
        for _ in range(10):
            print(dx.compute().values(flow=True))

Does not reproduce the error, adding back more.

lgray commented 1 year ago

import hist.dask as hda
import dask.array as da
import dask_awkward as dak
import uproot
from coffea.nanoevents import NanoEventsFactory
import pickle

from distributed import Client

if __name__ == "__main__":

    #events = uproot.dask({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, steps_per_file=10)
    events = NanoEventsFactory.from_root({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, permit_dask=True, chunks_per_file=10).events()

    dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()

    #dx.fill(x=dak.flatten(events.Electron_pt[events.Electron_pt > 32]))
    dx.fill(x=dak.flatten(events.Electron.pt[events.Electron.pt > 32]))

    with Client() as client:
        for _ in range(10):
            print(dx.compute().values(flow=True))

Also does not reproduce the behavior. It is therefore unlikely to be nanoevents.

lgray commented 1 year ago

import hist.dask as hda
import dask.array as da
import dask_awkward as dak
import uproot
from coffea.nanoevents import NanoEventsFactory
import pickle

from distributed import Client

if __name__ == "__main__":

    #events = uproot.dask({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, steps_per_file=10, open_files=False)                                                                                                                            
    events = NanoEventsFactory.from_root({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, permit_dask=True, chunks_per_file=10).events()

    dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()

    #combos = dak.combinations(events.Electron_pt, 2, fields=["first", "second"])                                                                                                                                                                    
    combos = dak.combinations(events.Electron, 2, fields=["first", "second"])

    #dx.fill(x=dak.flatten(combos.second[combos.first > 32]))                                                                                                                                                                                        
    dx.fill(x=dak.flatten(combos.second.pt[combos.first.pt > 32]))

    with Client() as client:
        for _ in range(10):
            print(dx.compute().values(flow=True))

Here is a minimal reproducer. If you uncomment the raw uproot lines and re-comment the nanoevents ones, the reproducibility issue goes away.

This does not yet clarify whether it is an issue with nanoevents or an issue with zipping/broadcasting.
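
For reference, the well-behaved variant is just the commented-out raw-uproot lines swapped back in:

events = uproot.dask({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, steps_per_file=10, open_files=False)

combos = dak.combinations(events.Electron_pt, 2, fields=["first", "second"])

dx.fill(x=dak.flatten(combos.second[combos.first > 32]))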

lgray commented 1 year ago

even more minimal, no coffea needed (@agoose77 @jpivarski):

import hist.dask as hda
import dask.array as da
import dask_awkward as dak
import uproot
import pickle

from distributed import Client

if __name__ == "__main__":

    events = uproot.dask({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, steps_per_file=10, open_files=False)

    dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()

    electrons = dak.zip({"pt": events.Electron_pt})

    combos = dak.combinations(electrons, 2, fields=["first", "second"])

    dx.fill(x=dak.flatten(combos.second.pt[combos.first.pt > 32]))

    with Client() as client:
        for _ in range(10):
            print(dx.compute().values(flow=True))

martindurant commented 1 year ago

^ do you still get the problem here with sync/threaded?

lgray commented 1 year ago

Anecdotally yes, but I'll check with this minimal repro now.

lgray commented 1 year ago

Threads, but not sync, shows the variations from run to run. However, the input data are mangled either way: there should not be any entries in the underflow bin, and there are.

lgray commented 1 year ago

Strangely, if I set threads_per_worker=1 on the Client() the error persists.

ikrommyd commented 1 year ago

This was also the case in my runs with the full package: threads_per_worker=1 wasn't fixing it, and it was also there with the sync scheduler.

lgray commented 1 year ago

Here is a reproducer without uproot, distributed, or threads:

import hist.dask as hda
import dask.array as da
import dask_awkward as dak
import numpy as np

if __name__ == "__main__":

    dar = 200*da.random.random((100_000, 3)).astype(np.float32) + 32
    dak_ar = dak.from_dask_array(dar)

    dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()

    electrons = dak.zip({"pt": dak_ar})

    combos = dak.combinations(electrons, 2, fields=["first", "second"])

    dx.fill(x=dak.flatten(combos.second.pt))

    for _ in range(10):
        print(dx.compute(scheduler="sync").values(flow=True))

If I remove dak.combinations, the problem goes away entirely.
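
For comparison, the same pipeline with the combinations step dropped (a sketch of that variant) is stable:

import hist.dask as hda
import dask.array as da
import dask_awkward as dak
import numpy as np

dar = 200*da.random.random((100_000, 3)).astype(np.float32) + 32
dak_ar = dak.from_dask_array(dar)

dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()

electrons = dak.zip({"pt": dak_ar})

# no dak.combinations: fill straight from the zipped record array
dx.fill(x=dak.flatten(electrons.pt))

for _ in range(10):
    print(dx.compute(scheduler="sync").values(flow=True))  # identical every run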

ikrommyd commented 1 year ago

You're doing all this on x86 right?

lgray commented 1 year ago

Yes.

lgray commented 1 year ago

OK I have a reproducer without dask.

import hist
import numpy as np
import awkward as ak

if __name__ == "__main__":

    ar = ak.Array(200*np.random.random((100_000, 3)).astype(np.float32) + 32)

    electrons = ak.zip({"pt": ar})

    combos = ak.combinations(electrons, 2, fields=["first", "second"])

    for _ in range(10):
        x = hist.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()
        print(x.fill(x=ak.flatten(combos.second.pt)).values(flow=True))

This fails and results in mangled, irreproducible data.

import hist
import numpy as np
import awkward as ak

if __name__ == "__main__":

    ar = ak.Array(200*np.random.random((100_000, 3)).astype(np.float32) + 32)

    combos = ak.combinations(ar, 2, fields=["first", "second"])

    for _ in range(10):
        x = hist.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()
        print(x.fill(x=ak.flatten(combos.second)).values(flow=True))

This executes perfectly fine, so it is some interaction between zip and combinations! Though I haven't removed hist from the list of possible suspects.

@agoose77 @jpivarski - I will open an issue on awkward. This is critical.

lgray commented 1 year ago

Attempting this on arm64 (Apple silicon), there are no problems.

ikrommyd commented 1 year ago

I just noticed that in the pure awkward case the result is wrong but still consistent at every step of the for loop. Is it because, in the daskified case, the array is a different sample at every compute call?

ikrommyd commented 1 year ago

Also, when I was using the entire egamma-tnp tool with dask, it was giving random AND wrong histograms every time I ran it, even though there was no RNG anywhere.

lgray commented 1 year ago

That's fine; the point of the RNG here is just to produce some data and remove the dependency on uproot, to narrow down the places to look.

Through this we identified that there's a memory issue on x86 in the current release of boost-histogram.

Amusingly, if you use numpy histograms (or dask-histogram's numpy interface), all these problems go away.
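
For example, swapping the histogramming step of the awkward-only reproducer above to numpy (a sketch of the workaround, not code from the thread) gives stable bin contents:

import awkward as ak
import numpy as np

ar = ak.Array(200*np.random.random((100_000, 3)).astype(np.float32) + 32)
electrons = ak.zip({"pt": ar})
combos = ak.combinations(electrons, 2, fields=["first", "second"])

edges = np.array([0, 10, 20, 30, 40, 80, 120, 200], dtype=np.float64)
# histogram with numpy instead of boost-histogram/hist
counts, _ = np.histogram(ak.to_numpy(ak.flatten(combos.second.pt)), bins=edges)
print(counts)  # reproducible run to run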

ikrommyd commented 1 year ago

@lgray you can also close this one as complete.