dask-contrib / dask-awkward

Native Dask collection for awkward arrays, and the library to use it.
https://dask-awkward.readthedocs.io
BSD 3-Clause "New" or "Revised" License

`dak.to_parquet` cannot write empty arrays when `extensionarray=True` #541

Closed · ikrommyd closed this issue 2 months ago

ikrommyd commented 2 months ago

To reproduce

import awkward as ak
import dask_awkward as dak

array = ak.Array({"x": [], "y":[]})
x = dak.from_awkward(array, 1)

dak.to_parquet(x, "dummy", extensionarray=False) # works fine
dak.to_parquet(x, "dummy", extensionarray=True) # fails

Traceback


File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/dask_awkward/lib/io/parquet.py:694, in to_parquet(array, destination, list_to32, string_to32, bytestring_to32, emptyarray_to, categorical_as_dictionary, extensionarray, count_nulls, compression, compression_level, row_group_size, data_page_size, parquet_flavor, parquet_version, parquet_page_version, parquet_metadata_statistics, parquet_dictionary_encoding, parquet_byte_stream_split, parquet_coerce_timestamps, parquet_old_int96_timestamps, parquet_compliant_nested, parquet_extra_options, storage_options, write_metadata, compute, prefix)
    692 out = new_scalar_object(graph, final_name, dtype="f8")
    693 if compute:
--> 694     out.compute()
    695     return None
    696 else:

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/dask/base.py:376, in DaskMethodsMixin.compute(self, **kwargs)
    352 def compute(self, **kwargs):
    353     """Compute this dask collection
    354
    355     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    374     dask.compute
    375     """
--> 376     (result,) = compute(self, traverse=False, **kwargs)
    377     return result

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/dask/base.py:664, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    661     postcomputes.append(x.__dask_postcompute__())
    663 with shorten_traceback():
--> 664     results = schedule(dsk, keys, **kwargs)
    666 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/dask_awkward/lib/core.py:1964, in ArgsKwargsPackedFunction.__call__(self, *args_deps_expanded)
   1962     len_args += n_args
   1963 kwargs = self.kwarg_repacker(args_deps_expanded[len_args:])[0]
-> 1964 return self.fn(*args, **kwargs)

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/dask_awkward/lib/io/parquet.py:506, in _ToParquetFn.__call__(self, data, block_index)
    504     filename = f"{self.prefix}-{filename}"
    505 filename = self.fs.unstrip_protocol(f"{self.path}{self.fs.sep}{filename}")
--> 506 return ak.to_parquet(
    507     data, filename, **self.kwargs, storage_options=self.storage_options
    508 )

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/awkward/_dispatch.py:38, in named_high_level_function.<locals>.dispatch(*args, **kwargs)
     35 @wraps(func)
     36 def dispatch(*args, **kwargs):
     37     # NOTE: this decorator assumes that the operation is exposed under `ak.`
---> 38     with OperationErrorContext(name, args, kwargs):
     39         gen_or_result = func(*args, **kwargs)
     40         if isgenerator(gen_or_result):

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/awkward/_errors.py:85, in ErrorContext.__exit__(self, exception_type, exception_value, traceback)
     78 try:
     79     # Handle caught exception
     80     if (
     81         exception_type is not None
     82         and issubclass(exception_type, Exception)
     83         and self.primary() is self
     84     ):
---> 85         self.handle_exception(exception_type, exception_value)
     86 finally:
     87     # Step out of the way so that another ErrorContext can become primary.
     88     if self.primary() is self:

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/awkward/_errors.py:95, in ErrorContext.handle_exception(self, cls, exception)
     93     self.decorate_exception(cls, exception)
     94 else:
---> 95     raise self.decorate_exception(cls, exception)

ValueError: Must pass schema, or at least one RecordBatch

This error occurred while calling

    ak.to_parquet(
        <Array [] type='0 * {x: unknown, y: unknown}'>
        'file:///Users/iason/work/egamma_dev/egamma-tnp/dummy/part'...
        list_to32 = False
        string_to32 = True
        bytestring_to32 = True
        emptyarray_to = None
        categorical_as_dictionary = False
        extensionarray = True
        count_nulls = True
        compression = 'zstd'
        compression_level = None
        row_group_size = 67108864
        data_page_size = None
        parquet_flavor = None
        parquet_version = '2.4'
        parquet_page_version = '1.0'
        parquet_metadata_statistics = True
        parquet_dictionary_encoding = False
        parquet_byte_stream_split = False
        parquet_coerce_timestamps = None
        parquet_old_int96_timestamps = None
        parquet_compliant_nested = False
        parquet_extra_options = None
        storage_options = None
    )
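
For context, this ValueError matches the one pyarrow raises when asked to build a Table from an empty list of record batches without an explicit schema, which is presumably what the writer ends up hitting for a fully empty array (a minimal sketch; the exact call site inside ak.to_parquet is an assumption):

import pyarrow as pa

# With no batches and no schema, pyarrow cannot infer the column types:
try:
    pa.Table.from_batches([])
except ValueError as err:
    print(err)  # Must pass schema, or at least one RecordBatch

# Supplying an explicit schema yields an empty (zero-row) table instead:
schema = pa.schema([("x", pa.float64()), ("y", pa.float64())])
print(pa.Table.from_batches([], schema=schema).num_rows)  # 0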
martindurant commented 2 months ago

OK, will fix. Actually, an empty partition should result in no file at all. That does raise the question of what to do when all partitions are empty, but I'll defer that one.
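
As a rough illustration of the "no file for an empty partition" idea, a per-partition write wrapper could simply return early on zero-length input (a hypothetical sketch, not the actual _ToParquetFn implementation):

import awkward as ak

def write_partition(data, filename, **to_parquet_kwargs):
    # Hypothetical guard: skip the write entirely when the partition is empty,
    # so no file is produced and ak.to_parquet never sees a zero-length array.
    if len(data) == 0:
        return None
    return ak.to_parquet(data, filename, **to_parquet_kwargs)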

martindurant commented 2 months ago

Actually, this is an ak error:

import awkward as ak

array = ak.Array({"x": [], "y":[]})

ak.to_parquet(array, "dummy", extensionarray=False)  # works fine
ak.to_parquet(array, "dummy", extensionarray=True)  # fails

cc @jpivarski

Of course, dak could still simply skip empty partitions.

ikrommyd commented 2 months ago

Ah, I'm sorry then. I thought it would be tested on the awkward side, and I only saw extensionarray=False in the dask-awkward codebase, so I assumed it was a dak problem.

ikrommyd commented 2 months ago

This is a problem in my case because I'm skimming and writing NTuples: some partitions end up with zero events after skimming, so dak tries to write empty arrays and the entire Dask computation gets killed.
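
Until the upstream fix is released, one possible workaround, based on the reproducer above where extensionarray=False succeeds, is to disable extension arrays for the write (with the caveat that Arrow extension-type metadata is then not stored):

import awkward as ak
import dask_awkward as dak

array = ak.Array({"x": [], "y": []})
x = dak.from_awkward(array, 1)

# extensionarray=False handles empty partitions without error in the reproducer;
# the trade-off is that awkward's Arrow extension-type metadata is not written.
dak.to_parquet(x, "dummy", extensionarray=False)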

jpivarski commented 2 months ago

I expect this to be fixed by scikit-hep/awkward#3234.

martindurant commented 2 months ago

+1, please close this when that one is merged.

I am still unsure whether dak should omit these files entirely, since then you can end up with no output at all.