dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

datatype conversion error over dask-expr use #11497

Closed frbelotto closed 19 hours ago

frbelotto commented 1 week ago

Hi guys! I've been seeing some weird behavior when trying to use dask-expr. Sorry that I didn't provide a reproducible example; I couldn't create one!

This is my code. It runs a loop over a list of dask dataframes.

def enviar_novas_bases():
    for df in list_dfs:
        logger.info(f'Starting upload of data for DF {var_name(df)}')
        df = df.drop(columns=['sku'])
        if produtos is not None:
            # Keep only the rows whose product is in the requested list
            df = df.loc[df.produto.isin(produtos)]
        for i in range(df.npartitions):
            logger.info(f'Sending part {i+1} of {df.npartitions}, containing {len(df.get_partition(i))} records')
            flag = False
            # Loop until the partition is written; a PendingRollbackError aborts the run
            while not flag:
                try:
                    logger.info(f'Starting upload at {datetime.now()}')
                    df.get_partition(i).to_sql('shoppingbb', uri=enginedb2uri, method=None, chunksize=250000, parallel=True,
                                               schema='DB2I023A', if_exists='append', index=False, dtype=sql_types)
                    enginedb2.commit()
                    logger.info(f'Upload finished at {datetime.now()}')
                    flag = True
                except exc.PendingRollbackError as error:
                    logger.error(f'Error {error} while sending the file, rolling back')
                    enginedb2.rollback()
                    alertas(erro=True, msgerro=error)
                    raise SystemExit("Stop here")
enviar_novas_bases()

It runs up to the log message "Starting upload of data for DF df_afiliados" and then fails.

This is the dataframe:

<class 'dask_expr.DataFrame'>
Columns: 12 entries, status to cashback
dtypes: dictionary<values=string, indices=int32, ordered=0>[pyarrow](3), double[pyarrow](1), float[pyarrow](3), int32[pyarrow](1), string(3), timestamp[ns][pyarrow](1)

And this is the error:

---------------------------------------------------------------------------
ArrowTypeError                            Traceback (most recent call last)
File <timed exec>:35

File <timed exec>:18, in enviar_novas_bases()

File /projeto/libs/lib/python3.11/site-packages/dask_expr/_collection.py:776, in FrameBase.isin(self, values)
    771         except ValueError:
    772             # Numpy 1.23 supports creating arrays of iterables, while lower
    773             # version 1.21.x and 1.22.x do not
    774             pass
--> 776 return new_collection(
    777     expr.Isin(
    778         self,
    779         values=expr._DelayedExpr(
    780             delayed(values, name="delayed-" + _tokenize_deterministic(values))
    781         ),
    782     )
    783 )

File /projeto/libs/lib/python3.11/site-packages/dask_expr/_collection.py:4803, in new_collection(expr)
   4801 def new_collection(expr):
   4802     """Create new collection from an expr"""
-> 4803     meta = expr._meta
   4804     expr._name  # Ensure backend is imported
   4805     return get_collection_type(meta)(expr)

File /usr/local/lib/python3.11/functools.py:1001, in cached_property.__get__(self, instance, owner)
    999 val = cache.get(self.attrname, _NOT_FOUND)
   1000 if val is _NOT_FOUND:
-> 1001     val = self.func(instance)
   1002     try:
   1003         cache[self.attrname] = val

File /projeto/libs/lib/python3.11/site-packages/dask_expr/_expr.py:1372, in Isin._meta(self)
   1369 @functools.cached_property
   1370 def _meta(self):
   1371     return make_meta(
-> 1372         meta_nonempty(self.frame._meta).isin(
   1373             meta_nonempty(self.frame._meta).iloc[[0]]
   1374         )
   1375     )

File /projeto/libs/lib/python3.11/site-packages/pandas/core/series.py:5559, in Series.isin(self, values)
   5486 def isin(self, values) -> Series:
   5487     """
   5488     Whether elements in Series are contained in `values`.
   5489 
   (...)
   5557     dtype: bool
   5558     """
-> 5559     result = algorithms.isin(self._values, values)
   5560     return self._constructor(result, index=self.index, copy=False).__finalize__(
   5561         self, method="isin"
   5562     )

File /projeto/libs/lib/python3.11/site-packages/pandas/core/algorithms.py:505, in isin(comps, values)
    502 comps_array = extract_array(comps_array, extract_numpy=True)
    503 if not isinstance(comps_array, np.ndarray):
    504     # i.e. Extension Array
--> 505     return comps_array.isin(values)
    507 elif needs_i8_conversion(comps_array.dtype):
    508     # Dispatch to DatetimeLikeArrayMixin.isin
    509     return pd_array(comps_array).isin(values)

File /projeto/libs/lib/python3.11/site-packages/pandas/core/arrays/arrow/array.py:1113, in ArrowExtensionArray.isin(self, values)
   1110 if not len(values):
   1111     return np.zeros(len(self), dtype=bool)
-> 1113 result = pc.is_in(self._pa_array, value_set=pa.array(values, from_pandas=True))
   1114 # pyarrow 2.0.0 returned nulls, so we explicitly specify dtype to convert nulls
   1115 # to False
   1116 return np.array(result, dtype=np.bool_)

File /projeto/libs/lib/python3.11/site-packages/pyarrow/compute.py:264, in _make_generic_wrapper.<locals>.wrapper(memory_pool, options, *args, **kwargs)
    262 if args and isinstance(args[0], Expression):
    263     return Expression._call(func_name, list(args), options)
--> 264 return func.call(args, options, memory_pool)

File /projeto/libs/lib/python3.11/site-packages/pyarrow/_compute.pyx:385, in pyarrow._compute.Function.call()

File /projeto/libs/lib/python3.11/site-packages/pyarrow/error.pxi:155, in pyarrow.lib.pyarrow_internal_check_status()

File /projeto/libs/lib/python3.11/site-packages/pyarrow/error.pxi:92, in pyarrow.lib.check_status()

ArrowTypeError: Array type doesn't match type of values set: string vs dictionary<values=string, indices=int32, ordered=0>

This error ONLY happens after installing dask-expr. Without it, the code runs normally. Using dask.config.set({"dataframe.convert-string": False}) does not change the result (the DataFrame stays pyarrow-backed regardless, because it came that way from the data source).

Can you give me any tips? I am especially worried because the legacy implementation is being deprecated!

Environment:

phofl commented 1 week ago

Can you post something that is reproducible for us? Currently we are missing the source of the data.

phofl commented 19 hours ago

Closing this for now. Please ping to reopen if you have a reproducer.