delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0
2.32k stars 407 forks source link

Table merges fail when CDF is enabled due to column mismatches #2908

Closed ldacey closed 1 month ago

ldacey commented 1 month ago

Environment

Delta-rs version: 0.20.0

Binding: Python


Bug

What happened:

Enabling CDF results in _internal.DeltaError: Generic DeltaTable error: Error during planning: UNION

What you expected to happen:

How to reproduce it:

from datetime import date, datetime

import polars as pl
from deltalake import DeltaTable
from deltalake.table import TableMerger
from pyarrow import Table

def generate_sample_data():
    """Build a 5-row sample dataset as a Polars DataFrame.

    Columns mirror the production table: partition keys (year, month_id),
    date identifiers, a row hash used as the merge key, and user metadata.
    """
    days = range(1, 6)
    frame = pl.DataFrame(
        {
            "year": [2023 for _ in days],
            "month_id": [202309 for _ in days],
            "date_id": [20230900 + d for d in days],
            "date": [date(2023, 9, d) for d in days],
            "row_hash": [f"example{d}" for d in days],
            "interaction_datetime": [datetime(2023, 9, d, 10, 0) for d in days],
            "primary_user_id": [f"USER00{d}" for d in days],
        }
    )
    return frame

# Build the sample data and convert it to a PyArrow Table for use as the
# merge source.
df = generate_sample_data()
table = df.to_arrow()

print(table.schema)

# Create the target Delta table, partitioned by month_id.
# NOTE(review): "delta.enableChangeDataFeed": "true" is the setting that
# triggers the UNION planning error during merge in this report.
DeltaTable.create(
    table_uri="union_error",
    schema=table.schema,
    mode="overwrite",
    partition_by=["month_id"],
    storage_options=None,
    name="union_error",
    description="Union Error",
    custom_metadata={},
    configuration={
        "delta.dataSkippingStatsColumns": "year,month_id,date_id,row_hash",
        "delta.checkpoint.writeStatsAsStruct": "true",
        "delta.checkpointInterval": "5",
        "delta.deletedFileRetentionDuration": "interval 30 days",
        "delta.enableChangeDataFeed": "true",
        "delta.appendOnly": "false",
    },
)
# Re-open the freshly created table and show its configuration.
dt = DeltaTable("union_error")

print(dt.metadata().configuration)

def insert_unique_rows(
    source_pyarrow_table: Table,
    target_delta_table: DeltaTable,
) -> TableMerger:
    """Build a merge that inserts only source rows absent from the target.

    Rows are matched on row_hash; source rows with no matching target row
    are inserted. The configured TableMerger is returned unexecuted.
    """
    merge_predicate = "s.row_hash = t.row_hash"

    merger = target_delta_table.merge(
        source=source_pyarrow_table,
        predicate=merge_predicate,
        source_alias="s",
        target_alias="t",
    )
    return merger.when_not_matched_insert_all()

# Build the merge plan and run it; with CDF enabled this is where the
# UNION planning error is raised.
merger = insert_unique_rows(source_pyarrow_table=table, target_delta_table=dt)
merger.execute()

# Re-open the table and display its contents.
dt = DeltaTable("union_error")
print(dt.to_pandas())

More details:

Turning off the delta.enableChangeDataFeed configuration makes the merge succeed, though it is not clear why.

VillePuuska commented 1 month ago

A more minimal example of the issue:

import polars as pl
from deltalake import DeltaTable

# Minimal reproduction: two columns with differing dtypes, a partitioned
# table, and CDF enabled together trigger the UNION planning error.
frame = pl.DataFrame(
    {
        "id": [1, 2],
        "date": [1, 2],
    },
    schema={
        # With matching dtypes (int & int, or date & date) the error goes away.
        "id": pl.Int64,
        "date": pl.Date,
    },
)
arrow_table = frame.to_arrow()

delta_table = DeltaTable.create(
    table_uri="union_error",
    schema=arrow_table.schema,
    mode="overwrite",
    # Removing the partitioning also makes the error disappear.
    partition_by=["id"],
    configuration={
        # Setting this to "false" avoids the error.
        "delta.enableChangeDataFeed": "true",
    },
)

merger = delta_table.merge(
    source=arrow_table,
    predicate="s.id = t.id",
    source_alias="s",
    target_alias="t",
)
merger.when_not_matched_insert_all().execute()

Running this gives the error

Traceback (most recent call last):
  File "/workspaces/codespaces-blank/setup.py", line 31, in <module>
    ).when_not_matched_insert_all().execute()
                                    ^^^^^^^^^
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/deltalake/table.py", line 1793, in execute
    metrics = self._table.merge_execute(self._builder)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_internal.DeltaError: Generic DeltaTable error: Error during planning: UNION Column id (type: Int64) is not compatible with column date (type: Date32)

Modifications to the script that allow it to run without error:

The script also works fine if you have two pl.String columns, or one pl.String and one pl.Int64. But if you have pl.String and pl.Date columns, you then get the following kind of error:

Traceback (most recent call last):
  File "/workspaces/codespaces-blank/setup.py", line 31, in <module>
    ).when_not_matched_insert_all().execute()
                                    ^^^^^^^^^
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/deltalake/table.py", line 1793, in execute
    metrics = self._table.merge_execute(self._builder)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Exception: Cast error: Cannot cast string 'a' to value of Date32 type

So the issue seems to be some weird mix of Date, partitioning, and CDF. 🤔

VillePuuska commented 1 month ago

Looks like this is the same issue as #2832