dask / fastparquet

python implementation of the parquet columnar file format.
Apache License 2.0

Corrupted DataFrame when loading partitioned parquet #715

Open r-stiller opened 2 years ago

r-stiller commented 2 years ago

What happened: Error when working with a pandas.DataFrame that has been loaded from a partitioned parquet file.

What you expected to happen: A non-corrupted DataFrame.

Minimal Complete Verifiable Example:

import fastparquet as fp
import pandas as pd

path = './bug_test'

df1 = pd.DataFrame({'A': [1, 2], 'B': [1, 2]})
df2 = pd.DataFrame({'A': [3, 4], 'B': [3, 4]})

# write df1 as a dataset partitioned on column 'B'
df1.to_parquet(path, engine='fastparquet', partition_cols=['B'])

# read the partitioned dataset back into pandas
df1_load = fp.ParquetFile(path).to_pandas()

pd.concat([df1, df2])   # works
pd.concat([df1_load, df2])   # raises the ValueError below

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_9888/2521671170.py in <module>
     12 
     13 pd.concat([df1, df2])
---> 14 pd.concat([df1_load, df2])

F:\Programme\miniconda3\envs\tf\lib\site-packages\pandas\util\_decorators.py in wrapper(*args, **kwargs)
    309                     stacklevel=stacklevel,
    310                 )
--> 311             return func(*args, **kwargs)
    312 
    313         return wrapper

F:\Programme\miniconda3\envs\tf\lib\site-packages\pandas\core\reshape\concat.py in concat(objs, axis, join, ignore_index, keys, levels, names, verify_integrity, sort, copy)
    305     )
    306 
--> 307     return op.get_result()
    308 
    309 

F:\Programme\miniconda3\envs\tf\lib\site-packages\pandas\core\reshape\concat.py in get_result(self)
    530                 mgrs_indexers.append((obj._mgr, indexers))
    531 
--> 532             new_data = concatenate_managers(
    533                 mgrs_indexers, self.new_axes, concat_axis=self.bm_axis, copy=self.copy
    534             )

F:\Programme\miniconda3\envs\tf\lib\site-packages\pandas\core\internals\concat.py in concatenate_managers(mgrs_indexers, axes, concat_axis, copy)
    224             fastpath = blk.values.dtype == values.dtype
    225         else:
--> 226             values = _concatenate_join_units(join_units, concat_axis, copy=copy)
    227             fastpath = False
    228 

F:\Programme\miniconda3\envs\tf\lib\site-packages\pandas\core\internals\concat.py in _concatenate_join_units(join_units, concat_axis, copy)
    488     upcasted_na = _dtype_to_na_value(empty_dtype, has_none_blocks)
    489 
--> 490     to_concat = [
    491         ju.get_reindexed_values(empty_dtype=empty_dtype, upcasted_na=upcasted_na)
    492         for ju in join_units

F:\Programme\miniconda3\envs\tf\lib\site-packages\pandas\core\internals\concat.py in <listcomp>(.0)
    489 
    490     to_concat = [
--> 491         ju.get_reindexed_values(empty_dtype=empty_dtype, upcasted_na=upcasted_na)
    492         for ju in join_units
    493     ]

F:\Programme\miniconda3\envs\tf\lib\site-packages\pandas\core\internals\concat.py in get_reindexed_values(self, empty_dtype, upcasted_na)
    405             # No upcasting is necessary
    406             fill_value = self.block.fill_value
--> 407             values = self.block.get_values()
    408         else:
    409             fill_value = upcasted_na

F:\Programme\miniconda3\envs\tf\lib\site-packages\pandas\core\internals\blocks.py in get_values(self, dtype)
   1358             values = values.astype(object)
   1359         # TODO(EA2D): reshape not needed with 2D EAs
-> 1360         return np.asarray(values).reshape(self.shape)
   1361 
   1362     def interpolate(

ValueError: cannot reshape array of size 2 into shape (1,0)

Anything else we need to know?: The DataFrame works again after making a copy of it: df1_load = df1_load.copy(). When you compare the __dict__ of the original with that of the copy, you can see that the CategoricalBlock size has changed from 0 to 2:

df1_load.__dict__
Out[1]: 
{'_is_copy': None,
 '_mgr': BlockManager
 Items: Index(['A', 'B'], dtype='object')
 Axis 1: RangeIndex(start=0, stop=2, step=1)
 NumericBlock: slice(0, 1, 1), 1 x 2, dtype: int64
 CategoricalBlock: slice(1, 2, 1), 1 x 0, dtype: category,
 '_item_cache': {},
 '_attrs': {},
 '_flags': <Flags(allows_duplicate_labels=True)>}
df1_load.copy().__dict__
Out[1]: 
{'_is_copy': None,
 '_mgr': BlockManager
 Items: Index(['A', 'B'], dtype='object')
 Axis 1: RangeIndex(start=0, stop=2, step=1)
 NumericBlock: slice(0, 1, 1), 1 x 2, dtype: int64
 CategoricalBlock: slice(1, 2, 1), 1 x 2, dtype: category,
 '_item_cache': {},
 '_attrs': {},
 '_flags': <Flags(allows_duplicate_labels=True)>}

I guess this error is a direct consequence of #653, since the dtype of column 'B' is changed from int64 to categorical.

df1['B'].dtype
Out[1]: 
dtype('int64')
df1_load['B'].dtype
Out[1]: 
CategoricalDtype(categories=[1, 2], ordered=False)

Environment:

martindurant commented 2 years ago

I would say that "corrupted" is not correct here, since you do not get incorrect data, but, rather, a workflow that doesn't complete due to an exception. (sorry, I know this is just semantics)

I would say that the behaviour in this case is reasonable, if not exactly what you were expecting. Loading a partitioned dataset does indeed cast the partitioning columns to categorical; I'm not sure if this is explicitly mentioned anywhere in the docs. For the (common) case of a string field, this is of great advantage in memory footprint on read. It is, perhaps, surprising that pandas will not concatenate these dataframes.
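To illustrate the memory-footprint point, here is a minimal pandas-only sketch. It does not involve fastparquet at all; the sample values and variable names are made up for illustration, and the exact byte counts will depend on the pandas version and string storage in use.

```python
import pandas as pd

# Hypothetical partition-like column: few distinct strings, many rows.
values = ['north', 'south', 'east', 'west'] * 25_000

as_strings = pd.Series(values)                 # default string storage
as_category = as_strings.astype('category')    # small integer codes + 4 categories

# deep=True also counts the memory held by the string objects themselves
string_bytes = as_strings.memory_usage(deep=True)
category_bytes = as_category.memory_usage(deep=True)

print(f'strings:  {string_bytes} bytes')
print(f'category: {category_bytes} bytes')
```

The categorical version stores each distinct string once and keeps only a small integer code per row, which is why it tends to be much smaller for low-cardinality columns such as partition keys.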

On write, the original dtype of the columns used for partitioning is saved in metadata, so it would be a reasonable request that the loaded dataset should have exactly the same types as the original.

May I suggest that the following workaround is a reasonable way to cope? This should not do any unnecessary copying.

pd.concat([df1_load.astype(df1.dtypes), df2])

r-stiller commented 2 years ago

Thank you for the reply. You're right, "corrupted" is the wrong term here.

I do understand that casting into a categorical has its benefits, and I wouldn't have noticed if the CategoricalBlock had the correct size. I think that point should be fixed. (The dtype of the copy is still categorical, but concatenation works.)

Maybe the type casting should be added to the docs, too.

I had to change your workaround a little bit to pd.concat([df1_load[:], df2]), since my df1 is not always available (e.g. when starting my program and only loading data from disk).
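If the original dtypes are wanted back without having df1 at hand, one option might be to cast each categorical column to the dtype of its own categories. The sketch below is pandas-only: restore_category_dtypes is a hypothetical helper name, and the "loaded" frame is built by hand as a stand-in rather than read with fastparquet, so it has not been checked against the block-size problem reported above.

```python
import pandas as pd


def restore_category_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Cast every categorical column back to the dtype of its categories.

    Hypothetical helper. Note that an integer cast will fail if the
    column contains missing values.
    """
    target = {
        name: dtype.categories.dtype
        for name, dtype in df.dtypes.items()
        if isinstance(dtype, pd.CategoricalDtype)
    }
    return df.astype(target)


# Stand-in for a frame loaded from a partitioned dataset (assumption:
# constructed manually here, with 'B' categorical as in the report).
loaded = pd.DataFrame({'A': [1, 2], 'B': pd.Categorical([1, 2])})
df2 = pd.DataFrame({'A': [3, 4], 'B': [3, 4]})

restored = restore_category_dtypes(loaded)
combined = pd.concat([restored, df2], ignore_index=True)
print(combined.dtypes)
```

Unlike df1_load[:], this changes the dtype of 'B' back to the integer type of its categories, so the memory benefit of the categorical is given up in exchange for matching the other frames.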

martindurant commented 2 years ago

I do understand that casting into a categorical has its benefits, and I wouldn't have noticed if the CategoricalBlock had the correct size.

Oh, I see. That would be something in fastparquet.df.empty, but we'd probably need to pull in someone from pandas to get the incantation correct. As you can see in that code, categoricals are explicitly handled in a couple of different places, and I'm really not sure where this behaviour might be coming from.

cc @jbrockmendel

jbrockmendel commented 2 years ago

that size change definitely looks wonky. I can reproduce it locally with pandas 1.3.4 but not with master (we're looking at 1.3.5 in the next few days and 1.4 late Dec).

Is fp constructing the CategoricalBlock directly or maybe pinning its .values attribute after __init__?

martindurant commented 2 years ago

If it's not an issue on upcoming pandas, and we have a workaround here, I am tempted not to take any action.