ssl-hep / ServiceX_frontend

Client access library for ServiceX
5 stars 11 forks source link

Awkward Queries #307

Open BenGalewsky opened 1 year ago

BenGalewsky commented 1 year ago

As an analyzer I want to specify my ServiceX queries using awkward syntax so I can perform row-level cuts without learning a new language

Description

We will use Awkward DASK to create a task graph for selects along with necessary_columns method to determine properties to include in the results. This will be translated into Qastle to pass on to the code generators.

We can add annotations to the task graph to indicate where the select goes beyond what ServiceX can handle.

Assumptions

  1. It will only do row-level filtering
  2. For the first pass, we won't attempt to unify the selections between the ServiceX parts and the coffea parts.
BenGalewsky commented 1 year ago

This code snippet was submitted by Lindsey Gray

from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from distributed import Client
import dask
import dask_awkward
import awkward as ak
import hist.dask as hda

def extract_pushdown(coll):
    hlg_sorted = coll.dask._toposort_layers()
    pushdown_deps = []
    for key in hlg_sorted:
        annotations = coll.dask.layers[key].annotations
        if annotations is not None and "pushdown" in annotations:
            #print(key, coll.dask.layers[key].annotations)
            pushdown_deps = [key] + pushdown_deps
    for dep in pushdown_deps:
        layer = coll.dask.layers[dep]
        fcn = list(layer.dsk.values())[0][0]
        if isinstance(layer, dask_awkward.layers.AwkwardBlockwiseLayer) and not isinstance(layer, dask_awkward.layers.AwkwardInputLayer):
            print(dir(layer))
            print(layer.dsk)
            print(list(layer.keys()))
            print(dep, fcn.fn)
            print(dir(fcn))
            print(fcn.arg_repackers[0])
        else:
            print(dep, fcn)

if __name__ == "__main__":
    #client = Client()

    dask.config.set({"awkward.optimization.enabled": True, "awkward.raise-failed-meta": True, "awkward.optimization.on-fail": "raise"})

    with dask.annotate(pushdown="servicex"):
        events = NanoEventsFactory.from_root(
            {"tests/samples/nano_dy.root": "Events"},
            metadata={"dataset": "nano_dy"},
            schemaclass=NanoAODSchema,
            permit_dask=True,
        ).events()

        mask = events.Muon.pt > 30
        events = events[ak.any(mask, axis=1)]

    myhist = hda.Hist.new.Regular(50, -2.5, 2.5, name="abseta").Double()

    myhist.fill(abseta=abs(events.Muon.eta))

    extract_pushdown(myhist)
ponyisi commented 3 months ago

We have significant support for expressions and filtering using awkward syntax now using the uproot-raw codegen.

ponyisi commented 1 week ago

Following some discussion with Jim Pivarski, a thought about a first way of tying ServiceX and dask-awkward together:

BenGalewsky commented 1 week ago
  • I would imagine a separate microservice that used the DID finder to look up the dataset files and extract metadata from one of them, then returning the schema to the user.

The return of the preflight check! We used to have a service that would review a sample file to decide if the transform would work before committing the rest of the workers. We decided it wasn't worth the effort and removed that functionality.