JDASoftwareGroup / kartothek

A consistent table management library in python
https://kartothek.readthedocs.io/en/stable
MIT License

read_dataset_as_ddf does not return stored datetime index #129

Open MichalChromcak opened 5 years ago

MichalChromcak commented 5 years ago

Problem description

When reading a kartothek dataset with read_dataset_as_ddf, I lose the original datetime index that was stored with update_dataset_from_ddf, even though the child Parquet files in the kartothek dataset's directory still keep the index as datetime. Can you please take a look at that? The original data comes from machine sensors and is mocked here:

Example code (ideally copy-pastable)

import pandas as pd
from functools import partial
from storefact import get_store_from_url
from kartothek.io.dask.dataframe import update_dataset_from_ddf
from kartothek.io.dask.dataframe import read_dataset_as_ddf
import dask.dataframe as dd

# Create a dask dataframe with a datetime index
ddf = (dd.from_pandas(
        pd.DataFrame({'dateTime': [pd.Timestamp('2019-08-15T23:59:58'),
                                   pd.Timestamp('2019-08-15T23:59:59')],
                      'plantCode': ['PL1', 'PL2'],
                      'value': [80.0, 90.0],
                      'variableName': ['sensor_1', 'sensor_2']}),
        npartitions=1)
        .assign(date=lambda x: x['dateTime'].dt.date)
        .set_index('dateTime')
      )

Checking the index type

ddf.index

The output is correctly datetime64[ns]:

Dask Index Structure:
npartitions=1
2019-08-15 23:59:58    datetime64[ns]
2019-08-15 23:59:59               ...
Name: dateTime, dtype: datetime64[ns]
Dask Name: sort_index, 7 tasks

Creating the dataset and reading it back

store = './karto/'
store_factory = partial(get_store_from_url, "hfs://" + store)

update_dataset_from_ddf(ddf=ddf,
                        store=store_factory,
                        dataset_uuid='test_data',
                        table='table',
                        partition_on=['date']).compute()

ds = read_dataset_as_ddf(dataset_uuid='test_data',
                         store=store_factory,
                         table='table')

Checking the index after the round trip

ds.index

The output is incorrectly int64; the datetime index is lost:

Dask Index Structure:
npartitions=1
    int64
      ...
dtype: int64
Dask Name: from-delayed, 4 tasks

Meanwhile, reading one of the child Parquet files of the kartothek directory directly with dd.read_parquet keeps the datetime index:

dd.read_parquet('./karto/test_data/table/date=2019-08-15/68dc115bd83d4de1a701ebc567c6e6f6.parquet').index

The output is correctly datetime64[ns]:

Dask Index Structure:
npartitions=1
    datetime64[ns]
               ...
Name: dateTime, dtype: datetime64[ns]
Dask Name: read-parquet, 2 tasks

Used versions

``` # Paste your output of `pip freeze` or `conda list` here # Name Version Build Channel appnope 0.1.0 py37_1000 conda-forge arrow-cpp 0.13.0 py37h8cfbac2_0 astroid 2.2.5 py37_0 atomicwrites 1.3.0 pypi_0 pypi attrs 19.1.0 pypi_0 pypi backcall 0.1.0 py_0 conda-forge bleach 3.1.0 pypi_0 pypi bokeh 1.3.4 py37_0 conda-forge boost-cpp 1.67.0 h1de35cc_4 brotli 1.0.7 h0a44026_0 bzip2 1.0.8 h1de35cc_0 ca-certificates 2019.9.11 hecc5488_0 conda-forge certifi 2019.9.11 py37_0 conda-forge chardet 3.0.4 pypi_0 pypi click 7.0 py_0 conda-forge cloudpickle 1.2.1 py_0 conda-forge coverage 4.5.4 pypi_0 pypi croniter 0.3.30 pypi_0 pypi cytoolz 0.10.0 py37h01d97ff_0 conda-forge dask 2.3.0 py_0 conda-forge dask-core 2.3.0 py_0 conda-forge decorator 4.4.0 py_0 conda-forge defusedxml 0.6.0 pypi_0 pypi distributed 2.3.2 py_1 conda-forge docker 4.0.2 pypi_0 pypi double-conversion 3.1.5 haf313ee_1 entrypoints 0.3 pypi_0 pypi fastparquet 0.3.2 py37heacc8b8_0 conda-forge freetype 2.10.0 h24853df_1 conda-forge fsspec 0.4.3 py_0 conda-forge gflags 2.2.2 h0a44026_0 glog 0.4.0 h0a44026_0 heapdict 1.0.0 py37_1000 conda-forge icu 58.2 h4b95b61_1 idna 2.8 pypi_0 pypi importlib-metadata 0.20 pypi_0 pypi ipykernel 5.1.2 py37h5ca1d4c_0 conda-forge ipython 7.8.0 py37h5ca1d4c_0 conda-forge ipython_genutils 0.2.0 py_1 conda-forge ipywidgets 7.5.1 pypi_0 pypi isort 4.3.21 py37_0 jedi 0.15.1 py37_0 conda-forge jinja2 2.10.1 py_0 conda-forge jpeg 9c h1de35cc_1001 conda-forge json5 0.8.5 pypi_0 pypi jsonschema 3.0.2 pypi_0 pypi jupyter 1.0.0 pypi_0 pypi jupyter-console 6.0.0 pypi_0 pypi jupyter_client 5.3.1 py_0 conda-forge jupyter_core 4.4.0 py_0 conda-forge jupyterlab 1.1.1 pypi_0 pypi jupyterlab-server 1.0.6 pypi_0 pypi kartothek 3.3.0 pypi_0 pypi lazy-object-proxy 1.4.2 py37h1de35cc_0 libblas 3.8.0 12_openblas conda-forge libboost 1.67.0 hebc422b_4 libcblas 3.8.0 12_openblas conda-forge libcxx 4.0.1 hcfea43d_1 libcxxabi 4.0.1 hcfea43d_1 libedit 3.1.20181209 hb402a30_0 libevent 2.1.8 ha12b0ac_0 libffi 3.2.1 h475c297_4 libgfortran 4.0.0 2 conda-forge libiconv 1.15 hdd342a3_7 liblapack 3.8.0 12_openblas conda-forge libopenblas 0.3.7 h4bb4525_1 conda-forge libpng 1.6.37 h2573ce8_0 conda-forge libprotobuf 3.6.0 hd9629dc_0 libsodium 1.0.17 h01d97ff_0 conda-forge libtiff 4.0.10 hcb84e12_2 llvm-openmp 8.0.1 h770b8ee_0 conda-forge llvmlite 0.29.0 pypi_0 pypi locket 0.2.0 py_2 conda-forge lz4-c 1.8.3 h6de7cb9_1001 conda-forge markupsafe 1.1.1 py37h1de35cc_0 conda-forge marshmallow 3.0.2 pypi_0 pypi marshmallow-oneofschema 2.0.1 pypi_0 pypi mccabe 0.6.1 py37_1 mistune 0.8.4 pypi_0 pypi more-itertools 7.2.0 pypi_0 pypi msgpack-python 0.6.1 py37h04f5b5a_0 conda-forge mypy-extensions 0.4.1 pypi_0 pypi nbconvert 5.6.0 pypi_0 pypi nbformat 4.4.0 pypi_0 pypi ncurses 6.1 h0a44026_1 notebook 6.0.1 pypi_0 pypi numba 0.45.1 py37h86efe34_0 conda-forge numpy 1.17.1 py37h6b0580a_0 conda-forge olefile 0.46 py_0 conda-forge openssl 1.1.1c h01d97ff_0 conda-forge packaging 19.0 py_0 conda-forge pandas 0.25.1 py37h86efe34_0 conda-forge pandas-bokeh 0.3 pypi_0 pypi pandocfilters 1.4.2 pypi_0 pypi parso 0.5.1 py_0 conda-forge partd 1.0.0 py_0 conda-forge pendulum 2.0.5 pypi_0 pypi pexpect 4.7.0 py37_0 conda-forge pickleshare 0.7.5 py37_1000 conda-forge pillow 5.3.0 py37hbddbef0_1000 conda-forge pip 19.2.2 py37_0 pluggy 0.12.0 pypi_0 pypi prefect 0.6.3 pypi_0 pypi prometheus-client 0.7.1 pypi_0 pypi prompt_toolkit 2.0.9 py_0 conda-forge psutil 5.6.3 py37h01d97ff_0 conda-forge ptyprocess 0.6.0 py_1001 conda-forge py 1.8.0 pypi_0 pypi pyarrow 0.13.0 
py37h0a44026_0 pygments 2.4.2 py_0 conda-forge pylint 2.3.1 py37_0 pyparsing 2.4.2 py_0 conda-forge pyrsistent 0.15.4 pypi_0 pypi pyscaffold 3.2.1 py_0 conda-forge pytest 5.1.2 pypi_0 pypi pytest-cov 2.7.1 pypi_0 pypi python 3.7.4 h359304d_1 python-dateutil 2.8.0 py_0 conda-forge python-slugify 3.0.3 pypi_0 pypi python-snappy 0.5.4 py37h1e06ddd_0 conda-forge pytz 2019.2 py_0 conda-forge pytzdata 2019.2 pypi_0 pypi pyyaml 5.1.2 py37h01d97ff_0 conda-forge pyzmq 18.0.2 py37hee98d25_2 conda-forge qtconsole 4.5.5 pypi_0 pypi re2 2019.08.01 h0a44026_0 readline 7.0 h1de35cc_5 requests 2.22.0 pypi_0 pypi send2trash 1.5.0 pypi_0 pypi setuptools 41.0.1 py37_0 simplejson 3.16.0 pypi_0 pypi simplekv 0.13.0 pypi_0 pypi six 1.12.0 py37_1000 conda-forge snappy 1.1.7 h6de7cb9_1002 conda-forge sortedcontainers 2.1.0 py_0 conda-forge sqlite 3.29.0 ha441bb4_0 storefact 0.9.0 pypi_0 pypi tabulate 0.8.3 pypi_0 pypi tblib 1.4.0 py_0 conda-forge terminado 0.8.2 pypi_0 pypi testpath 0.4.2 pypi_0 pypi text-unidecode 1.2 pypi_0 pypi thrift 0.11.0 py37h0a44026_1001 conda-forge thrift-cpp 0.11.0 hd79cdb6_3 tk 8.6.8 ha441bb4_0 toml 0.10.0 pypi_0 pypi toolz 0.10.0 py_0 conda-forge tornado 6.0.3 py37h01d97ff_0 conda-forge traitlets 4.3.2 py37_1000 conda-forge typing 3.7.4.1 pypi_0 pypi typing-extensions 3.7.4 pypi_0 pypi uritools 2.2.0 pypi_0 pypi urllib3 1.25.3 pypi_0 pypi wcwidth 0.1.7 py_1 conda-forge webencodings 0.5.1 pypi_0 pypi websocket-client 0.56.0 pypi_0 pypi wheel 0.33.4 py37_0 widgetsnbextension 3.5.1 pypi_0 pypi wrapt 1.11.2 py37h1de35cc_0 xlrd 1.2.0 pypi_0 pypi xz 5.2.4 h1de35cc_4 yaml 0.1.7 h1de35cc_1001 conda-forge zeromq 4.3.2 h6de7cb9_2 conda-forge zict 1.0.0 py_0 conda-forge zipp 0.6.0 pypi_0 pypi zlib 1.2.11 h1de35cc_3 zstandard 0.11.1 pypi_0 pypi zstd 1.3.7 h5bba6e5_0 ```
lr4d commented 5 years ago

DataFrame index information isn't supported by kartothek at the moment, so this behavior (i.e. the index being "lost" after a write/read round trip) should be expected.

I am not sure why the Parquet files do contain index information; maybe someone else can comment on that.
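
One way to see where that information lives is to inspect the pandas metadata that pyarrow embeds in the Parquet footer; its index_columns entry is what dd.read_parquet uses to restore the index. A minimal sketch, reusing the file path from above (the hashed filename differs per run):

import json
import pyarrow.parquet as pq

# Path to one of the files written in the example above (hash differs per run)
path = './karto/test_data/table/date=2019-08-15/68dc115bd83d4de1a701ebc567c6e6f6.parquet'

# pyarrow stores a JSON blob under the b'pandas' key of the Arrow schema metadata;
# its 'index_columns' entry records which columns formed the pandas index
schema = pq.read_schema(path)
pandas_meta = json.loads(schema.metadata[b'pandas'])
print(pandas_meta['index_columns'])  # expected: ['dateTime']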

MichalChromcak commented 5 years ago

@lr4d Thank you for the quick reply. I'll treat losing that information as expected behavior. Any ideas what it would take to reconstruct it from the Parquet files (assuming they have the index stored, as in the case above)?

lr4d commented 5 years ago

The easiest thing would be to convert the index to a regular column before writing, as sketched below. Otherwise, you might want to take a look at https://github.com/JDASoftwareGroup/kartothek/blob/master/kartothek/serialization/_parquet.py#L86 and possibly add a flag to keep index information, but I'm not sure whether that would work with the rest of the code.
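
A minimal sketch of the first suggestion, using the ddf from the example above:

# Demote the datetime index to an ordinary column before writing;
# 'dateTime' then survives the kartothek round trip as regular column data
ddf_flat = ddf.reset_index()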

crepererum commented 5 years ago

Let me add some details to the discussion:

fjetter commented 5 years ago

This is actually somewhat unrelated to the fact that we're not recovering the Parquet indices, since this issue is about (re)constructing a dask index, which is a bit more elaborate. I do, however, see the possibility of including this in kartothek.

What happens under the hood when you call set_index on a Dask DataFrame is that dask calculates all unique values for the column (similar to our ktk indices) and rearranges the data accordingly. There are some implicit guarantees on the partitions and divisions (not part of the public API, but unlikely to change). The data is actually sorted by the index key such that the dataframe divisions (the data interval boundaries of the partitions) are sorted and unique, see e.g. https://stackoverflow.com/questions/49905306/why-do-dask-divisions-need-to-be-unique.
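
For illustration, a small sketch (plain dask, nothing kartothek-specific) of the divisions dask tracks after set_index:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'dateTime': pd.to_datetime(['2019-08-15 23:59:58',
                                                '2019-08-15 23:59:59']),
                    'value': [80.0, 90.0]})
ddf_demo = dd.from_pandas(pdf, npartitions=1).set_index('dateTime')

# Divisions are the sorted, unique index values delimiting the partitions
print(ddf_demo.known_divisions)  # True: set_index computed the boundaries
print(ddf_demo.divisions)        # (Timestamp('...23:59:58'), Timestamp('...23:59:59'))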

Using this property it is, for example, possible to set/construct a dask index based on kartothek index information (code for this existed but was never merged, since we haven't needed it yet). That would be a not-so-difficult first step, but not exactly what is requested in this issue. I would similarly expect that we can also reconstruct a previously existing index based on min/max statistics and local (partition-wise) index setting; this is, however, a bit more complicated.

As a (not well-performing) substitute, I recommend resetting the index, storing the Dask DataFrame, and setting the index again after reading, as sketched below. This way, the partitioning information of the index stays intact, and I'd expect the second index setting to be a bit faster than usual since the data is already arranged the way it's supposed to be. I haven't tried this, though.
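
A minimal sketch of that workaround, reusing the store factory from the example above (the dataset_uuid 'test_data_reset' is just a fresh name for illustration):

# Write with the index demoted to a column so the datetime information is kept
update_dataset_from_ddf(ddf=ddf.reset_index(),
                        store=store_factory,
                        dataset_uuid='test_data_reset',
                        table='table',
                        partition_on=['date']).compute()

# Read back and restore the datetime index afterwards
ds = read_dataset_as_ddf(dataset_uuid='test_data_reset',
                         store=store_factory,
                         table='table')
ds = ds.set_index('dateTime')
ds.index  # dtype: datetime64[ns] again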