dask / dask-expr

BSD 3-Clause "New" or "Revised" License

ReadParquetFSSpec will throw an error when collecting statistics if plan is empty #1021

Closed benrutter closed 2 months ago

benrutter commented 2 months ago

Describe the issue:

I'm having a little trouble actually recreating this error, and initially thought it was related to azure functions: https://github.com/dask/dask/issues/11037.

I now understand pretty well the type of situation it'll happen in, so I'll just explain the cause of the bug.

Essentially, when the plan of ReadParquetFSSpec is marked empty (self._plan["empty"] is truthy), it'll set its internal _io_func property to be the identity function rather than the ParquetFunctionWrapper:

    @property
    def _io_func(self):
        if self._plan["empty"]:
            return identity
        dataset_info = self._dataset_info
        return ParquetFunctionWrapper(
            self.engine,
            dataset_info["fs"],
            dataset_info["base_meta"],
            self.columns,
            dataset_info["index"],
            dataset_info["kwargs"]["dtype_backend"],
            {},  # All kwargs should now be in `common_kwargs`
            self._plan["common_kwargs"],
        )

This causes issues later when attempting to collect parquet statistics:

    def _collect_pq_statistics(
        expr: ReadParquet, columns: list | None = None
    ) -> list[dict] | None:
        """Collect Parquet statistic for dataset paths"""

        # Be strict about columns argument
        if columns:
            if not isinstance(columns, list):
                raise ValueError(f"Expected columns to be a list, got {type(columns)}.")
            allowed = {expr._meta.index.name} | set(expr.columns)
            if not set(columns).issubset(allowed):
                raise ValueError(f"columns={columns} must be a subset of {allowed}")

        # Collect statistics using layer information
        fs = expr._io_func.fs

The line:

    fs = expr._io_func.fs

will throw an error if _io_func is actually the identity function because, unlike the ParquetFunctionWrapper class, it doesn't have an fs attribute.
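
For illustration, here's a minimal, self-contained sketch of why the attribute lookup fails (these are simplified stand-ins, not the real dask-expr classes):

    # A plain function carries no extra attributes, while the wrapper stores
    # a filesystem object at construction time - so swapping one for the
    # other breaks any code that reaches for .fs
    def identity(x):
        return x

    class ParquetFunctionWrapper:
        def __init__(self, fs):
            self.fs = fs  # an fsspec filesystem instance in the real code

    wrapper = ParquetFunctionWrapper(fs="some-filesystem")
    print(wrapper.fs)   # fine
    print(identity.fs)  # AttributeError: 'function' object has no attribute 'fs'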

That leads to an error with this kind of stack trace:

  File \"/home/site/wwwroot/lib_functions.py\", line 423, in clipped_dataset
    return_df_size = len(return_df)
  File \"/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_collection.py\", line 381, in __len__
    return new_collection(Len(self)).compute()
  File \"/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_collection.py\", line 452, in compute
    out = out.optimize(fuse=fuse)
  File \"/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_collection.py\", line 488, in optimize
    return new_collection(self.expr.optimize(fuse=fuse))
  File \"/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_expr.py\", line 93, in optimize
    return optimize(self, **kwargs)
  File \"/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_expr.py\", line 2877, in optimize
    return optimize_until(expr, stage)
  File \"/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_expr.py\", line 2828, in optimize_until
    expr = result.simplify()
  File \"/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_core.py\", line 371, in simplify
    new = expr.simplify_once(dependents=dependents, simplified={})
  File \"/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_core.py\", line 332, in simplify_once
    out = child._simplify_up(expr, dependents)
  File \"/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/io/parquet.py\", line 622, in _simplify_up
    _lengths = self._get_lengths()
  File \"/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/io/parquet.py\", line 1311, in _get_lengths
    self._update_length_statistics()
  File \"/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/io/parquet.py\", line 1333, in _update_length_statistics
    stat[\"num-rows\"] for stat in _collect_pq_statistics(self)
  File \"/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/io/parquet.py\", line 1554, in _collect_pq_statistics
    fs = expr._io_func.fs
AttributeError: 'function' object has no attribute 'fs'

Environment:

phofl commented 2 months ago

I can't reproduce what you are running into, unfortunately. I am not sure how we can get there; we would require an empty directory, but this is caught beforehand. Filters can't be specified; we are protected against that case. I think we need a bit more information to understand how we end up there.

benrutter commented 2 months ago

@phofl - that's super interesting. I'm not that surprised that it's hard to reproduce, as it's been a bit of a hard-to-track-down bug for me, and so far I've only been able to recreate it very specifically in an Azure Function, which is a really unusual environment for dask anyway.

It seems like a classic type mismatch error, where something odd is probably happening at a different point but it only surfaces there. I'll try to dig around to figure out how it's getting thrown - the code that threw the error trace I included was on dask 2024.3, if that makes a difference.

I think it might be caused by asking for the length of an empty dataframe returned from read_parquet; I'll do what I can to put together a minimal reproducible example.

phofl commented 2 months ago

Can you share the read_parquet invocation and what kwargs you are setting? That would be a good start


benrutter commented 2 months ago

Yup, the read_parquet invocation looks like this, with no special kwargs other than storage_options. I've taken out the real paths, plus some extra code that builds the list, which in the case throwing the error (although weirdly again, I'm only seeing the error in an Azure Functions environment and not elsewhere) winds up as a list of just one glob string.

    df = dd.read_parquet(["abfs://somecontainer/folder/*.parquet"], storage_options=storage_options)
    this_bit_will_crash = len(df)

(Worth noting that in the crashing instance I found, the folder was in fact empty, which may or may not be relevant.)

phofl commented 2 months ago

Can you share how big the parquet file is, or do you not know exactly how big it is?

benrutter commented 2 months ago

I don't think there's any parquet file at all, just an empty folder in the case throwing the error. It's a step of a pipeline that checks whether some data exists and then runs some tests on it if it does. In this case, I think it's just an empty dataframe, which might be what's causing the problem (although that might be a red herring).

(I say empty dataframe because that's the usual return of dd.read_parquet("some/not/matching/globstring/*.parquet"), rather than it being a parquet file containing no data or something like that.)

I also don't know if the empty data itself is causing the issue. (I've found the bug in resources running at my work, so I'm slightly restricted in the tests I can run on it - sorry, I know that's a bit annoying!)

phofl commented 2 months ago

Yeah, found a reproducer: your folder is empty. Normally this would raise earlier (that's why I couldn't reproduce it another way), but globs in lists don't trigger that check (for some reason).
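
Based on that description, a minimal reproducer presumably looks something like this (the path is illustrative, assuming a directory that exists but contains no parquet files):

    import dask.dataframe as dd

    # As a bare string, the non-matching glob raises early with a clear
    # "no files found" style error:
    #   dd.read_parquet("empty_folder/*.parquet")
    # Wrapped in a list, that early check is skipped, and asking for the
    # length later fails with:
    #   AttributeError: 'function' object has no attribute 'fs'
    df = dd.read_parquet(["empty_folder/*.parquet"])
    len(df)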

benrutter commented 2 months ago

Ah that's amazing - nice one!!

I'm asking this just for my own learning, but how come _collect_pq_statistics ran when it normally wouldn't? Is it because trying to read an empty folder with something like dd.read_parquet("empty/folder") normally throws an error (apart from when passed a list), so _collect_pq_statistics isn't expecting that as a possibility?

phofl commented 2 months ago

Yeah, normally this raises way before you even get close to _collect_pq_statistics.
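
For anyone reading along, one shape a fix could take (a hedged sketch only, not necessarily what was merged upstream) is to bail out of statistics collection before touching the io function's filesystem:

    # Sketch of a possible guard at the top of _collect_pq_statistics: when
    # the plan is empty, _io_func is the identity function, so return early
    # instead of reaching for an fs attribute that doesn't exist.
    def _collect_pq_statistics(expr, columns=None):
        if expr._plan["empty"]:
            return None  # no files, hence no statistics to collect
        fs = expr._io_func.fs
        ...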