JDASoftwareGroup / kartothek

A consistent table management library in python
https://kartothek.readthedocs.io/en/stable
MIT License

Index build pipeline may build indices with incompatible types #217

Open fjetter opened 4 years ago

fjetter commented 4 years ago

Problem description

Using the index build pipeline build_dataset_indices__bag to build an index for a date-typed column may produce an index with an incompatible type (datetime64[ns] instead of date), leaving the dataset in an unusable state.

Example code (ideally copy-pastable)

import datetime
from functools import partial

import pandas as pd
from storefact import get_store_from_url

from kartothek.io.dask.bag import build_dataset_indices__bag
from kartothek.io.eager import store_dataframes_as_dataset

df = pd.DataFrame(
    {"date_col": pd.Series([datetime.date(2020, 1, 1)]), "payload": ["A"]}
)
store_factory = partial(get_store_from_url, "hfs://test")

store_dataframes_as_dataset(store_factory, "test_uuid", [df])
bag = build_dataset_indices__bag(store_factory, "test_uuid", columns=["date_col"])
ds = bag.compute()[0]

# This should also be a date. Otherwise subsequent updates will fail
ds.indices['date_col'].dtype == "datetime64[ns]"
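
For reference, a minimal sketch of the kind of follow-up write the comment above alludes to. This is not part of the original report: it assumes kartothek.io.eager.update_dataset_from_dataframes accepts the store factory and uuid from the example, and it does not reproduce the exact error.

# Hedged sketch: appending another partition with a date value has to update
# the existing secondary index, which now reports datetime64[ns] although the
# column is a date type, so the update is expected to fail.
from kartothek.io.eager import update_dataset_from_dataframes

df_update = pd.DataFrame(
    {"date_col": pd.Series([datetime.date(2020, 1, 2)]), "payload": ["B"]}
)
update_dataset_from_dataframes(
    [df_update], store=store_factory, dataset_uuid="test_uuid"
)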
lr4d commented 4 years ago

This doesn't just affect date dtypes:

import datetime
from functools import partial

import pandas as pd
from storefact import get_store_from_url

from kartothek.io.dask.bag import build_dataset_indices__bag
from kartothek.io.eager import store_dataframes_as_dataset

df = pd.DataFrame(
    {"date_col": pd.Series([datetime.date(2020, 1, 1)]), "payload": ["A"], "int": [2]}
)
store_factory = partial(get_store_from_url, "hfs://test")
dataset_uuid = "uuid"

store_dataframes_as_dataset(store_factory, dataset_uuid, [df])
bag = build_dataset_indices__bag(store_factory, dataset_uuid, columns=["date_col", "payload", "int"])
ds = bag.compute()[0]

[index.dtype for index in ds.indices.values()]
# Out[31]: [None, None, None]

But I also see the dtype attribute of ExplicitSecondaryIndex being None in standard store pipelines:

In [32]: import datetime
    ...: from functools import partial
    ...:
    ...: import pandas as pd
    ...: from storefact import get_store_from_url
    ...:
    ...: from kartothek.io.dask.bag import build_dataset_indices__bag
    ...: from kartothek.io.eager import store_dataframes_as_dataset
    ...:
    ...: df = pd.DataFrame(
    ...:     {"date_col": pd.Series([datetime.date(2020, 1, 1)]), "payload": ["A"], "int": [2]}
    ...: )
    ...: store_factory = partial(get_store_from_url, "hfs://test")
    ...: dataset_uuid = "uuid2"
    ...:
    ...: dm = store_dataframes_as_dataset(
    ...:     store_factory, dataset_uuid, [df], secondary_indices=["date_col", "payload", "int"]
    ...: )

In [33]: dm.indices
Out[33]:
OrderedDict([('date_col',
              ExplicitSecondaryIndex(index_storage_key=uuid2/indices/date_col/2020-02-10T12%3A58%3A18.581458.by-dataset-index.parquet, column=date_col, dtype=None, creation_time=2020-02-10 12:58:18.600849, index_dct=[], _index_dct_available=False)),
             ('payload',
              ExplicitSecondaryIndex(index_storage_key=uuid2/indices/payload/2020-02-10T12%3A58%3A18.583370.by-dataset-index.parquet, column=payload, dtype=None, creation_time=2020-02-10 12:58:18.600867, index_dct=[], _index_dct_available=False)),
             ('int',
              ExplicitSecondaryIndex(index_storage_key=uuid2/indices/int/2020-02-10T12%3A58%3A18.584579.by-dataset-index.parquet, column=int, dtype=None, creation_time=2020-02-10 12:58:18.600871, index_dct=[], _index_dct_available=False))])

In [34]: [index.dtype for index in dm.indices.values()]
Out[34]: [None, None, None]
fjetter commented 4 years ago

@lr4d In your example the index is probably not flagged as loaded, i.e. the object that is in memory is merely a stub where the dtype information is not set. I would also consider this a bug, but a different one.
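
As a minimal sketch of what that implies, assuming DatasetMetadata.load_all_indices (or ExplicitSecondaryIndex.load) populates the stub index objects from storage:

# Hedged sketch: load the stub indices before inspecting dtype. The variables
# store_factory and dm come from the snippet above; load_all_indices is assumed
# to return the dataset with its indices loaded.
store = store_factory()
dm_loaded = dm.load_all_indices(store)
[index.dtype for index in dm_loaded.indices.values()]
# Expected: concrete types instead of None once the indices are loaded.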