delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0
1.97k stars 365 forks source link

Inconsistent arrow timestamp type breaks datafusion query #2341

Open qinix opened 3 months ago

qinix commented 3 months ago

Environment

Delta-rs version: current main(https://github.com/delta-io/delta-rs/commit/abafd2d0cb8dde32ffa990dc30fb97a5581688ec)

Binding: rust, python


Bug

What happened:

Prior versions of this library serialize PrimitiveType::Timestamp as ArrowDataType::Timestamp(TimeUnit::Microsecond, None), while PR #2236 changed this behavior to serialize as ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC")), leads to different timestamp schema in different parquet file.

When querying delta lake with Arrow Datafusion and filtering by timestamp column, datafusion throws Invalid comparison operation: Timestamp(Microsecond, Some("UTC")) < Timestamp(Microsecond, None). After changing the filter type to Timestamp(Microsecond, Some("UTC")), datafusion throws Invalid comparison operation: Timestamp(Microsecond, None) < Timestamp(Microsecond, Some("UTC")). The left side of the comparison operation is a timestamp column in delta lake.

ion-elgreco commented 3 months ago

This actually had breaking effects since the prior behavior was incorrect. I suggest you rewrite your tables so that the parquet timestamp is properly encoded.

It's also strange that the parquet encoded type takes precedence over the arrow schema that's coming from delta shema. This doesn't seem right

ion-elgreco commented 3 months ago

@qinix I assume you have this issue only on tables written with older versions and then read with the latest main?

qinix commented 3 months ago

@qinix I assume you have this issue only on tables written with older versions and then read with the latest main?

Yes, it is

cmettler commented 3 months ago

i have a similar issue when trying to delete records via a UTC timestamp field. I was not able to create a UTC timestamp for the right side of the predicate:

pa = duckdb.sql("select now()::timestamptz at time zone 'CET' at time zone 'UTC' as ts" ).to_arrow_table() write_deltalake(target_table, pa,engine="rust",storage_options=storage_options,mode='overwrite') dt = DeltaTable(target_table,storage_options=storage_options) dt.delete("ts >= to_timestamp_micros('2024-03-27 00:00:00Z','%Y-%m-%d %H:%M:%S%#z')")

fails with: ValueError: Invalid comparison operation: Timestamp(Microsecond, Some("UTC")) >= Timestamp(Microsecond, None)

ravid08 commented 2 months ago

I am facing similar issue when trying to write to a delta table previously written to by a Glue PySpark job. I chose to use Glue for one time full load of the source data and then use Lambda with deltalake python package to load on-going streaming data. The Glue job loads the data and then Lambda fails with the following error: "Schema error: Fail to merge schema because the from data_type = Timestamp(Microsecond, Some(\"UTC\")) does not equal Timestamp(Nanosecond, None) I have tried various options like:

but unable to make these two schemas agree on a common format for timestamp!!

ion-elgreco commented 2 months ago

@ravid08 I am quite sure you wrote with spark without changing the default spark timestamp parquet type from int96 to timestamp_micros.

Try restoring the table prior to full load, then do load with spark again with timestamp_micros as the default timestamp parquet type

ravid08 commented 2 months ago

@ion-elgreco can you please clarify, I am using Glue 4.0 which supports Spark 3.3. From what I see pyspark.sql.functions.timestamp_micros is a new feature in Spark 3.5. Also, I am not using delta table, all I am doing is using write_deltalake from writer.py because the table will be created in an external hive metastore.

ion-elgreco commented 2 months ago

@ravid08 you mentioned you did a full load with spark, so i assume it wrote some parquet files. Those parquet files without timestam_micros setting will have timestamps in int96. At the moment the parquet crate interprets this as timestamp nanoseconds

ravid08 commented 2 months ago

@ion-elgreco The original value from the source is a string 2024-08-06T16:34:16.000574Z I did a full load of that data using spark to s3 in delta format: df = df.withColumn(col, fn.to_timestamp(fn.col(col))) I created a table in hive metastore on these delta files and I could see the datatype is timestamp, not int.

ion-elgreco commented 2 months ago

@ravid08 im meaning the logical type in the parquet file is represented as INT96 when you don't set this spark conf setting

ravid08 commented 2 months ago

@ion-elgreco OK. sounds like spark 3.5 is required, which doesn't exist in Glue (yet).

ravid08 commented 2 months ago

Thanks for the pointer @ion-elgreco the following config worked: SparkSession.config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")