dask-contrib / dask-awkward

Native Dask collection for awkward arrays, and the library to use it.
https://dask-awkward.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Feature request for typetracers handler for `np.interp` #493

Open green-cabbage opened 8 months ago

green-cabbage commented 8 months ago

Hello, I originally asked how to implement a dask_awkward equivalent of np.interp in this Q&A discussion.

@lgray suggested wrapping np.interp with dak.map_partitions, but doing so gave TypeError("cannot compare unknown lengths against known values"). The full error log can be seen here. @lgray later said that this indicates that some wrapper code is necessary to handle the typetracers. At his suggestion, I would like to make a feature request for this functionality, so that I can in turn wrap np.interp with dak.map_partitions. Please let me know if you need more info!

lgray commented 8 months ago

@green-cabbage it's probably something along the lines of:

import awkward as ak
import dask_awkward as dak
import numpy as np

def interp(vals, knots_x, knots_y):
    result = ak.Array(np.interp(
        ak.typetracer.length_zero_if_typetracer(vals),
        ak.typetracer.length_zero_if_typetracer(knots_x),
        ak.typetracer.length_zero_if_typetracer(knots_y),
    ))
    if ak.backend(vals, knots_x, knots_y) == "typetracer":
        return ak.Array(result.layout.to_typetracer(forget_length=True))
    return result

interpolated = dak.map_partitions(
    interp,
    vals,
    xs,
    ys,
)

If you have jagged arrays you'll have to do the flattening and unflattening as well.

green-cabbage commented 8 months ago

Here's a quick code that I wrote up to test this:

import dask_awkward as dak
import awkward as ak
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from dask_awkward.lib.core import map_partitions
import numpy as np

def interp(vals, knots_x, knots_y):
    print(f"ak.typetracer.length_zero_if_typetracer(vals): {ak.typetracer.length_zero_if_typetracer(vals)}")
    result = ak.Array(np.interp(
       ak.typetracer.length_zero_if_typetracer(vals),
       knots_x,
       knots_y,
    ))
    print(f"result: {result}")
    if ak.backend(vals, knots_x, knots_y) == "typetracer":
       return ak.Array(result.layer.to_typetracer(forget_length=True))
    return result

fname = "root://eos.cms.rcac.purdue.edu//store/mc/RunIISummer20UL18NanoAODv9/GluGluHToMuMu_M125_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/2810000/C4DAB63C-E2A1-A541-93A8-3F46315E362C.root"

events = NanoEventsFactory.from_root(
    {fname: "Events"},
    schemaclass=NanoAODSchema,
    metadata={"dataset": "DYJets"},
).events()
jets = events.Jet[:2]
xp = np.array([1, 2, 3])
fp = np.array([3, 2, 0])

# Implement Lindsey's suggestion
interpolated = dak.map_partitions(
    interp,
    jets.pt, xp, fp
)

This prints ak.typetracer.length_zero_if_typetracer(vals): [] and result: [], and then raises an error because the result variable is empty. When I tested ak.typetracer.length_zero_if_typetracer(jets.pt) on its own, it raised this error:


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/_dispatch.py:39, in named_high_level_function.<locals>.dispatch(*args, **kwargs)
     38 with OperationErrorContext(name, args, kwargs):
---> 39     gen_or_result = func(*args, **kwargs)
     40     if isgenerator(gen_or_result):

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/operations/ak_from_iter.py:70, in from_iter(iterable, allow_record, highlevel, behavior, attrs, initial, resize)
     29 """
     30 Args:
     31     iterable (Python iterable): Data to convert into an Awkward Array.
   (...)
     68 See also #ak.to_list.
     69 """
---> 70 return _impl(iterable, highlevel, behavior, allow_record, initial, resize, attrs)

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/operations/ak_from_iter.py:100, in _impl(iterable, highlevel, behavior, allow_record, initial, resize, attrs)
     99 builder = _ext.ArrayBuilder(initial=initial, resize=resize)
--> 100 builder.fromiter(iterable)
    102 formstr, length, buffers = builder.to_buffers()

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/dask_awkward/lib/core.py:1527, in Array.__getattr__.<locals>.wrapper(*args, **kwargs)
   1525 @wraps(cls_method)
   1526 def wrapper(*args, **kwargs):
-> 1527     return self.map_partitions(
   1528         _BehaviorMethodFn(attr, **kwargs),
   1529         *args,
   1530         label=hyphenize(attr),
   1531     )

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/dask_awkward/lib/core.py:1588, in Array.map_partitions(self, func, traverse, *args, **kwargs)
   1562 """Map a function across all partitions of the collection.
   1563 
   1564 Parameters
   (...)
   1586 
   1587 """
