delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Converting DeltaTable to Pandas causes timestamp error #2593

Open Josh-Hiz opened 2 weeks ago

Josh-Hiz commented 2 weeks ago

Environment

Delta-rs version: Most Recent Available


Bug

What happened: Reading data from an Azure Data Lake Storage Gen2 DeltaTable and converting it to pandas fails with:

pyarrow.lib.ArrowInvalid: Casting from timestamp[ns] to timestamp[us, tz=UTC] would lose data: -4852202631933722624

Converting it to a pyarrow dataset succeeds, but converting to a table or a pandas DataFrame fails with this timestamp error. It isn't clear, first, how to locate the data that causes it and, second, how to fix it; if my data is massive I have no idea what is responsible. Is there a clear way I can fix this myself, or could delta-rs simply introduce a "safe" option (similar to parquet's to_pandas, where you can ignore this)?
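
For reference, the underlying pyarrow behavior can be illustrated on its own (a minimal sketch; the value below is made up):

import pyarrow as pa

# A nanosecond value that is not a whole number of microseconds.
arr = pa.array([1_499_999_999_999_999_999], type=pa.timestamp('ns'))

# Safe cast: raises ArrowInvalid ("... would lose data").
try:
    arr.cast(pa.timestamp('us'))
except pa.ArrowInvalid as e:
    print(e)

# Unsafe cast: truncates the sub-microsecond part instead of raising.
print(arr.cast(pa.timestamp('us'), safe=False))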

ion-elgreco commented 2 weeks ago

Does to_table() work?

Josh-Hiz commented 2 weeks ago

Does to_table() work?

It fails, providing the same exact error.

ion-elgreco commented 2 weeks ago

Does to_table() work?

It fails, providing the same exact error.

Did you create this table with Spark Delta or delta-rs? If so, which versions did you use before? And were you ever able to read this table?

For some reason your parquet files have nanosecond timestamps.
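
If you want to confirm that, one way is to inspect the physical parquet schema of each data file. This is only a sketch (the table URI is a placeholder, and for ADLS Gen2 you would have to pass a pyarrow filesystem with credentials instead of plain paths):

from deltalake import DeltaTable
import pyarrow.parquet as pq

dt = DeltaTable('path/to/table')  # placeholder
for uri in dt.file_uris():
    # Look for timestamp[ns] columns in the physical schema.
    print(uri)
    print(pq.read_schema(uri))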

Josh-Hiz commented 2 weeks ago

Does to_table() work?

It fails, providing the same exact error.

Did you create this table with spark-delta or delta-rs, if so which versions did you use before? And were you ever able to read this table?

I've only ever used delta-rs; my deltalake version is 0.18.1.

Dekermanjian commented 2 weeks ago

Hey, I am running into the same issue trying to convert to polars. Is there any workaround for this?

ion-elgreco commented 2 weeks ago

If a minimal reproducible example can be created, then I can look into it when I find some time.

Dekermanjian commented 2 weeks ago

Okay, thank you @ion-elgreco. I will try to come up with a copy-pasteable example.

Dekermanjian commented 2 weeks ago

Hey @ion-elgreco

I am running into a very similar problem while trying to create a reproducible example; however, this time it happens when writing the data out to a Delta table. See below for a copy-pasteable example:

# Imports
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pandas as pd
import polars as pl
import numpy as np

# Create nanosecond data
df = pd.DataFrame(
    {
        'id': np.arange(10),
        'datetime': pd.date_range(start="2017-01-01", periods=10, freq="10ns", unit="ns", tz='UTC')
    }
)

# Convert to pyarrow
df_pa = pa.Table.from_pandas(df)

# This works
pq.write_table(df_pa, where='./data/test.parquet')

# This fails
parquet_format = ds.ParquetFileFormat()
write_options = parquet_format.make_write_options(
    use_deprecated_int96_timestamps=True,
    coerce_timestamps='us',
    allow_truncated_timestamps=True,
)
write_deltalake("tmp/some-table", df_pa, file_options=write_options)

I am not sure the file_options argument is actually being used by the write_deltalake() method. The error is exactly the same as when reading the data, so maybe if we can figure out the nature of the error we can fix both cases.

ion-elgreco commented 2 weeks ago

The Delta protocol only supports timestamps with microsecond precision, so we downcast. However, we might need to allow pyarrow to truncate those timestamps; I saw it's possible to pass cast options.

You can try it out yourself: grab the functions from deltalake.schema (there is convert_table_to_recordbatchreader) and see if changing the cast works.
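
For reference, this is the pyarrow mechanism I mean by cast options (a minimal sketch, not delta-rs code): CastOptions(allow_time_truncate=True) lets a lossy nanosecond-to-microsecond cast go through instead of raising.

import pyarrow as pa
import pyarrow.compute as pc

arr = pa.array([1], type=pa.timestamp('ns'))  # 1 ns is not a whole microsecond

# A default (safe) cast would raise ArrowInvalid; allow_time_truncate truncates instead.
opts = pc.CastOptions(target_type=pa.timestamp('us'), allow_time_truncate=True)
print(pc.cast(arr, options=opts))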

Josh-Hiz commented 2 weeks ago

To add to this: datasets with out-of-bounds dates cause the same issue, for example anything in the year 9999. I believe delta-rs actually fixed these issues, but I am not sure why it is still occurring. Another feature suggestion would be to allow users to replace invalid timestamps with a "min" or "max" value, which would more or less force the issue to be resolved.
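
For what it's worth, here is a small illustration (a sketch, independent of delta-rs) of how a year-9999 date interacts with nanosecond precision. The wrapped int64 value it produces matches the number quoted in the error at the top of this issue, which suggests an out-of-range date rather than random corruption:

from datetime import datetime, timezone
import pyarrow as pa

# A year-9999 timestamp is representable at microsecond precision...
arr = pa.array(
    [datetime(9999, 12, 31, tzinfo=timezone.utc)], type=pa.timestamp('us', tz='UTC')
)

# ...but it does not fit into int64 nanoseconds, so a safe upcast raises.
try:
    arr.cast(pa.timestamp('ns', tz='UTC'))
except pa.ArrowInvalid as e:
    print(e)

# If such a value is ever multiplied into nanoseconds without a bounds check,
# the int64 wraps around. For 9999-12-31 00:00:00 UTC the wrapped value is
# exactly the one in the original error message.
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
delta = datetime(9999, 12, 31, tzinfo=timezone.utc) - epoch
us_value = delta.days * 86_400_000_000 + delta.seconds * 1_000_000
ns_wrapped = (us_value * 1000 + 2**63) % 2**64 - 2**63  # simulate int64 overflow
print(ns_wrapped)  # -4852202631933722624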

Dekermanjian commented 2 weeks ago

Okay, is this what you mean, @ion-elgreco?

# Imports
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pandas as pd
import polars as pl
import numpy as np

# Create nanosecond data
df = pd.DataFrame(
    {
        'id': np.arange(10),
        'datetime': pd.date_range(start="2017-01-01", periods=10, freq="10ns", unit="ns", tz='UTC')
    }
)

# Convert to pyarrow
df_pa = pa.Table.from_pandas(df)

# This works
pq.write_table(df_pa, where='./data/test.parquet')

# Define schema
schema = pa.schema([
    ("id", pa.int32()),
    ("datetime", pa.timestamp('us', tz = 'UTC'))
])

# Downcast
df_pa_cast = df_pa.cast(target_schema=schema, safe=False)

# This now works
write_deltalake("tmp/some-table", df_pa_cast)

Now I can write the table after downcasting, while explicitly telling pyarrow safe=False.
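
The same idea can be generalized so the microsecond schema does not have to be written out by hand. This is only a sketch (the helper is made up, not part of deltalake): it rewrites every timestamp field to microsecond precision, keeps the timezone, and casts with safe=False.

import pyarrow as pa
from deltalake import write_deltalake

def downcast_timestamps_to_us(table: pa.Table) -> pa.Table:
    """Hypothetical helper: cast all timestamp columns to microseconds, allowing truncation."""
    target = pa.schema(
        [
            pa.field(f.name, pa.timestamp('us', tz=f.type.tz), nullable=f.nullable)
            if pa.types.is_timestamp(f.type)
            else f
            for f in table.schema
        ]
    )
    return table.cast(target, safe=False)

# Reusing df_pa from the example above.
write_deltalake("tmp/some-table", downcast_timestamps_to_us(df_pa), mode="overwrite")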

Josh-Hiz commented 1 week ago

@ion-elgreco I tried out this solution to just replace invalid timestamps:

        dl = DeltaTable(...)

        min_time = pd.Timestamp("2020-01-01")
        max_time = pd.Timestamp("2025-01-01")
        schema = dl.schema().to_pyarrow()
        cols = []
        for field in schema:
            if isinstance(field.type, pa.TimestampType):
                cols.append(field.name)

        for col in cols:
            print(col)
            predicate_min = f"{col} < '{str(min_time)}'"
            new_values = {col: str(min_time)}
            dl.update(predicate=predicate_min, new_values=new_values)
            print("finish1")
            predicate_max = f"{col} > '{str(max_time)}'"
            new_values = {col: str(max_time)}
            dl.update(predicate=predicate_max, new_values=new_values)
            print("finish2")

        return dl.to_pandas()

However, it is astronomically slow: about 30 minutes for 200k rows, which should not be the case at all. Is there a way to optimize this, or a well-known way to just replace bad values? For context, I want to replace bad timestamps because pyarrow has numerous issues with them.
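
One alternative I am considering (a sketch only, not tested against delta-rs, and assuming the data can at least be loaded into a pyarrow table): clamp the timestamp columns in memory with pyarrow compute kernels instead of rewriting the table with dl.update(), so nothing is committed back to the table at all. The column name and bounds below are placeholders.

from datetime import datetime, timezone
import pyarrow as pa
import pyarrow.compute as pc

def clamp_timestamps(table: pa.Table, lo: datetime, hi: datetime) -> pa.Table:
    """Hypothetical helper: clip every timestamp column into [lo, hi] before to_pandas()."""
    for i, field in enumerate(table.schema):
        if pa.types.is_timestamp(field.type):
            lo_s = pa.scalar(lo, type=field.type)
            hi_s = pa.scalar(hi, type=field.type)
            # Note: with default options, nulls are replaced by the bound.
            clipped = pc.min_element_wise(pc.max_element_wise(table.column(i), lo_s), hi_s)
            table = table.set_column(i, field, clipped)
    return table

# Stand-in for a pyarrow table read from the DeltaTable.
tbl = pa.table({
    'ts': pa.array(
        [datetime(2021, 6, 1, tzinfo=timezone.utc), datetime(9999, 12, 31, tzinfo=timezone.utc)],
        type=pa.timestamp('us', tz='UTC'),
    )
})
clamped = clamp_timestamps(
    tbl,
    lo=datetime(2020, 1, 1, tzinfo=timezone.utc),
    hi=datetime(2025, 1, 1, tzinfo=timezone.utc),
)
print(clamped.to_pandas())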

p3i0t commented 1 week ago

@ion-elgreco Here is a small example that reproduces the mapping error from a pyarrow data type to a deltalake PrimitiveType; I hope this helps.

    import deltalake
    import pyarrow as pa
    print("deltalake version: ", deltalake.__version__)
    print("pyarrow version: ", pa.__version__)

    t1 = pa.timestamp('us')
    print(deltalake.schema.PrimitiveType.from_pyarrow(t1))
    t2 = pa.timestamp('ns')
    print(deltalake.schema.PrimitiveType.from_pyarrow(t2))

The results:

deltalake version:  0.18.1
pyarrow version:  15.0.2
PrimitiveType("timestamp_ntz")
Traceback (most recent call last):
  File "/Users/wangxin/codes/seer/examples/futures_data_to_delta.py", line 69, in <module>
    print(deltalake.schema.PrimitiveType.from_pyarrow(t2))
Exception: Schema error: Invalid data type for Delta Lake: Timestamp(Nanosecond, None)

Dekermanjian commented 6 days ago

Okay, @ion-elgreco, I think I have what you were asking me for previously. Below you can see how I modified the convert_pyarrow_table() function in deltalake.schema to allow timestamps to be truncated:

# Imports
from typing import Optional
from deltalake.schema import _convert_pa_schema_to_delta
import pyarrow as pa
import pyarrow.compute as pc
import pandas as pd
import numpy as np

# Create nanosecond data
df = pd.DataFrame(
    {
        'id': np.arange(10),
        'datetime': pd.date_range(start="2017-01-01", periods=10, freq="10ns", unit="ns", tz='UTC')
    }
)

# Convert to pyarrow
df_pa = pa.Table.from_pandas(df)

# Modified function from deltalake.schema
def convert_pyarrow_table(
    data: pa.Table, large_dtypes: bool, options: Optional[dict] = None
) -> pa.RecordBatchReader:
    """Converts a PyArrow table to a PyArrow RecordBatchReader with a compatible delta schema."""
    schema = _convert_pa_schema_to_delta(data.schema, large_dtypes=large_dtypes)
    if options:
        # Cast column by column so CastOptions (e.g. allow_time_truncate) can be applied.
        for i, (name, dtype) in enumerate(zip(schema.names, schema.types)):
            data = data.set_column(
                i, name, pc.cast(data[name], options=pc.CastOptions(target_type=dtype, **options))
            )
        data = data.to_reader()
    else:
        data = data.cast(schema).to_reader()
    return data

# Will FAIL due to nanosecond timestamp
convert_pyarrow_table(data=df_pa, large_dtypes=False)

# Will be SUCCESSFUL due to CastOptions allowing time truncation
convert_pyarrow_table(data=df_pa, large_dtypes=False, options={"allow_time_truncate": True})

Please let me know if this is still not what you had in mind.