JDASoftwareGroup / kartothek

A consistent table management library in python
https://kartothek.readthedocs.io/en/stable
MIT License

`update_dataset_from_ddf` corrupts datasets if table names diverge on create -> update #452

Open stephan-hesselmann-by opened 3 years ago

stephan-hesselmann-by commented 3 years ago

This is a follow-up issue to #445 and #451. Datasets can still be corrupted if the table name diverges between the creation step and the update step.

Snippet:

import dask
import pandas as pd
import dask.dataframe as dd
from datetime import date
from storefact import get_store_from_url
from kartothek.io.dask.dataframe import update_dataset_from_ddf, read_dataset_as_ddf

dask.config.set(scheduler="synchronous")
store_url = "hfs://testdata"
store = get_store_from_url(store_url)
dataset_uuid = "testdata"

def create():
    # Create the dataset with an explicit table name "predictions".
    df = pd.DataFrame(
        {"date": [date(2021, 1, x) for x in range(1, 6)], "value": range(5)}
    )
    ddf = dd.from_pandas(df, npartitions=1)
    delayed = update_dataset_from_ddf(
        ddf, store, dataset_uuid, table="predictions", partition_on=["date"]
    )
    res = delayed.compute()

def update():
    # Update the dataset without a table name; the default diverges
    # from the "predictions" table created above.
    df = pd.DataFrame(
        {"date": [date(2021, 1, x) for x in range(6, 11)], "value": range(5)}
    )
    ddf = dd.from_pandas(df, npartitions=1)
    delayed = update_dataset_from_ddf(ddf, store, dataset_uuid, partition_on=["date"])
    res = delayed.compute()

def validate():
    # Read back the "predictions" table to check dataset integrity.
    ddf = read_dataset_as_ddf(dataset_uuid, store, "predictions")
    df = ddf.compute()
    print(df)

if __name__ == "__main__":
    create()
    update()
    validate()

With Kartothek < 4, this code raises a TypeError:

TypeError: Unexpected table in dataset:
Found:  ['predictions']
Expected:   table
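
The guard behind this error can be sketched in plain Python: before writing, the requested table name is validated against the tables already present in the dataset, so a divergent name fails loudly instead of silently creating a second table. The function name and signature here are illustrative, not kartothek's actual internals.

```python
def check_table_name(existing_tables, requested_table):
    # Reject a write whose table name does not match the dataset's
    # existing tables (hypothetical stand-in for the pre-4.0 check).
    if requested_table not in existing_tables:
        raise TypeError(
            "Unexpected table in dataset:\n"
            f"Found: {sorted(existing_tables)}\n"
            f"Expected: {requested_table}"
        )

check_table_name({"predictions"}, "predictions")  # passes silently
# check_table_name({"predictions"}, "table")  raises TypeError as above
```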

Files:

├──  testdata
│  ├──  testdata
│  │  └──  predictions
│  │     ├──  _common_metadata
│  │     ├──  date=2021-01-01
│  │     │  └──  03d8f26b0d22400e9637eca61cc12fb2.parquet
│  │     ├──  date=2021-01-02
│  │     │  └──  03d8f26b0d22400e9637eca61cc12fb2.parquet
│  │     ├──  date=2021-01-03
│  │     │  └──  03d8f26b0d22400e9637eca61cc12fb2.parquet
│  │     ├──  date=2021-01-04
│  │     │  └──  03d8f26b0d22400e9637eca61cc12fb2.parquet
│  │     └──  date=2021-01-05
│  │        └──  03d8f26b0d22400e9637eca61cc12fb2.parquet
│  └──  testdata.by-dataset-metadata.json

With Kartothek >= 4, this code corrupts the dataset:

Traceback (most recent call last):
  File "test_kartothek_create_update.py", line 46, in <module>
    validate()
  File "test_kartothek_create_update.py", line 35, in validate
    ddf = read_dataset_as_ddf(dataset_uuid, store, "predictions")
  File "<decorator-gen-7>", line 2, in read_dataset_as_ddf
  File "/Users/lgtf/git/kartothek-fork/kartothek/io_components/utils.py", line 277, in normalize_args
    return _wrapper(*args, **kwargs)
  File "/Users/lgtf/git/kartothek-fork/kartothek/io_components/utils.py", line 275, in _wrapper
    return function(*args, **kwargs)
  File "/Users/lgtf/git/kartothek-fork/kartothek/io/dask/dataframe.py", line 113, in read_dataset_as_ddf
    delayed_partitions = read_dataset_as_delayed(
  File "/Users/lgtf/git/kartothek-fork/kartothek/io/dask/delayed.py", line 239, in read_dataset_as_delayed
    mps = read_dataset_as_delayed_metapartitions(
  File "<decorator-gen-5>", line 2, in read_dataset_as_delayed_metapartitions
  File "/Users/lgtf/git/kartothek-fork/kartothek/io_components/utils.py", line 277, in normalize_args
    return _wrapper(*args, **kwargs)
  File "/Users/lgtf/git/kartothek-fork/kartothek/io_components/utils.py", line 275, in _wrapper
    return function(*args, **kwargs)
  File "/Users/lgtf/git/kartothek-fork/kartothek/io/dask/delayed.py", line 217, in read_dataset_as_delayed_metapartitions
    return list(mps)
  File "/Users/lgtf/git/kartothek-fork/kartothek/io_components/read.py", line 102, in dispatch_metapartitions_from_factory
    yield MetaPartition.from_partition(
  File "/Users/lgtf/git/kartothek-fork/kartothek/io_components/metapartition.py", line 426, in from_partition
    file=partition.files[table_name],
KeyError: 'predictions'

Files:

├──  testdata
│  ├──  testdata
│  │  ├──  predictions
│  │  │  ├──  _common_metadata
│  │  │  ├──  date=2021-01-01
│  │  │  │  └──  f9ba700a1c8b40ff875772654652fe2e.parquet
│  │  │  ├──  date=2021-01-02
│  │  │  │  └──  f9ba700a1c8b40ff875772654652fe2e.parquet
│  │  │  ├──  date=2021-01-03
│  │  │  │  └──  f9ba700a1c8b40ff875772654652fe2e.parquet
│  │  │  ├──  date=2021-01-04
│  │  │  │  └──  f9ba700a1c8b40ff875772654652fe2e.parquet
│  │  │  └──  date=2021-01-05
│  │  │     └──  f9ba700a1c8b40ff875772654652fe2e.parquet
│  │  └──  table
│  │     ├──  date=2021-01-06
│  │     │  └──  5a826fc386704a78ad40cac49039bb8b.parquet
│  │     ├──  date=2021-01-07
│  │     │  └──  5a826fc386704a78ad40cac49039bb8b.parquet
│  │     ├──  date=2021-01-08
│  │     │  └──  5a826fc386704a78ad40cac49039bb8b.parquet
│  │     ├──  date=2021-01-09
│  │     │  └──  5a826fc386704a78ad40cac49039bb8b.parquet
│  │     └──  date=2021-01-10
│  │        └──  5a826fc386704a78ad40cac49039bb8b.parquet
│  └──  testdata.by-dataset-metadata.json
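
The file tree above shows the corruption: the create step registered its files under "predictions", the update step under the default "table". A minimal sketch of why the read then fails, assuming the reader resolves every partition's files against a single inferred table name (the dicts are illustrative stand-ins for kartothek's internal Partition objects, with shortened paths):

```python
# Partitions written by create() vs. update(), keyed by table name.
create_files = {"predictions": "predictions/date=2021-01-01/x.parquet"}
update_files = {"table": "table/date=2021-01-06/y.parquet"}

inferred_table = "predictions"
create_files[inferred_table]   # resolves fine
# update_files[inferred_table] raises KeyError: 'predictions',
# matching the traceback's partition.files[table_name] lookup.
```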

Expected Behavior: Based on these changelog entries for Kartothek 4.0, I would expect Kartothek to infer the correct table name when it is left out.

All read pipelines will now automatically infer the table to read such that it is no longer necessary to provide table or table_name as an input argument
All writing pipelines which previously supported a complex user input type now expose an argument table_name which can be used to continue usage of legacy datasets (i.e. datasets with an intrinsic, non-trivial table name). This usage is discouraged and we recommend users to migrate to a default table name (i.e. leave it None / table)

However, replicating the Kartothek<4 behavior of raising an error would also be acceptable to me.
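
The inference the changelog describes could look roughly like this, under the assumption that a dataset with exactly one existing table should reuse that table's name when the caller omits it; the function name and default are hypothetical, not kartothek's API.

```python
def infer_table_name(existing_tables, requested=None, default="table"):
    # An explicit request always wins.
    if requested is not None:
        return requested
    # A dataset with a single table: reuse its name, so an update
    # without `table` cannot diverge from the create step.
    if len(existing_tables) == 1:
        return next(iter(existing_tables))
    # Empty (or ambiguous) dataset: fall back to the default name.
    return default

infer_table_name({"predictions"})        # -> "predictions"
infer_table_name(set())                  # -> "table"
infer_table_name({"predictions"}, "t2")  # -> "t2"
```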