-> 1588 return map_partitions(func, self, *args, traverse=traverse, **kwargs)

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/dask_awkward/lib/core.py:2107, in map_partitions(base_fn, label, token, meta, output_divisions, traverse, *args, **kwargs)
   2101 fn = ArgsKwargsPackedFunction(
   2102     base_fn,
   2103     arg_repackers,
   2104     kwarg_repacker,
   2105     arg_lens_for_repackers,
   2106 )
-> 2107 return _map_partitions(
   2108     fn,
   2109     *arg_flat_deps_expanded,
   2110     *kwarg_flat_deps,
   2111     label=label,
   2112     token=token,
   2113     meta=meta,
   2114     output_divisions=output_divisions,
   2115 )

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/dask_awkward/lib/core.py:1943, in _map_partitions(fn, label, token, meta, output_divisions, *args, **kwargs)
   1942 if meta is None:
-> 1943     meta = map_meta(fn, *args, **kwargs)
   1945 hlg = HighLevelGraph.from_collections(
   1946     name,
   1947     lay,
   1948     dependencies=deps,
   1949 )

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/dask_awkward/lib/core.py:2467, in map_meta(fn, *deps)
   2466 try:
-> 2467     meta = fn(*to_meta(deps))
   2468     return meta

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/dask_awkward/lib/core.py:1903, in ArgsKwargsPackedFunction.__call__(self, *args_deps_expanded)
   1902 kwargs = self.kwarg_repacker(args_deps_expanded[len_args:])[0]
-> 1903 return self.fn(*args, **kwargs)

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/dask_awkward/lib/core.py:2567, in _BehaviorMethodFn.__call__(self, coll, *args)
   2566 def __call__(self, coll: ak.Array, *args: Any) -> ak.Array:
-> 2567     return getattr(coll, self.attr)(*args, **self.kwargs)

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/highlevel.py:496, in Array.to_list(self)
    493 """
    494 Converts this Array into Python objects; same as #ak.to_list.
    495 """
--> 496 return self._layout.to_list(self._behavior)

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/contents/content.py:1086, in Content.to_list(self, behavior)
   1085 def to_list(self, behavior: dict | None = None) -> list:
-> 1086     return self.to_packed()._to_list(behavior, None)

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/contents/listoffsetarray.py:2228, in ListOffsetArray._to_list(self, behavior, json_conversions)
   2227 if not self._backend.nplike.known_data:
-> 2228     raise TypeError("cannot convert typetracer arrays to Python lists")
   2230 starts, stops = self.starts, self.stops

TypeError: cannot convert typetracer arrays to Python lists

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
Cell In[3], line 2
      1 # ak.typetracer.length_zero_if_typetracer(jets.pt, highlevel=False)
----> 2 ak.typetracer.length_zero_if_typetracer(jets.pt)
      3 # ak.backend(jets.pt, xp, fp)

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/typetracer.py:102, in length_zero_if_typetracer(array, highlevel, behavior, attrs)
     81 def length_zero_if_typetracer(
     82     array: Any,
     83     *,
   (...)
     86     attrs: Mapping[str, Any] | None = None,
     87 ) -> T:
     88     """
     89     Args:
     90         array: Array-like data (anything #ak.to_layout recognizes).
   (...)
    100     shallow copy of the original array is returned.
    101     """
--> 102     return _length_0_1_if_typetracer(
    103         array, Form.length_zero_array, highlevel, behavior, attrs
    104     )

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/typetracer.py:65, in _length_0_1_if_typetracer(array, function, highlevel, behavior, attrs)
     62 typetracer_backend = TypeTracerBackend.instance()
     64 with HighLevelContext(behavior=behavior, attrs=attrs) as ctx:
---> 65     layout = ctx.unwrap(
     66         array,
     67         allow_unknown=False,
     68         allow_record=True,
     69         primitive_policy="error",
     70         none_policy="error",
     71         string_policy="as-characters",
     72     )
     74 if layout.backend is typetracer_backend:
     75     _touch_data(layout)

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/_layout.py:145, in HighLevelContext.unwrap(self, obj, allow_record, allow_unknown, none_policy, primitive_policy, string_policy, use_from_iter, regulararray)
    141 from awkward.operations.ak_to_layout import _impl as to_layout_impl
    143 self.update(obj)
--> 145 return to_layout_impl(
    146     obj,
    147     allow_record=allow_record,
    148     allow_unknown=allow_unknown,
    149     none_policy=none_policy,
    150     use_from_iter=use_from_iter,
    151     primitive_policy=primitive_policy,
    152     string_policy=string_policy,
    153     regulararray=regulararray,
    154 )

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/operations/ak_to_layout.py:263, in _impl(obj, allow_record, allow_unknown, none_policy, regulararray, use_from_iter, primitive_policy, string_policy)
    261 elif isinstance(obj, Iterable):
    262     if use_from_iter:
