delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Python: Automatically convert Pandas types to valid Delta Lake types in write_deltalake() #686

Closed: wjones127 closed this issue 7 months ago

wjones127 commented 1 year ago

Description

Many Pandas types aren't automatically converted into valid Delta Lake types when a DataFrame is converted into an Arrow table. For example, Pandas timestamps become nanosecond-precision Arrow timestamps by default, but Delta Lake only supports microsecond precision. This makes write_deltalake() difficult to use for Pandas users.

We should write a test that validates all Pandas types can be written with write_deltalake() without manual conversion.
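A rough sketch of what one such test could look like, assuming we want nanosecond pandas timestamps coerced to microseconds on write (the column name and value are only illustrative):

```python
import pandas as pd
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake


def test_write_pandas_nanosecond_timestamps(tmp_path):
    # Pandas produces datetime64[ns] columns; the writer should coerce them
    # to microsecond precision, which is all Delta Lake supports.
    df = pd.DataFrame({"ts": pd.to_datetime(["2022-01-01 12:00:00.123"])})
    write_deltalake(str(tmp_path), df)

    table = DeltaTable(str(tmp_path)).to_pyarrow_table()
    assert table.schema.field("ts").type == pa.timestamp("us")
```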

I'm not sure yet how to configure the conversion here:

https://github.com/delta-io/delta-rs/blob/431d0ea47700bb3b388a5679ce8d906fa84bb1c6/python/deltalake/writer.py#L128-L129

It's possible that we can pass in an adjusted schema to the schema parameter of pyarrow.Table.from_pandas() and that will make the correct conversion.
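A minimal sketch of that idea, assuming the only adjustment needed is downcasting timestamp columns (the DataFrame here is just an example):

```python
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"ts": pd.to_datetime(["2022-01-01", "2022-01-02"]), "x": [1, 2]})

# from_pandas alone would infer timestamp[ns] for "ts", which Delta Lake rejects.
inferred = pa.Schema.from_pandas(df)

# Replace nanosecond timestamps with microsecond ones (preserving any timezone)
# and let from_pandas perform the cast via the adjusted schema.
adjusted = pa.schema(
    pa.field(f.name, pa.timestamp("us", tz=f.type.tz), f.nullable)
    if pa.types.is_timestamp(f.type)
    else f
    for f in inferred
)
table = pa.Table.from_pandas(df, schema=adjusted, preserve_index=False)
assert table.schema.field("ts").type == pa.timestamp("us")
```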

Use Case

Related Issue(s)

Based on #685

blaze225 commented 1 year ago

Would appreciate it if this could be prioritized. Right now this is forcing us to use Spark instead of delta-rs.

ion-elgreco commented 11 months ago

This also happens when you write a Delta table from Polars with nanosecond-precision datetime columns. However, it's slightly easier to work around there: you just have to cast to microsecond precision first.
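A small sketch of that workaround (the frame and the output path are only examples; selecting columns by dtype with pl.col assumes a reasonably recent Polars):

```python
from datetime import datetime

import polars as pl

# Example frame with a nanosecond-precision datetime column.
df = pl.DataFrame({"ts": [datetime(2022, 1, 1), datetime(2022, 1, 2)], "x": [1, 2]})
df = df.with_columns(pl.col("ts").cast(pl.Datetime("ns")))

# Cast every Datetime("ns") column down to microseconds before writing;
# Delta Lake only supports microsecond timestamps.
df = df.with_columns(pl.col(pl.Datetime("ns")).cast(pl.Datetime("us")))
df.write_delta("path/to/table")
```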

ion-elgreco commented 9 months ago

> Would appreciate it if this could be prioritized. Right now this is forcing us to use Spark instead of delta-rs.

@blaze225 You can also switch to Polars, which casts the dtypes correctly to a Delta-compatible schema: https://github.com/pola-rs/polars/pull/10165/files#diff-843e4fa7334b1cfcdf4ebe039377c0d724d0abb51bcde68c9aaae1b93868e20b
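For a pandas frame, that route could look roughly like this (the path is just a placeholder):

```python
import pandas as pd
import polars as pl

pdf = pd.DataFrame({"ts": pd.to_datetime(["2022-01-01", "2022-01-02"])})  # datetime64[ns]

# Polars' write_delta converts the Arrow schema to Delta-compatible types
# (e.g. nanosecond -> microsecond timestamps) before writing.
pl.from_pandas(pdf).write_delta("path/to/table")
```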

thehappycheese commented 8 months ago

I made this as a stopgap. It's a crude solution, but it let me actually write data and try out the library.

import pandas as pd
import deltalake as dl
from typing import Union

def strip_categorical(df:pd.DataFrame):
    """convert categorical columns back into integer types,
    and return a dataframe of the categories

    Example:

    ```python
    (original_df, categories) = strip_categorical(df)
    ```"""
    categories = {}
    df=df.copy()
    for col in df.columns:
        if pd.api.types.is_categorical_dtype(df[col]):
            print(f"Converting categorical column to integer: '{col}' - {dict(enumerate(df[col].cat.categories))}")
            categories[col] = df[col].cat.categories
            df[col] = df[col].cat.codes
    # pad with NaN when categorical columns have different numbers of categories
    return df, pd.DataFrame({col: pd.Series(cats) for col, cats in categories.items()})

def strip_duration_to_int(df:pd.DataFrame, to_int_unit:Union[str,dict[str,str]]="ms"):
    """convert Timedelta columns to integer types with the given unit
    to_int_unit should be a string or a dictionary of column names to units

    Example:

    ```python
    df, time_delta_cols = strip_duration_to_int(df, to_int_unit="ms")
    ```"""
    df=df.copy()
    time_delta_cols = {}
    for col in df.columns:
        if pd.api.types.is_timedelta64_dtype(df[col].dtype):
            col_to_int_unit = to_int_unit
            if isinstance(to_int_unit, dict):
                col_to_int_unit = to_int_unit[col]
            print(f"Converting Timedelta column to integer using units '{col_to_int_unit}': '{col}'")
            time_delta_cols[col] = col_to_int_unit
            df[col] = df[col] // pd.Timedelta(1, unit=col_to_int_unit)
    return df, time_delta_cols

def write_delta(path, data, timedelta_to_int_unit:Union[str,dict[str,str]]="ms", **kwargs):
    data, categories = strip_categorical(data)
    data, time_delta_cols = strip_duration_to_int(data, timedelta_to_int_unit)
    dl.write_deltalake(path, data, **kwargs)
    if len(categories) > 0:
        dl.write_deltalake(path + "_categories", categories, **kwargs)
    if len(time_delta_cols) > 0:
        # write_deltalake expects a table-like object, so wrap the column->unit
        # mapping in a one-row DataFrame before writing it next to the data
        dl.write_deltalake(path + "_time_delta_cols", pd.DataFrame([time_delta_cols]), **kwargs)

kangshung commented 8 months ago

Are there any plans to implement this?

ion-elgreco commented 8 months ago

> Are there any plans to implement this?

You can use _convert_pa_schema_to_delta from polars.io.delta.
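Roughly like this (a sketch only; _convert_pa_schema_to_delta is a private Polars helper, so its import path and signature can change between versions):

```python
import pandas as pd
import pyarrow as pa
from deltalake import write_deltalake
from polars.io.delta import _convert_pa_schema_to_delta  # private helper

df = pd.DataFrame({"ts": pd.to_datetime(["2022-01-01"]), "x": [1]})
table = pa.Table.from_pandas(df, preserve_index=False)  # "ts" is timestamp[ns]

# Convert the inferred Arrow schema to Delta-compatible types, then cast to it.
delta_schema = _convert_pa_schema_to_delta(table.schema)
table = table.cast(delta_schema)  # "ts" is now timestamp[us]

write_deltalake("path/to/table", table)
```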

kangshung commented 8 months ago

> You can use _convert_pa_schema_to_delta from polars.io.delta.

What about the _check_for_unsupported_types() method that lists Categorical as an unsupported type? Why would it work without polars if it doesn't with polars?

ion-elgreco commented 8 months ago

> What about the _check_for_unsupported_types() method that lists Categorical as an unsupported type? Why would it work without polars if it doesn't with polars?

I don't see any categorical primitive types in here: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types

kangshung commented 8 months ago

> I don't see any categorical primitive types in here: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types

And that's the issue. Delta returns deltalake.PyDeltaTableError: Schema error: Invalid data type for Delta Lake: Dictionary(Int8, Utf8) for Categorical fields.

Here you have a method that raises an exception on Categorical fields in polars: https://github.com/pola-rs/polars/blob/main/py-polars/polars/io/delta.py#L323-L329
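In the meantime, one way to sidestep that error (a sketch, not anything deltalake provides) is to decode dictionary-encoded columns back to their value types before writing:

```python
import pandas as pd
import pyarrow as pa
from deltalake import write_deltalake

df = pd.DataFrame({"cat": pd.Categorical(["a", "b", "a"]), "x": [1, 2, 3]})
table = pa.Table.from_pandas(df, preserve_index=False)  # "cat" is dictionary<int8, string>

# Replace each dictionary type with its value type (here: string) and cast.
decoded = pa.schema(
    pa.field(f.name, f.type.value_type, f.nullable)
    if pa.types.is_dictionary(f.type)
    else f
    for f in table.schema
)
write_deltalake("path/to/table", table.cast(decoded))
```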

ion-elgreco commented 8 months ago

> And that's the issue. Delta returns deltalake.PyDeltaTableError: Schema error: Invalid data type for Delta Lake: Dictionary(Int8, Utf8) for Categorical fields.
>
> Here you have a method that raises an exception on Categorical fields in polars: https://github.com/pola-rs/polars/blob/main/py-polars/polars/io/delta.py#L323-L329

I see. We could possibly port these things from Polars into delta-rs; I'll check with the Polars contributors. I'm not super familiar with licenses and all.