dask-contrib / dask-awkward

Native Dask collection for awkward arrays, and the library to use it.
https://dask-awkward.readthedocs.io
BSD 3-Clause "New" or "Revised" License
61 stars 19 forks source link

PlaceholderArray encountered in `BitMaskedArray.to_ByteMaskedArray` when it shouldn't be #524

Open jpivarski opened 4 months ago

jpivarski commented 4 months ago

Here's a reproducer:

files.tar.gz

import awkward as ak
ak.concatenate([
    ak.from_parquet("one.parquet")["goodjets"],
    ak.from_parquet("two.parquet")["goodjets"],
])

succeeds but

import awkward as ak
import dask_awkward as dak
ak.concatenate([
    dak.from_parquet("one.parquet")["goodjets"],
    dak.from_parquet("two.parquet")["goodjets"],
]).compute()

fails with

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/jpivarski/miniforge3/lib/python3.11/site-packages/dask/base.py", line 376, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/miniforge3/lib/python3.11/site-packages/dask/base.py", line 664, in compute
    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/miniforge3/lib/python3.11/site-packages/dask/base.py", line 664, in <listcomp>
    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
                   ^^^^^^^^
  File "/home/jpivarski/miniforge3/lib/python3.11/site-packages/dask_awkward/lib/core.py", line 830, in _finalize_array
    return ak.concatenate(results)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/_dispatch.py", line 64, in dispatch
    next(gen_or_result)
  File "/home/jpivarski/irishep/awkward/src/awkward/operations/ak_concatenate.py", line 64, in concatenate
    return _impl(arrays, axis, mergebool, highlevel, behavior, attrs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/operations/ak_concatenate.py", line 160, in _impl
    contents = [ak._do.mergemany(b) for b in batches]
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/operations/ak_concatenate.py", line 160, in <listcomp>
    contents = [ak._do.mergemany(b) for b in batches]
                ^^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/_do.py", line 218, in mergemany
    return contents[0]._mergemany(contents[1:])
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/contents/listoffsetarray.py", line 808, in _mergemany
    out = listarray._mergemany(others)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/contents/listarray.py", line 1128, in _mergemany
    nextcontent = contents[0]._mergemany(tail_contents)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/contents/recordarray.py", line 723, in _mergemany
    trimmed = field[0 : array.length]
              ~~~~~^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/contents/content.py", line 512, in __getitem__
    return self._getitem(where)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/contents/content.py", line 523, in _getitem
    return self._getitem_range(start, stop)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/contents/bitmaskedarray.py", line 493, in _getitem_range
    return self.to_ByteMaskedArray()._getitem_range(start, stop)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/contents/bitmaskedarray.py", line 384, in to_ByteMaskedArray
    self._backend[
  File "/home/jpivarski/irishep/awkward/src/awkward/_kernels.py", line 91, in __call__
    return self._impl(
           ^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/_kernels.py", line 92, in <genexpr>
    *(self._cast(x, t) for x, t in zip(args, self._impl.argtypes))
      ^^^^^^^^^^^^^^^^
  File "/home/jpivarski/irishep/awkward/src/awkward/_kernels.py", line 82, in _cast
    raise AssertionError(
AssertionError: Only NumPy buffers should be passed to Numpy Kernels, received PyCPointerType

Going into more detail, the troublemaker is self._mask.data, which is a PlaceholderArray. The rehydration must be saying that this buffer is not needed, but it is needed. The concatenation needs to know which array elements are missing.

jpivarski commented 4 months ago

Another indicator: passing optimize_graph=False to compute make it work. It's the column-optimization.

martindurant commented 4 months ago

Is it expected to be possible to do concatenate on typetracers? It should be needed, since we will need to know the columns to select from both input layers independently - we have no mechanism to carry the columns found needed for one layer across to another.

So far I have found these edges:

martindurant commented 4 months ago

So indeed, the second partition is receiving columns=[] (nothing to load), and it unproject_layout is turning all the missing columns into typetracers.

martindurant commented 4 months ago

I can confirm that the one-pass branch successfully computes the second failing case, but _only_the_firsttime. Subsequent computes fail. The fail mode in having no required columns passed to parquet at all. Calling dak.core.dak_cache.clear() causes it to pass again, so we have a good hint of where the problem is.

Given that #526 has a variant of this same problem, is it time to dust off the one-pass PR?

martindurant commented 4 months ago

(I should say, that a trivial, but not great, workaround for the issue here is to touch all inputs to a concatenate, which somehow is what the other linked issue ended up doing (because of axis=1, presumably).