kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.
Apache License 2.0
84 stars 76 forks source link

Pandas DataFrame index not preserved with pandas.DeltaTableDataset #431

Open KrzysztofDoboszInpost opened 7 months ago

KrzysztofDoboszInpost commented 7 months ago

Description

Pandas DataFrame saved and loaded using pandas.DeltaTableDataset differs from the original DataFrame - it has additional column __index_level_0__. This is an Index saved as a column, but it's not interpreted as such.

I tried to read the stored file manually:

Since the dataset is based on deltalake, maybe it will suffice to add this line to _load()?

delta_table = delta_table.set_index("__index_level_0__")

Context

How has this bug affected you? What were you trying to accomplish? I'm trying to use pandas.DeltaTableDataset locally and databricks.ManagedTableDataset in the pipeline deployed to Databricks.

Steps to Reproduce

  1. Create two nodes sharing a dataset of type pandas.DeltaTableDataset. Pass pd.DataFrame.
  2. In the second node see the additional column.

Expected Result

The DataFrame after save&load should be identical as the one before the operations.

Actual Result

The DataFrame after save&load has additional column __index_level_0__ and the index is default RangeIndex.

Your Environment

kedro==0.18.14 kedro-datasets==1.8.0 deltalake==0.13.0 pyarrow==14.0.1

datajoely commented 7 months ago

So we use the Deltalake Python library which doesn't seem to have an explicitly retaining this.

We typically don't like to modify the underlying API and retain 1:1 relationship with Kedro and load_args / save_args this feels quite sensible. Even if this is easy to remedy in the node or by subclassing the dataset. I'd be interested to see what the other maintainers think.

KrzysztofDoboszInpost commented 7 months ago

Actually, there might be more than one index level and at some point deltalake might start handling pandas_metadata, so:

index_cols = delta_table.columns[delta_table.columns.str.match(r"__index_level_+\d__")].values.tolist()
if index_cols:
    delta_table = delta_table.set_index(index_cols)
datajoely commented 7 months ago

The fundamental point is that Indices are a Pandas concept, Spark and more recently Polars decided not to mirror this concept. There is an argument that the current behaviour is justified when using the Delta format. Again, very keen to see what others think here.

KrzysztofDoboszInpost commented 3 months ago

I changed my mind. I needed a dataset that I'd use as a local counterpart of databricks.ManagedTableDataset (or actually something derived from this one), and that dataset ignores pandas index completely.

To get consistent behaviour between the datasets, let's drop index on saving the data:

data = pa.Table.from_pandas(data, preserve_index=False)

This line at the start of _save would do the magic, and also fix #610.