delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

We cannot append data to existing Delta Lake tables if the schema of data to write includes timestamp columns with timezone. #1777

Closed: sugibuchi closed this issue 4 months ago

sugibuchi commented 8 months ago

Environment

Delta-rs version:

Binding:

Environment:


Bug

We cannot append data to existing Delta Lake tables if the schema of data to write includes timestamp columns with timezone.

What happened: The first write succeeds, but subsequent append writes fail.

What you expected to happen:

Appending data whose schema includes timezone-aware timestamp columns should succeed.

How to reproduce it:

from datetime import datetime, timezone

import pyarrow as pa
from deltalake import write_deltalake

schema_with_tz = pa.schema([pa.field("ts", pa.timestamp(unit="us", tz=timezone.utc))]) 

table_with_tz1 = pa.table([[datetime(2023,1,1,1,1,1,tzinfo=timezone.utc)]], schema_with_tz)
table_with_tz2 = pa.table([[datetime(2023,2,2,2,2,2,tzinfo=timezone.utc)]], schema_with_tz)

write_deltalake("with_tz.delta", table_with_tz1, mode="append") # Success
write_deltalake("with_tz.delta", table_with_tz2, mode="append") # Fail!

pa.timestamp(unit="us", tz=timezone.utc) appears to be compliant with the timestamp data type defined by the Delta Lake protocol.

Microsecond precision timestamp elapsed since the Unix epoch, 1970-01-01 00:00:00 UTC. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types

But the second write_deltalake(..., mode="append") fails with the following error.

File /opt/conda/lib/python3.10/site-packages/deltalake/writer.py:185, in write_deltalake(table_or_uri, data, schema, partition_by, filesystem, mode, file_options, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group, name, description, configuration, overwrite_schema, storage_options, partition_filters, large_dtypes)
    181 if table:  # already exists
    182     if schema != table.schema().to_pyarrow(as_large_types=large_dtypes) and not (
    183         mode == "overwrite" and overwrite_schema
    184     ):
--> 185         raise ValueError(
    186             "Schema of data does not match table schema\n"
    187             f"Data schema:\n{schema}\nTable Schema:\n{table.schema().to_pyarrow(as_large_types=large_dtypes)}"
    188         )
    190     if mode == "error":
    191         raise AssertionError("DeltaTable already exists.")

ValueError: Schema of data does not match table schema
Data schema:
ts: timestamp[us, tz=UTC]
Table Schema:
ts: timestamp[us]
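
For reference, the schema the table actually stores after the first write can be checked with the same deltalake API that appears in the traceback above (an illustrative sketch, not part of the original report):

from deltalake import DeltaTable

dt = DeltaTable("with_tz.delta")
# Matches the "Table Schema" printed in the error above: ts: timestamp[us],
# i.e. the tz=UTC part of the Arrow type has been dropped by the first write.
print(dt.schema().to_pyarrow())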

More details:

One possible workaround is to remove the timezone from the timestamp column definitions.

schema_without_tz = pa.schema([pa.field("ts", pa.timestamp(unit="us", tz=None))])  # <-- tz=None to ignore timezone

table_without_tz1 = pa.table([[datetime(2023,1,1,1,1,1,tzinfo=timezone.utc)]], schema_without_tz)
table_without_tz2 = pa.table([[datetime(2023,2,2,2,2,2,tzinfo=timezone.utc)]], schema_without_tz)

write_deltalake("without_tz.delta", table_without_tz1, mode="append")
write_deltalake("without_tz.delta", table_without_tz2, mode="append")

However, we are strongly concerned about this workaround because it removes the timezone information from the timestamp statistics in the transaction logs.

cat with_tz.delta/_delta_log/00000000000000000000.json | grep stats | jq -r .add.stats | jq
{
  "numRecords": 1,
  "minValues": {
    "ts": "2023-01-01T01:01:01+00:00"
  },
  "maxValues": {
    "ts": "2023-01-01T01:01:01+00:00"
  },
  "nullCount": {
    "ts": 0
  }
}

cat without_tz.delta/_delta_log/00000000000000000000.json | grep stats | jq -r .add.stats | jq
{
  "numRecords": 1,
  "minValues": {
    "ts": "2023-01-01T01:01:01"
  },
  "maxValues": {
    "ts": "2023-01-01T01:01:01"
  },
  "nullCount": {
    "ts": 0
  }
}
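
For completeness, another variant of the workaround (an illustrative sketch, not part of the original report) is to cast the incoming Arrow table to the schema the Delta table already stores before appending. Since the stored schema has no timezone, it shares the same drawback: the statistics end up without timezone information.

from deltalake import DeltaTable, write_deltalake

# Reuses table_with_tz2 from the reproduction above.
dt = DeltaTable("with_tz.delta")

# Cast ts from timestamp[us, tz=UTC] to the stored timestamp[us]; only the
# timezone metadata is dropped, the underlying microsecond values are unchanged.
data = table_with_tz2.cast(dt.schema().to_pyarrow())
write_deltalake("with_tz.delta", data, mode="append")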

We are currently investigating inconsistent behaviour of Delta Lake on Spark with one of our Delta Lake tables. Since this table is written using this workaround, and the inconsistency appears only when the Spark session timezone is set to something other than UTC, we suspect that the statistics without timezone information in the transaction logs are the root cause. A rough sketch of this kind of session-timezone check follows below.
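
The sketch assumes a Spark session with the Delta Lake connector already configured and reuses the without_tz.delta table from above; it is illustrative only:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the same table under UTC and under a non-UTC session timezone.
# The hypothesis above is that the naive timestamps in the log statistics are
# interpreted relative to the session timezone, so the two reads may not agree.
spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.read.format("delta").load("without_tz.delta").show(truncate=False)

spark.conf.set("spark.sql.session.timeZone", "Europe/Paris")
spark.read.format("delta").load("without_tz.delta").show(truncate=False)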

ion-elgreco commented 8 months ago

Same issue: https://github.com/delta-io/delta-rs/issues/1598

Kaffeegangster commented 8 months ago

I have experienced the same issue when trying to read a delta-rs-generated table via SQL Server 2022 PolyBase. PolyBase's Delta reader expects timestamps to carry a timezone in order to be readable.

ion-elgreco commented 8 months ago

> I have experienced the same issue when trying to read a delta-rs-generated table via SQL Server 2022 PolyBase. PolyBase's Delta reader expects timestamps to carry a timezone in order to be readable.

Can PolyBase not read timestamps without a timezone?

Kaffeegangster commented 8 months ago

No, that doesn't work. I have also checked the other way around: a SQL Server CTAS using datetime2 is always written as timestamp[us] with UTC tz.