--> 263         return ak.operations.from_iter(
    264             obj, highlevel=False, allow_record=allow_record
    265         )
    266     else:
    267         raise TypeError(
    268             "Encountered an iterable object, but coercing iterables is disabled"
    269         )

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/_dispatch.py:38, in named_high_level_function.<locals>.dispatch(*args, **kwargs)
     35 @wraps(func)
     36 def dispatch(*args, **kwargs):
     37     # NOTE: this decorator assumes that the operation is exposed under `ak.`
---> 38     with OperationErrorContext(name, args, kwargs):
     39         gen_or_result = func(*args, **kwargs)
     40         if isgenerator(gen_or_result):

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/_errors.py:85, in ErrorContext.__exit__(self, exception_type, exception_value, traceback)
     78 try:
     79     # Handle caught exception
     80     if (
     81         exception_type is not None
     82         and issubclass(exception_type, Exception)
     83         and self.primary() is self
     84     ):
---> 85         self.handle_exception(exception_type, exception_value)
     86 finally:
     87     # Step out of the way so that another ErrorContext can become primary.
     88     if self.primary() is self:

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/_errors.py:95, in ErrorContext.handle_exception(self, cls, exception)
     93     self.decorate_exception(cls, exception)
     94 else:
---> 95     raise self.decorate_exception(cls, exception)

TypeError: cannot convert typetracer arrays to Python lists

This error occurred while calling

    ak.from_iter(
        Array-instance
        highlevel = False
        allow_record = True
    )

lgray commented 8 months ago

You left out all the length_zero_if_typetracer calls that are necessary.

lgray commented 8 months ago

You cannot mix delayed mode evaluation with eager mode evaluation.

lgray commented 8 months ago

I.e., if you have an interpolation lookup table to implement, you need to ship it with the function.

You can make a class that has a __call__ method, fill that class with the interpolation data, and then pass an instance of that class to map_partitions.

It'll be something like:

class MyScaleVariation:
    def __init__(self, x_knots, y_knots):
        self.x_knots = x_knots
        self.y_knots = y_knots

    def __call__(self, vals):
        # handle the typetracers
        out = np.interp(vals, self.x_knots, self.y_knots)
        return out

Then you instantiate that class and call map_partitions on the instance. Or, if you want to get fancy, you can check whether a dask-awkward array is passed into __call__: call map_partitions when you get a dask-awkward array, and otherwise evaluate the function directly on the concrete data.

green-cabbage commented 8 months ago

I took your suggestion and wrote new code:

class MyScaleVariation:
    def __init__(self, x_knots, y_knots):
        self.x_knots = x_knots
        self.y_knots = y_knots

    def __call__(self, vals):
        result = ak.Array(np.interp(
           vals, # this is already delayed ak array
           ak.typetracer.length_zero_if_typetracer(self.x_knots),
           ak.typetracer.length_zero_if_typetracer(self.y_knots),
        ))
        print(f"result: {result}")
        if ak.backend(vals, self.x_knots, self.y_knots) == "typetracer":
           return ak.Array(result.layer.to_typetracer(forget_length=True))
        return result
scalevar = MyScaleVariation(xp, fp)
interpolated = dak.map_partitions(
    scalevar,
    jets.pt.flatten()
)

This gets further, but then raises this error:

......
Cell In[47], line 10, in MyScaleVariation.__call__(self, vals)
      8 print(f"ak.typetracer.length_zero_if_typetracer(vals): {ak.typetracer.length_zero_if_typetracer(vals)}")
      9 print(f"ak.typetracer.length_zero_if_typetracer(vals): {type(ak.typetracer.length_zero_if_typetracer(self.x_knots))}")
---> 10 result = ak.Array(np.interp(
     11    vals, # this is already delayed ak array
     12    ak.typetracer.length_zero_if_typetracer(self.x_knots),
     13    ak.typetracer.length_zero_if_typetracer(self.y_knots),
     14 ))
     15 print(f"result: {result}")
     16 if ak.backend(vals, self.x_knots, self.y_knots) == "typetracer":

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/highlevel.py:1527, in Array.__array_function__(self, func, types, args, kwargs)
   1513 def __array_function__(self, func, types, args, kwargs):
   1514     """
   1515     Intercepts attempts to pass this Array to those NumPy functions other
   1516     than universal functions that have an Awkward equivalent.
   (...)
   1525     See also #__array_ufunc__.
   1526     """
-> 1527     return ak._connect.numpy.array_function(
   1528         func, types, args, kwargs, behavior=self._behavior, attrs=self._attrs
   1529     )

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/_connect/numpy.py:111, in array_function(func, types, args, kwargs, behavior, attrs)
    109 rectilinear_args = tuple(_to_rectilinear(x, backend) for x in args)
    110 rectilinear_kwargs = {k: _to_rectilinear(v, backend) for k, v in kwargs.items()}
