JDASoftwareGroup / kartothek

A consistent table management library in Python
https://kartothek.readthedocs.io/en/stable
MIT License

Fix bug when loading few columns of a dataset with many primary indices #446

Closed. mlondschien closed this pull request 3 years ago.

mlondschien commented 3 years ago

Description:

On master:

In [1]: import pandas as pd
   ...: import storefact
   ...: from functools import partial
   ...: from kartothek.io.eager import store_dataframes_as_dataset, read_table
   ...: 
   ...: path = "/tmp"
   ...: uuid = "dataset"
   ...: 
   ...: df = pd.DataFrame({"w": [-1, -2], "x": [1, 2], "y": ["a", "b"], "z": [0.0, 1.1]})
   ...: 
   ...: store = partial(storefact.get_store_from_url, f"hfs://{path}?create_if_missing=False")
   ...: dm = store_dataframes_as_dataset(
   ...:     dfs=[df],
   ...:     dataset_uuid=uuid,
   ...:     store=store,
   ...:     partition_on=["w", "x", "y"],
   ...:     overwrite=True,
   ...: )
   ...: 
   ...: df = read_table(dataset_uuid=uuid, store=store, columns=["y", "z"])
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-1-a5c31589801c> in <module>
     18 )
     19 
---> 20 df = read_table(dataset_uuid=uuid, store=store, columns=["y", "z"])

~/code/kartothek/kartothek/io/eager.py in read_table(dataset_uuid, store, columns, predicate_pushdown_to_io, categoricals, dates_as_object, predicates, factory)
    248         dataset_uuid=dataset_uuid, store=store, factory=factory,
    249     )
--> 250     partitions = read_dataset_as_dataframes(
    251         columns=columns,
    252         predicate_pushdown_to_io=predicate_pushdown_to_io,

~/code/kartothek/kartothek/io/eager.py in read_dataset_as_dataframes(dataset_uuid, store, columns, predicate_pushdown_to_io, categoricals, dates_as_object, predicates, factory, dispatch_by)
    135     )
    136 
--> 137     mps = read_dataset_as_metapartitions(
    138         columns=columns,
    139         predicate_pushdown_to_io=predicate_pushdown_to_io,

~/code/kartothek/kartothek/io/eager.py in read_dataset_as_metapartitions(dataset_uuid, store, columns, predicate_pushdown_to_io, categoricals, dates_as_object, predicates, factory, dispatch_by)
    202         dispatch_by=dispatch_by,
    203     )
--> 204     return list(ds_iter)
    205 
    206 

~/code/kartothek/kartothek/io/iter.py in read_dataset_as_metapartitions__iterator(dataset_uuid, store, columns, predicate_pushdown_to_io, categoricals, dates_as_object, predicates, factory, dispatch_by)
     87         else:
     88             mp = cast(MetaPartition, mp)
---> 89             mp = mp.load_dataframes(
     90                 store=store,
     91                 columns=columns,

~/code/kartothek/kartothek/io_components/metapartition.py in _impl(self, *method_args, **method_kwargs)
    148         else:
    149             for mp in self:
--> 150                 method_return = method(mp, *method_args, **method_kwargs)
    151                 if not isinstance(method_return, MetaPartition):
    152                     raise ValueError(

~/code/kartothek/kartothek/io_components/metapartition.py in load_dataframes(self, store, columns, predicate_pushdown_to_io, categoricals, dates_as_object, predicates)
    701         # Metadata version >=4 parse the index columns and add them back to the dataframe
    702 
--> 703         df = self._reconstruct_index_columns(
    704             df=df,
    705             key_indices=indices,

~/code/kartothek/kartothek/io_components/metapartition.py in _reconstruct_index_columns(self, df, key_indices, columns, categories, date_as_object)
    800                 if convert_to_date:
    801                     value = pd.Timestamp(value).to_pydatetime().date()
--> 802             df.insert(pos, primary_key, value)
    803 
    804         return df

~/anaconda3/envs/kartothek/lib/python3.9/site-packages/pandas/core/frame.py in insert(self, loc, column, value, allow_duplicates)
   3622         self._ensure_valid_index(value)
   3623         value = self._sanitize_column(column, value, broadcast=False)
-> 3624         self._mgr.insert(loc, column, value, allow_duplicates=allow_duplicates)
   3625 
   3626     def assign(self, **kwargs) -> "DataFrame":

~/anaconda3/envs/kartothek/lib/python3.9/site-packages/pandas/core/internals/managers.py in insert(self, loc, item, value, allow_duplicates)
   1204             self._blknos = np.append(self._blknos, len(self.blocks))
   1205         else:
-> 1206             self._blklocs = np.insert(self._blklocs, loc, 0)
   1207             self._blknos = np.insert(self._blknos, loc, len(self.blocks))
   1208 

<__array_function__ internals> in insert(*args, **kwargs)

~/anaconda3/envs/kartothek/lib/python3.9/site-packages/numpy/lib/function_base.py in insert(arr, obj, values, axis)
   4556         index = indices.item()
   4557         if index < -N or index > N:
-> 4558             raise IndexError(
   4559                 "index %i is out of bounds for axis %i with "
   4560                 "size %i" % (obj, axis, N))

IndexError: index 2 is out of bounds for axis 0 with size 1


This PR fixes the IndexError above.
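For illustration, the failure can be reproduced with plain pandas, without kartothek or storefact. The sketch below is a simplified, hypothetical model of the insertion loop in `_reconstruct_index_columns`, not kartothek's actual code: the function name `insert_partition_columns_buggy` and its arguments are invented here. It assumes the insert position comes from `enumerate` over all primary keys, including keys that are skipped because they were not requested.

```python
import pandas as pd


def insert_partition_columns_buggy(df, partition_values, columns):
    """Hypothetical model of the failing loop (not kartothek's real code).

    partition_values: ordered (primary_key, value) pairs, e.g. parsed from a
    partition path such as w=-1/x=1/y=a.
    columns: the columns the user asked for.
    """
    for pos, (primary_key, value) in enumerate(partition_values):
        if primary_key not in columns:
            # Key not requested: it is skipped, but pos still advances.
            continue
        # pos counts *all* keys, not just the inserted ones, so it can exceed
        # the number of columns currently in df.
        df.insert(pos, primary_key, value)
    return df


# One partition of the dataset above: only the payload column "z" lives in
# the file; w, x and y are encoded in the partition path.
part = pd.DataFrame({"z": [0.0]})
keys = [("w", -1), ("x", 1), ("y", "a")]
try:
    # Requesting ["y", "z"] skips w and x, then tries to insert y at pos=2.
    insert_partition_columns_buggy(part, keys, columns=["y", "z"])
except IndexError as exc:
    print(f"IndexError: {exc}")
```

When every partition key is requested, the enumerate position happens to match the number of already-inserted columns, which is why the bug only shows up when some leading keys are left out. The exact IndexError message depends on the pandas version.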

mlondschien commented 3 years ago

The code snippet above could serve as a test. There are no tests for _reconstruct_index_columns yet, but I could add some.

fjetter commented 3 years ago

I'm just confused: I would assume that using enumerate and manually incrementing the pos counter are equivalent, so I am not sure what this change does.

mlondschien commented 3 years ago

We don't increment pos when the condition in L772 holds, as in the example above. One could also always insert at pos=0, since, IIUC, the column order gets reshuffled later anyway.
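To make the difference concrete, here is a hedged sketch of the two options discussed, written against a simplified, hypothetical model (the function names and arguments are invented for illustration and are not the code merged in this PR). The first uses a manual counter that only advances when a column is actually inserted. The second always inserts at position 0; the final reordering step is an added assumption, since the comment above notes that kartothek reorders columns later anyway.

```python
import pandas as pd


def insert_partition_columns_counter(df, partition_values, columns):
    """Re-add requested partition columns, advancing pos only on insert."""
    pos = 0
    for primary_key, value in partition_values:
        if primary_key not in columns:
            # Skipped key: unlike enumerate, the counter does not advance.
            continue
        df.insert(pos, primary_key, value)
        pos += 1
    return df


def insert_partition_columns_front(df, partition_values, columns):
    """Alternative: always insert at position 0, then restore key order."""
    requested = [key for key, _ in partition_values if key in columns]
    for primary_key, value in partition_values:
        if primary_key in columns:
            df.insert(0, primary_key, value)
    # Inserting at 0 reverses the key order; put the keys first, in their
    # original order, followed by the remaining payload columns.
    rest = [c for c in df.columns if c not in requested]
    return df[requested + rest]


keys = [("w", -1), ("x", 1), ("y", "a")]
print(insert_partition_columns_counter(pd.DataFrame({"z": [0.0]}), keys, ["y", "z"]))
print(insert_partition_columns_front(pd.DataFrame({"z": [0.0]}), keys, ["x", "y", "z"]))
```

With the counter, pos always equals the number of partition columns inserted so far, so it can never exceed the current column count. Inserting at 0 avoids position bookkeeping entirely at the cost of an extra reorder.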

mlondschien commented 3 years ago

@fjetter could you have another look at this?

fjetter commented 3 years ago

Sorry for the delay. Can you please rebase and ping when green?

mlondschien commented 3 years ago

The test failures also occur on master.