delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Error with MERGE for partitioned table having `delta.enableChangeDataFeed` set to true and non-primitive columns #2832

Open ximonsson opened 1 month ago

ximonsson commented 1 month ago

Environment

Delta-rs version: 0.19.1

Binding: python

Environment:


Bug

What happened:

This error seems to have been introduced in 0.19. An error is thrown when merging into a table that is partitioned, has the delta.enableChangeDataFeed configuration set to true, and has at least one column of a non-primitive type, e.g. timestamp or array.

What you expected to happen:

Would be nice if there was no error, I guess.

How to reproduce it:

import deltalake
import duckdb

# create dataset

duckdb.sql(
    "CREATE TABLE cowabunga (magic INTEGER, name VARCHAR, bday TIMESTAMP, weapons VARCHAR[])"
)
duckdb.sql(
    """
    INSERT INTO cowabunga VALUES
        (1, 'Leonardo', TIMESTAMP '2024-01-01 00:00:00', ['ninjato']),
        (1, 'Donatello', TIMESTAMP '2024-01-01 01:00:00', ['longstick']),
        (2, 'Michelangelo', TIMESTAMP '2024-01-01 02:00:00', ['nunchucks']),
        (2, 'Raphael', TIMESTAMP '2024-01-01 03:00:00', ['forks - whatever they are called'])
"""
)

# write table for first time

print("create table")

data = duckdb.sql("SELECT * FROM cowabunga LIMIT 2").fetch_arrow_reader()
deltalake.write_deltalake(
    "/tmp/tmnt",
    data,
    mode="overwrite",
    partition_by=["magic"],
    configuration={"delta.enableChangeDataFeed": "true"},
)

# merge rest of data

print("merge rest of data")

data = duckdb.table("cowabunga").fetch_arrow_reader()
(
    deltalake.DeltaTable("/tmp/tmnt")
    .merge(source=data, target_alias="t", source_alias="s", predicate="s.name = t.name")
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)

This produces the following error:

_internal.DeltaError: Generic DeltaTable error: Error during planning: UNION Column bday (type: Timestamp(Microsecond, None)) is not compatible with column weapons (type: List(Field { name: "item", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }))

Removing any one of the following makes the error go away: partition_by in the first write, the delta.enableChangeDataFeed configuration, or the columns bday and weapons.

More details:

I tried using version 0.18 and there it works.

rtyler commented 3 weeks ago

This looks like a bug in our change data feed handling. Prior to 0.19.0 the Rust crate did not support CDF on the merge operation, which will be triggered by the enableChangeDataFeed table property.

😞

sherlockbeard commented 3 weeks ago

This looks like a bug in our change data feed handling

https://github.com/delta-io/delta-rs/blob/d3a796709a4bc9ee8a0fdc4ea16f8c607c0daf19/crates/core/src/operations/merge/mod.rs#L1497

It's dying here, but this function is just straightforward except logic 😿

VillePuuska commented 1 day ago

If the partition column is the last column this issue doesn't seem to pop up. With some debug printing it looks like the schemas of preimage and postimage have the partition column in a different spot here https://github.com/delta-io/delta-rs/blob/2498837ff6a2c3525058f1a9fd1301ba50fecbba/crates/core/src/operations/cdc.rs#L49

For example, adding

println!("{}", preimage.schema());
println!("{}", postimage.schema());

before this line and running the above Python script prints

fields:[t.name, t.bday, t.weapons, t.magic, _change_type], metadata:{}
fields:[magic, name, bday, weapons, _change_type], metadata:{}

before the error.
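
To make the mismatch concrete, here is a small Python sketch that pairs the two printed field lists by position, which is how a plain SQL UNION lines up its inputs. The type labels come from the CREATE TABLE statement in the reproduction script; treating `_change_type` as a string column is an assumption, and the helper `bare` is made up for this illustration. Nothing here calls delta-rs itself.

```python
# Field order as printed by the two debug statements above.
preimage = ["t.name", "t.bday", "t.weapons", "t.magic", "_change_type"]
postimage = ["magic", "name", "bday", "weapons", "_change_type"]

# Types from the CREATE TABLE statement in the reproduction script
# (_change_type is assumed to be a string column).
types = {
    "magic": "INTEGER",
    "name": "VARCHAR",
    "bday": "TIMESTAMP",
    "weapons": "VARCHAR[]",
    "_change_type": "VARCHAR",
}


def bare(field):
    """Drop the table alias, e.g. 't.name' -> 'name'."""
    return field.split(".")[-1]


# A positional union matches column i of one input with column i of the other.
pairs = list(zip(preimage, postimage))
for i, (pre, post) in enumerate(pairs):
    print(i, pre, types[bare(pre)], "<->", post, types[bare(post)])
```

At index 2 this pairs `t.weapons` (a list) with `bday` (a timestamp), the same two columns named in the error message.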

Not sure why the union would even go through with ints and strings getting mixed, but not if there are timestamps or arrays in the mix 🤔 Of course, I could be completely off base and this column reordering may not even be related to the issue.
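
One possible explanation, stated as a guess: a planner may be able to coerce an integer/string pair to a common type, while a timestamp/list pair has no common type to fall back on. Either way, going by the observation that the issue does not appear when the partition column is last, a workaround might be to select the columns with the partition column moved to the end. The sketch below only builds such a query for the `cowabunga` table from the reproduction; `partition_columns_last` is a hypothetical helper, and whether this actually avoids the error on 0.19.1 has not been verified here.

```python
def partition_columns_last(columns, partition_by):
    """Return columns with the partition columns moved to the end.

    The relative order of all other columns is kept.
    """
    partition_set = set(partition_by)
    regular = [c for c in columns if c not in partition_set]
    trailing = [c for c in columns if c in partition_set]
    return regular + trailing


# Column order from the CREATE TABLE statement in the reproduction script.
columns = ["magic", "name", "bday", "weapons"]
ordered = partition_columns_last(columns, ["magic"])

# Query with an explicit column order instead of SELECT *.
select_sql = f"SELECT {', '.join(ordered)} FROM cowabunga"
print(select_sql)  # SELECT name, bday, weapons, magic FROM cowabunga
```

The resulting query could stand in for both `SELECT * FROM cowabunga LIMIT 2` (with the `LIMIT 2` appended) and `duckdb.table("cowabunga")` in the script, so that the initial write and the merge source share the same column order.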