delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Cannot merge to a table with a timestamp column after upgrading delta-rs #2478

Open echai58 opened 2 months ago

echai58 commented 2 months ago

Environment

Delta-rs version: 0.15.3 => 0.17.2

Binding: python


Bug

What happened: I have a lot of tables that were created with an older version of delta-rs, around 0.15.3. I'm looking to upgrade, but I'm running into breaking errors, which I believe stem from the 0.16.0 breaking change to how timestamps are handled.

These tables have a timestamp column that, because of the older delta-rs version, was written without a timezone. After upgrading to 0.17.2 and reading the table, the schema says timestamp[us, tz=UTC] - okay, that's reasonable; it's attaching a UTC timezone.

But when I try to write to the table via a merge whose predicate compares the timestamp columns, it fails with:

DeltaError: Generic DeltaTable error: External error: Arrow error: Invalid argument error: Invalid comparison operation: Timestamp(Microsecond, Some("UTC")) == Timestamp(Microsecond, None)

My theory is that delta-rs now interprets the table's schema as having a UTC timezone, whereas the physical data was written without one. So when it casts the source table I pass in, it applies the UTC timezone, and Arrow then cannot compare that against the physical data, which remains timezone-naive.
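
A minimal pyarrow sketch of the type mismatch (the values are illustrative; the failing comparison in the traceback actually happens in arrow-rs during the merge):

import pyarrow as pa

# Source data after delta-rs casts it to the logical Delta schema (tz-aware).
source_col = pa.array([1640995200000000], type=pa.timestamp("us", tz="UTC"))
# Physical data on disk, written by old delta-rs without a timezone.
target_col = pa.array([1640995200000000], type=pa.timestamp("us"))

# The types differ, which is exactly what the comparison kernel rejects:
# Timestamp(Microsecond, Some("UTC")) vs Timestamp(Microsecond, None).
assert source_col.type != target_col.type

# Casting naive -> UTC is a metadata-only reinterpretation in Arrow, so
# aligning the types before comparing is cheap.
assert source_col.type == target_col.cast(pa.timestamp("us", tz="UTC")).type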

What you expected to happen: Is there any way to upgrade delta-rs such that this doesn't break? It would be really unfortunate to have to rewrite all of these tables because of this breaking change.

How to reproduce it: Run this with an older (< 0.16) delta-rs:

import pandas as pd
import pyarrow as pa
from deltalake import write_deltalake

t = pa.Table.from_pandas(
    pd.DataFrame({"p": [1], "a": [pd.Timestamp("2022-01-01")], "b": [1]}),
    schema=pa.schema([("p", pa.int64()), ("a", pa.timestamp("us", tz="UTC")), ("b", pa.int64())]),
)

write_deltalake("test", t, mode="overwrite", partition_by=["p"])

Then, run this with a newer (>= 0.16) delta-rs:

import pandas as pd
import pyarrow as pa
from deltalake import DeltaTable

new_data = pa.Table.from_pandas(
    pd.DataFrame({"p": [1], "a": [pd.Timestamp("2022-01-01")], "b": [4]}),
    schema=pa.schema([("p", pa.int64()), ("a", pa.timestamp("us", tz="UTC")), ("b", pa.int64())]),
)

dt = DeltaTable("test")
dt.merge(
    source=new_data,
    predicate="s.p = t.p and s.a = t.a",
    source_alias="s",
    target_alias="t",
).when_matched_update_all().when_not_matched_insert_all().execute()

echai58 commented 2 months ago

@ion-elgreco I'd be interested to hear your thoughts on this; it seems related to https://github.com/delta-io/delta-rs/issues/2341.

ion-elgreco commented 2 months ago

@echai58 it was quite a breaking change.

Old tables were never written correctly.

Ideally we would cast the table scan to use the delta schema, but we have never added this. You could look into that, or just rewrite the tables.
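
For reference, rewriting a table could look roughly like this (a sketch using the public deltalake Python API; the "test" path and the cast-all-naive-timestamps-to-UTC policy are assumptions for illustration):

import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

dt = DeltaTable("test")
data = dt.to_pyarrow_table()

# Cast any tz-naive timestamp column to UTC so the rewritten physical data
# matches the timestamp[us, tz=UTC] schema that newer delta-rs expects.
fields = [
    pa.field(f.name, pa.timestamp("us", tz="UTC"))
    if pa.types.is_timestamp(f.type) and f.type.tz is None
    else f
    for f in data.schema
]
write_deltalake("test", data.cast(pa.schema(fields)), mode="overwrite")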

echai58 commented 2 months ago

@ion-elgreco Okay, it would be really difficult for us to rewrite all of our tables. Can you provide some pointers to code locations that we'd need to change to get the table scan to use the delta schema? I can look into that.

ion-elgreco commented 2 months ago

Somewhere in the scan builder, I'd guess, and then in the parquet scan.
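
To illustrate the idea in Python (the real change would live in the Rust scan path, not user code): the scan output would be cast to the schema the Delta log declares before the merge compares anything. A sketch, assuming the repro table above:

from deltalake import DeltaTable

dt = DeltaTable("test")
# Physical scan: timestamps come back tz-naive for these old tables.
scanned = dt.to_pyarrow_dataset().to_table()
# Align with the logical delta schema; for naive -> UTC this is a
# metadata-only cast, so no values are rewritten.
aligned = scanned.cast(dt.schema().to_pyarrow())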