delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Cannot merge when predicate contains Decimal comparison #3033

Open ponychicken opened 5 days ago

ponychicken commented 5 days ago

Environment

Delta-rs version: 0.22


Bug

Merging into a DeltaTable fails when the merge predicate compares a Decimal column:

deltalake/table.py", line 1800, in execute
    metrics = self._table.merge_execute(self._builder)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_internal.DeltaError: Generic DeltaTable error: Unable to convert expression to string

Repro case:

from deltalake import DeltaTable, write_deltalake
import pandas as pd
import pyarrow as pa
from datetime import datetime
from decimal import Decimal
import random

DATABASE_NAME = ''.join(random.choices('0123456789', k=10))

data = {
    "timestamp": [datetime(2024, 3, 20, 12, 30, 0)],
    "altitude": [Decimal("150.5")],
}

# Create DataFrame
df = pd.DataFrame(data)

# Define schema using pyarrow
schema = pa.schema(
    [
        ("timestamp", pa.timestamp("us")),
        ("altitude", pa.decimal128(6, 1)),
    ]
)

# Create new Delta table
dt = DeltaTable.create(DATABASE_NAME, schema=schema)

# Initial write
write_deltalake(dt, df, mode="append")

# Read Delta table and display schema and content
dt_read = DeltaTable(DATABASE_NAME)
print("Schema:")
print(dt_read.schema())

# Convert to pandas DataFrame and display content
df_read = dt_read.to_pandas()
print("\nContent:")
print(df_read)

# SUCCEEDS
dt.merge(
    source=df,
    predicate="target.timestamp = source.timestamp",
    source_alias="source",
    target_alias="target",
).when_matched_update_all().when_not_matched_insert_all().execute()

# FAILS: same merge with the Decimal column added to the predicate
dt.merge(
    source=df,
    predicate="target.timestamp = source.timestamp AND target.altitude = source.altitude",
    source_alias="source",
    target_alias="target",
).when_matched_update_all().when_not_matched_insert_all().execute()
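The two predicates above differ only in whether the Decimal column `altitude` is part of the key. As a sketch for testing both variants, a hypothetical helper `build_merge_predicate` (illustrative only, not part of the deltalake API) can generate the predicate string from a list of key columns and optionally wrap selected columns in the `CAST(... AS STRING)` form used in the workaround comment in this thread:

```python
from typing import Iterable, Sequence


def build_merge_predicate(
    keys: Sequence[str],
    cast_to_string: Iterable[str] = (),
    source_alias: str = "source",
    target_alias: str = "target",
) -> str:
    """Join one equality condition per key column with AND.

    Hypothetical helper, not a deltalake API. Columns listed in
    `cast_to_string` are compared as strings on both sides, mirroring
    the CAST workaround reported in this issue.
    """
    cast = set(cast_to_string)
    conditions = []
    for key in keys:
        left = f"{target_alias}.{key}"
        right = f"{source_alias}.{key}"
        if key in cast:
            # Wrap both sides so the comparison happens on strings.
            left = f"CAST({left} AS STRING)"
            right = f"CAST({right} AS STRING)"
        conditions.append(f"{left} = {right}")
    return " AND ".join(conditions)
```

Assuming the result is passed unchanged as `predicate=` to `dt.merge(...)`, `build_merge_predicate(["timestamp", "altitude"])` yields the failing predicate above, and adding `cast_to_string=["altitude"]` yields the workaround predicate.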
ponychicken commented 4 days ago

Casting both sides to STRING before the comparison works:

dt.merge(
    source=df,
    predicate="target.timestamp = source.timestamp AND CAST(target.altitude AS STRING) = CAST(source.altitude AS STRING)",
    source_alias="source",
    target_alias="target",
).when_matched_update_all().when_not_matched_insert_all().execute()
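A caveat on the string-cast workaround, assuming the cast renders each decimal at its column's declared scale: string equality only tracks numeric equality when both sides have the same scale. Python's standard `decimal` module illustrates the difference using the value from the repro; the second literal, `150.50`, is a hypothetical value at scale 2 added for illustration:

```python
from decimal import Decimal

# The repro value at scale 1, and a hypothetical equal value written at scale 2.
at_scale_1 = Decimal("150.5")
at_scale_2 = Decimal("150.50")

# Numeric comparison ignores trailing zeros.
numeric_equal = at_scale_1 == at_scale_2  # True

# String comparison, analogous to CAST(... AS STRING) on both sides, does not.
string_equal = str(at_scale_1) == str(at_scale_2)  # False: "150.5" vs "150.50"

# Quantizing both sides to one decimal place (the scale of decimal128(6, 1))
# makes the string forms agree again.
column_scale = Decimal("0.1")
quantized_equal = str(at_scale_1.quantize(column_scale)) == str(
    at_scale_2.quantize(column_scale)
)  # True

print(numeric_equal, string_equal, quantized_equal)  # True False True
```

In the repro, both the target and source values carry scale 1, so the cast strings should match; the caveat only matters if the source and target scales could differ.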