--> 111 result = func(*rectilinear_args, **rectilinear_kwargs)
    112 # We want the result to be a layout (this will fail for functions returning non-array convertibles)
    113 out = ak.operations.ak_to_layout._impl(
    114     result,
    115     allow_record=True,
   (...)
    121     string_policy="pass-through",
    122 )

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/numpy/lib/function_base.py:1566, in interp(x, xp, fp, left, right, period)
   1461 @array_function_dispatch(_interp_dispatcher)
   1462 def interp(x, xp, fp, left=None, right=None, period=None):
   1463     """
   1464     One-dimensional linear interpolation for monotonically increasing sample points.
   1465 
   (...)
   1563 
   1564     """
-> 1566     fp = np.asarray(fp)
   1568     if np.iscomplexobj(fp):
   1569         interp_func = compiled_interp_complex

File /depot/cms/kernels/coffea2024/lib/python3.10/site-packages/awkward/_nplikes/typetracer.py:308, in TypeTracerArray.__array__(self, dtype)
    307 def __array__(self, dtype=None):
--> 308     raise AssertionError(
    309         "bug in Awkward Array: attempt to convert TypeTracerArray into a concrete array"
    310     )

AssertionError: bug in Awkward Array: attempt to convert TypeTracerArray into a concrete array

lgray commented 8 months ago

OK, I need to be far more precise for you to get something that works. You are shooting in the dark here a bit, and I think that is more the documentation's fault than anything else.

The lifecycle of evaluating the functions you send to map_partitions is:

1) Typetracers of the arguments to the call are sent through the function, and a typetracer is returned that describes the output data structure. This happens when you are building the task graph. If you need to evaluate a function that doesn't know how to deal with awkward arrays or typetracers, you need to convert the typetracers/awkward arrays on the way into and out of that function. In the code you supplied, you converted the knot arrays, which do not need to be converted, and left vals unconverted. You have to pass a concrete array through np.interp, not a delayed array.
2) Once the task graph is built, typetracers with a data access report are passed through the graph to determine which input data the analysis needs. This requires the same handling code as step 1.
3) The actual arrays with real data are fed through; the code handling awkward arrays should only handle that portion of the conversion.

class MyScaleVariation:
    def __init__(self, x_knots, y_knots):
        self.x_knots = x_knots
        self.y_knots = y_knots

    def __call__(self, vals):
        result = ak.Array(np.interp(
           ak.typetracer.length_zero_if_typetracer(vals), # this will either be a concrete array with data or a type tracer
           self.x_knots,
           self.y_knots,
        ))
        print(f"result: {result}")
        if ak.backend(vals) == "typetracer":
           return ak.Array(result.layout.to_typetracer(forget_length=True))
        return result
scalevar = MyScaleVariation(xp, fp) # xp and fp should be *concrete* arrays that you use later.
interpolated = dak.map_partitions(
    scalevar,
    jets.pt.flatten() # just make sure you can unflatten it too, if you need that.
)

green-cabbage commented 8 months ago

Your last suggestion essentially solved my issue. My code below functions as intended:


import dask_awkward as dak
import awkward as ak
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from dask_awkward.lib.core import map_partitions
import numpy as np

fname = "root://eos.cms.rcac.purdue.edu:1094///store/mc/RunIISummer20UL18NanoAODv9/GluGluHToMuMu_M125_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/2810000/C4DAB63C-E2A1-A541-93A8-3F46315E362C.root"

events = NanoEventsFactory.from_root(
    {fname: "Events"},
    schemaclass=NanoAODSchema,
    metadata={"dataset": "DYJets"},
).events()
H_pt = events.HTXS.Higgs_pt[:50] # 1D flat array
xp = np.array([1, 2, 3])
fp = np.array([3, 2, 0])

class MyScaleVariation:
    def __init__(self, x_knots, y_knots):
        self.x_knots = x_knots
        self.y_knots = y_knots

    def __call__(self, vals):
        result = ak.Array(np.interp(
           ak.typetracer.length_zero_if_typetracer(vals), # this will either be a concrete array with data or a type tracer
           self.x_knots,
           self.y_knots,
        ))
        if ak.backend(vals) == "typetracer":
           return ak.Array(result.layout.to_typetracer(forget_length=True))
        return result
scalevar = MyScaleVariation(xp, fp)
interpolated = dak.map_partitions(
    scalevar,
    H_pt
)
print(interpolated.compute())

This prints an actual array, which is great! I will now try to integrate this into my framework, and will let you know if things don't go well.