dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

Dask Nunique bug under dask 2024.2.1 #10982

Open frbelotto opened 8 months ago

frbelotto commented 8 months ago

Hello guys, take this CSV as an example dataframe: teste.csv. I'm sorry, but I was not able to write code that builds an example dataframe reproducing this bug, so I've attached the file instead.

Now, let's open the file and execute the query on the example dataframe under dask 2023.10.0:

import dask.dataframe as dd
ddf = dd.read_csv(
    'teste.csv',
    dtype={
        'status': 'category', 'produto': 'category', 'parceiro': 'category',
        'mci': 'Int32', 'marca': 'category', 'sku': 'string',
        'cod_transacao': 'string', 'forma_pagamento': 'category',
        'gmv': 'Float32', 'receita': 'Float32', 'cashback': 'Float32',
    },
)
base_consumo = ddf.groupby(['mci', 'marca'], dropna=False, observed=True)['marca'].nunique().to_frame()
base_consumo.head()

[screenshot of the base_consumo.head() output]

Runs ok!

Now, let's run the same test under dask 2024.2.1:

import dask.dataframe as dd
ddf = dd.read_csv(
    'teste.csv',
    dtype={
        'status': 'category', 'produto': 'category', 'parceiro': 'category',
        'mci': 'Int32', 'marca': 'category', 'sku': 'string',
        'cod_transacao': 'string', 'forma_pagamento': 'category',
        'gmv': 'Float32', 'receita': 'Float32', 'cashback': 'Float32',
    },
)
base_consumo = ddf.groupby(['mci', 'marca'], dropna=False, observed=True)['marca'].nunique().to_frame()
base_consumo.head()

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\utils.py:194, in raise_on_meta_error(funcname, udf)
    193 try:
--> 194     yield
    195 except Exception as e:

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\core.py:7174, in _emulate(func, udf, *args, **kwargs)
   7173 with raise_on_meta_error(funcname(func), udf=udf), check_numeric_only_deprecation():
-> 7174     return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\groupby.py:781, in _nunique_df_aggregate(df, levels, name, sort)
    780 def _nunique_df_aggregate(df, levels, name, sort=False):
--> 781     return df.groupby(level=levels, sort=sort, observed=True)[name].nunique()

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\pandas\core\groupby\generic.py:1951, in DataFrameGroupBy.__getitem__(self, key)
   1947     raise ValueError(
   1948         "Cannot subset columns with a tuple with more than one element. "
   1949         "Use a list instead."
   1950     )
-> 1951 return super().__getitem__(key)

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\pandas\core\base.py:244, in SelectionMixin.__getitem__(self, key)
    243 if key not in self.obj:
--> 244     raise KeyError(f"Column not found: {key}")
    245 ndim = self.obj[key].ndim

KeyError: 'Column not found: marca'

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
Cell In[3], line 1
----> 1 base_consumo = ddf.groupby(['mci', 'marca'], dropna=False, observed=True)['marca'].nunique().to_frame()
      2 base_consumo.head()

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\groupby.py:3078, in SeriesGroupBy.nunique(self, split_every, split_out)
   3075 else:
   3076     chunk = _nunique_series_chunk
-> 3078 return aca(
   3079     [self.obj, self.by]
   3080     if not isinstance(self.by, list)
   3081     else [self.obj] + self.by,
   3082     chunk=chunk,
   3083     aggregate=_nunique_df_aggregate,
   3084     combine=_nunique_df_combine,
   3085     token="series-groupby-nunique",
   3086     chunk_kwargs={"levels": levels, "name": name},
   3087     aggregate_kwargs={"levels": levels, "name": name},
   3088     combine_kwargs={"levels": levels},
   3089     split_every=split_every,
   3090     split_out=split_out,
   3091     split_out_setup=split_out_on_index,
   3092     sort=self.sort,
   3093 )

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\core.py:7128, in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, sort, ignore_index, **kwargs)
   7126 if meta is no_default:
   7127     meta_chunk = _emulate(chunk, *args, udf=True, **chunk_kwargs)
-> 7128     meta = _emulate(
   7129         aggregate, _concat([meta_chunk], ignore_index), udf=True, **aggregate_kwargs
   7130     )
   7131 meta = make_meta(
   7132     meta,
   7133     index=(getattr(make_meta(dfs[0]), "index", None) if dfs else None),
   7134     parent_meta=dfs[0]._meta,
   7135 )
   7137 graph = HighLevelGraph.from_collections(final_name, layer, dependencies=(chunked,))

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\core.py:7173, in _emulate(func, udf, *args, **kwargs)
   7168 def _emulate(func, *args, udf=False, **kwargs):
   7169     """
   7170     Apply a function using args / kwargs. If arguments contain dd.DataFrame /
   7171     dd.Series, using internal cache (``_meta``) for calculation
   7172     """
-> 7173     with raise_on_meta_error(funcname(func), udf=udf), check_numeric_only_deprecation():
   7174         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\contextlib.py:158, in _GeneratorContextManager.__exit__(self, typ, value, traceback)
    156     value = typ()
    157 try:
--> 158     self.gen.throw(value)
    159 except StopIteration as exc:
    160     # Suppress StopIteration *unless* it's the same exception that
    161     # was passed to throw().  This prevents a StopIteration
    162     # raised inside the "with" statement from being suppressed.
    163     return exc is not value

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\utils.py:215, in raise_on_meta_error(funcname, udf)
    206 msg += (
    207     "Original error is below:\n"
    208     "------------------------\n"
    (...)
    212     "{2}"
    213 )
    214 msg = msg.format(f" in `{funcname}`" if funcname else "", repr(e), tb)
--> 215 raise ValueError(msg) from e

ValueError: Metadata inference failed in `_nunique_df_aggregate`.

You have supplied a custom function and Dask is unable to
determine the type of output that that function returns.

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
KeyError('Column not found: marca')

Traceback:
---------
  File "c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\utils.py", line 194, in raise_on_meta_error
    yield
  File "c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\core.py", line 7174, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\groupby.py", line 781, in _nunique_df_aggregate
    return df.groupby(level=levels, sort=sort, observed=True)[name].nunique()
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\pandas\core\groupby\generic.py", line 1951, in __getitem__
    return super().__getitem__(key)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\pandas\core\base.py", line 244, in __getitem__
    raise KeyError(f"Column not found: {key}")

Environment:

phofl commented 8 months ago

The query doesn't make much sense: you are computing nunique on one of the grouping columns, which will always return 1.

We should fix this anyway though
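
The point above can be seen with a tiny pandas sketch (hypothetical data, not the attached teste.csv): since 'marca' is one of the grouping keys, each ('mci', 'marca') group can only contain a single distinct 'marca' value.

```python
import pandas as pd

# Hypothetical data: 'marca' is one of the grouping keys, so within each
# ('mci', 'marca') group there is exactly one distinct 'marca' value.
df = pd.DataFrame({
    "mci": [1, 1, 2, 2],
    "marca": ["a", "b", "a", "a"],
})

out = df.groupby(["mci", "marca"])["marca"].nunique()
print(out)  # every group reports exactly 1 distinct value
assert (out == 1).all()
```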

frbelotto commented 8 months ago

The query doesn't make much sense: you are computing nunique on one of the grouping columns, which will always return 1.

We should fix this anyway though

LOL. This query is part of a bigger set of queries to extract how many unique clients ("MCIs") have consumed from each brand. Maybe it could be written in a smarter way, but once it returned what we expected, I wasn't changing it anymore LOL

base_consumo = gerado.loc[(gerado['produto'] == 'Afiliados') & (gerado['data'] >= datetime(2024,1,1))].groupby(['mci', 'marca'], dropna=False, observed=True)['marca'].nunique().to_frame()
base_consumo = base_consumo.groupby(['mci'], dropna=False, observed=True).aggregate({'marca' : 'sum'})
base_consumo = base_consumo.rename(columns={'marca' : 'qtde_marcas'}).reset_index()
base_consumo = base_consumo.groupby('qtde_marcas', dropna=False, observed=True)['mci'].nunique().to_frame()
base_consumo = base_consumo.compute()
base_consumo.to_excel(f'{pastaloja}\\Clientes_por_marcas_afiliados.xlsx', merge_cells=False)
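
As an aside, the first two steps of that pipeline (distinct brands per client, then clients per brand count) can be collapsed into a single groupby('mci')['marca'].nunique(), which also avoids calling nunique on a grouping key. A pandas sketch with made-up data (the product/date filter and Excel export are omitted):

```python
import pandas as pd

# Made-up sample standing in for `gerado`: client 1 bought from two brands,
# clients 2 and 3 from one brand each.
gerado = pd.DataFrame({
    "mci": [1, 1, 1, 2, 2, 3],
    "marca": ["a", "b", "a", "a", "a", "c"],
})

# Distinct brands per client -- 'marca' is aggregated, not grouped on.
qtde_marcas = gerado.groupby("mci")["marca"].nunique().rename("qtde_marcas")

# Unique clients per brand-count bucket.
base_consumo = qtde_marcas.reset_index().groupby("qtde_marcas")["mci"].nunique().to_frame()
print(base_consumo)  # bucket 1 -> 2 clients, bucket 2 -> 1 client
```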
phofl commented 8 months ago

PRs to fix are welcome

frbelotto commented 8 months ago

PRs to fix are welcome

Is there a newbie guide on where to start? My knowledge of Python is average, but I have no experience building, sharing, or maintaining a library. I don't even know how to find the source code of the nunique method so I could better understand what is happening.

frbelotto commented 8 months ago

And regarding the bug example, it's interesting that the "marca" column returns an error, but any other column seems to work. My first thought was that it was something related to the category dtype (a very buggy dtype), but I tried changing it to string and the error persists.

phofl commented 8 months ago

The error happens because marca is part of your grouping keys; it's not dtype related.
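
For anyone digging in: plain pandas has no trouble selecting one of the grouping keys, so, judging from the traceback, the failure is specific to dask's metadata-inference path, where _nunique_df_aggregate re-selects the column by name after an earlier step has already consumed it. A hedged sketch (hypothetical data) of the two behaviors:

```python
import pandas as pd

# Plain pandas happily selects one of the grouping keys:
df = pd.DataFrame({"mci": [1, 1, 2], "marca": ["a", "b", "a"]})
ok = df.groupby(["mci", "marca"])["marca"].nunique()  # works, all 1s

# The KeyError in the traceback is what pandas raises when the selected
# name exists only as an index level, not as a column -- suggesting the
# intermediate frame dask builds during meta inference has already moved
# 'marca' out of the columns:
intermediate = df.groupby(["mci", "marca"])["mci"].nunique().to_frame("n")
try:
    intermediate.groupby(level=[0, 1])["marca"].nunique()
except KeyError as e:
    print(e)  # 'Column not found: marca'
```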

frbelotto commented 8 months ago

The error happens because marca is part of your grouping keys; it's not dtype related.

But it does not happen if I use the mci column, which is also one of the grouping keys.