delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

When_matched_update causes records to be lost with explicit predicate #2158

Closed · sebdiem closed this 6 months ago

sebdiem commented 8 months ago

Environment

Delta-rs version: 0.15.0

Binding: python

Environment:


Bug

What happened:

I have a table with several columns, including instance_id and cost. I want to update the rows where cost is null, based on data contained in a pandas DataFrame. For this I use a merge operation like this:

    delta_table.merge(
        pa.Table.from_pandas(costs),
        predicate="t.instance_id = s.instance_id and t.cost is null",
        source_alias="s",
        target_alias="t",
    ).when_matched_update({"cost": "s.cost"}).execute()

In 0.14.0 this worked perfectly fine: rows for which cost was already set stayed untouched, while the others were updated. In 0.15.0, however, the behavior is different: the rows with cost set were deleted, and the others were updated.

What you expected to happen: Same behavior as in 0.14.0

How to reproduce it: Here is a minimal Python script with the following dependencies:

pandas==1.4.4
pyarrow==15.0.0
deltalake==0.15.0

import os
import shutil

import pandas as pd
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake
from deltalake.schema import (
    Field,
    PrimitiveType,
    Schema,
)

# first create a table with the given schema
path = "/tmp/delta-rs-bug-3"
shutil.rmtree(path, ignore_errors=True)  # tolerate a missing directory on first run
os.makedirs(path)
schema = Schema(
    [
        Field("instance_id", PrimitiveType("string"), nullable=False),
        Field("month", PrimitiveType("string"), nullable=False),
        Field("cost", PrimitiveType("float"), nullable=True),
    ]
)
write_deltalake(
    path,
    data=[],
    schema=schema,
    name="test_table",
    partition_by=["month"],
    overwrite_schema=False,
)
table = DeltaTable(path)

# then fill this table with some entries
initial_df = pd.DataFrame(
    [
        {"instance_id": "1", "month": "2024-02", "cost": 32.0},
        {"instance_id": "2", "month": "2024-02", "cost": None},
    ]
)
table.merge(
    pa.Table.from_pandas(initial_df),
    predicate="t.instance_id = s.instance_id",
    source_alias="s",
    target_alias="t",
).when_not_matched_insert_all().execute()

# now try to merge new data into this table: boom
new_df = pd.DataFrame(
    [
        {"instance_id": "1", "month": "2024-02", "cost": 38.0},
        {"instance_id": "2", "month": "2024-02", "cost": 38.0},
    ]
)
table.merge(
    pa.Table.from_pandas(new_df),
    predicate="t.instance_id = s.instance_id and t.cost is null",
    source_alias="s",
    target_alias="t",
).when_matched_update({"cost": "s.cost"}).execute()
print(table.to_pandas())
# delta lake 0.14.0
# >>> print(table.to_pandas())
#   instance_id    month  cost
# 0           2  2024-02  38.0
# 1           1  2024-02  32.0
# ===================================
# delta lake 0.15.0
# >>> print(table.to_pandas())
#   instance_id    month  cost
# 0           2  2024-02  38.0


Blajda commented 8 months ago

Hi @sebdiem, this regression was fixed in this PR: #2149

sebdiem commented 8 months ago

thanks a lot!

sebdiem commented 8 months ago

Hmm, actually I've just tested the newly released 0.15.2 and the bug described here is still present IMO.

ion-elgreco commented 8 months ago

Yup, can confirm.

Edit: Actually I am not sure if the current behaviour is incorrect because it only matches on 1 row from the target, and you passed only when_matched_update().

But passing the t.cost is null predicate inside the when_matched_update gives both rows, with the null value updated now: matched rows that fail a clause predicate are simply copied through unchanged.

table.merge(
    pa.Table.from_pandas(new_df),
    predicate="t.instance_id = s.instance_id",
    source_alias="s",
    target_alias="t",
).when_matched_update({"cost": "s.cost"}, predicate="t.cost is null").execute()

ion-elgreco commented 8 months ago

@sebdiem do you have some time to cross check the behavior with spark delta?

sebdiem commented 8 months ago

> @sebdiem do you have some time to cross check the behavior with spark delta?

Yes, I will try to do that tomorrow.

Blajda commented 8 months ago

The correct behavior is the one from 0.14. Staring at this, I think the root cause is that predicate pushdown was enabled, which pushes a filter of t.cost is null into the scan; rows filtered out of the scan never get rewritten, so they are dropped from the rewritten files. This will be a simple fix.
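
To make the failure mode concrete, here is a minimal pandas sketch (illustrative only, not delta-rs internals) contrasting the two semantics, using DataFrames that mirror the repro script above:

    import pandas as pd

    target = pd.DataFrame(
        [
            {"instance_id": "1", "cost": 32.0},  # should stay untouched
            {"instance_id": "2", "cost": None},  # should be updated
        ]
    )
    source = pd.DataFrame(
        [
            {"instance_id": "1", "cost": 38.0},
            {"instance_id": "2", "cost": 38.0},
        ]
    )
    source_costs = source.set_index("instance_id")["cost"]

    # Match-time semantics (0.14): scan the FULL target and use the predicate
    # only to decide which rows count as "matched"; unmatched rows are copied
    # through to the rewritten files unchanged.
    matched = target["cost"].isna()
    correct = target.copy()
    correct.loc[matched, "cost"] = target.loc[matched, "instance_id"].map(source_costs)
    print(correct)  # both rows survive; only the null cost is updated

    # Pushdown semantics (the 0.15 bug): the filter is applied in the target
    # scan itself, so rows with a non-null cost never enter the merge. The
    # merge rewrites only the scanned data, and the filtered-out row is lost.
    scanned = target[target["cost"].isna()].copy()
    scanned["cost"] = scanned["instance_id"].map(source_costs)
    print(scanned)  # only instance_id "2" survives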

Blajda commented 8 months ago

Oddly, I can't reproduce the issue when using Rust:

    async fn test_merge_pushdowns() {
        //See #2158
        let schema = vec![
            StructField::new(
                "id".to_string(),
                DataType::Primitive(PrimitiveType::String),
                true,
            ),
            StructField::new(
                "cost".to_string(),
                DataType::Primitive(PrimitiveType::Float),
                true,
            ),
            StructField::new(
                "month".to_string(),
                DataType::Primitive(PrimitiveType::String),
                true,
            ),
        ];

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", ArrowDataType::Utf8, true),
            Field::new("cost", ArrowDataType::Float32, true),
            Field::new("month", ArrowDataType::Utf8, true),
        ]));

        let table = DeltaOps::new_in_memory()
            .create()
            .with_columns(schema)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let batch = RecordBatch::try_new(
            Arc::clone(&arrow_schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["A", "B"])),
                Arc::new(arrow::array::Float32Array::from(vec![Some(10.15), None])),
                Arc::new(arrow::array::StringArray::from(vec![
                    "2023-07-04",
                    "2023-07-04",
                ])),
            ],
        )
        .unwrap();

        let table = DeltaOps(table)
            .write(vec![batch.clone()])
            .with_save_mode(SaveMode::Append)
            .await
            .unwrap();
        assert_eq!(table.version(), 1);
        assert_eq!(table.get_files_count(), 1);

        let batch = RecordBatch::try_new(
            Arc::clone(&arrow_schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["A", "B"])),
                Arc::new(arrow::array::Float32Array::from(vec![Some(12.15), Some(11.15)])),
                Arc::new(arrow::array::StringArray::from(vec![
                    "2023-07-04",
                    "2023-07-04",
                ])),
            ],
        ).unwrap();
        let source = ctx.read_batch(batch).unwrap();

        let (table, _metrics) = DeltaOps(table)
            .merge(source, "target.id = source.id and target.cost is null")
            .with_source_alias("source")
            .with_target_alias("target")
            .when_matched_update(|update| {
                update
                    .update("id", "target.id")
                    .update("cost", "source.cost")
                    .update("month", "target.month")
            })
            .unwrap()
            .await
            .unwrap();

        let expected = vec![
            "+----+-------+------------+",
            "| id | cost  | month      |",
            "+----+-------+------------+",
            "| A  | 10.15 | 2023-07-04 |",
            "| B  | 11.15 | 2023-07-04 |",
            "+----+-------+------------+",
        ];
        let actual = get_data(&table).await;
        assert_batches_sorted_eq!(&expected, &actual);
    }

ion-elgreco commented 7 months ago

@Blajda weirdly enough, using this predicate works: "t.instance_id = s.instance_id, t.cost is null", and gives the old correct behavior. Not sure if that helps narrow it down.
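
For reference, translated back to the Python repro script above, that variant would look like the sketch below; the comma-separated predicate is copied verbatim from the comment, and it is unclear whether it is intentionally supported syntax:

    table.merge(
        pa.Table.from_pandas(new_df),
        predicate="t.instance_id = s.instance_id, t.cost is null",
        source_alias="s",
        target_alias="t",
    ).when_matched_update({"cost": "s.cost"}).execute()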

ion-elgreco commented 6 months ago

@Blajda I ran your Rust test but it doesn't pass on my side:


running 1 test
thread 'operations::merge::tests::test_merge_pushdowns' panicked at crates/core/src/operations/merge/mod.rs:2966:9:
assertion `left == right` failed: 

expected:

[
    "+----+-------+------------+",
    "| id | cost  | month      |",
    "+----+-------+------------+",
    "| A  | 10.15 | 2023-07-04 |",
    "| B  | 11.15 | 2023-07-04 |",
    "+----+-------+------------+",
]
actual:

[
    "+----+-------+------------+",
    "| id | cost  | month      |",
    "+----+-------+------------+",
    "| B  | 11.15 | 2023-07-04 |",
    "+----+-------+------------+",
]