delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Concurrent merges not working >=0.19.0 #2980

Closed: echai58 closed this issue 1 day ago

echai58 commented 2 weeks ago

Environment

Delta-rs version: 0.19.0 and later

Binding: python


Bug

What happened: On 0.18.2, concurrent merges to different partitions worked (at least for some types of partition columns, such as strings). After upgrading, concurrent merges no longer work at all. The regression exists on all versions >= 0.19.0.

What you expected to happen: Concurrent merges to different partitions continue to work for the partition types that they used to work for.

How to reproduce it: The following code block succeeds on 0.18.2, but fails with

CommitFailedError: Failed to commit transaction: Error evaluating predicate: Generic DeltaTable error: Internal error: Failed to coerce types Date32 and Int64 in BETWEEN expression.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker

on 0.19.0 and later.

from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
import datetime
import pandas as pd 

path = "test_dt"
partition_columns = ["string_key"]
schema = pa.schema(
    [
        pa.field("date_key", pa.date32()),
        pa.field("string_key", pa.string()),
        pa.field("int32_key", pa.int32()),
        pa.field("value", pa.float64()),
    ]
)

DeltaTable.create(
    table_uri=path,
    schema=schema,
    mode="error",
    partition_by=partition_columns,
)

# this simulates a concurrent write
delta_table_1 = DeltaTable(path)
delta_table_2 = DeltaTable(path)

delta_table_1.merge(
    pa.Table.from_pandas(
        pd.DataFrame(
            {
                "date_key": [datetime.date(2020, 1, 1)],
                "string_key": ["foo"],
                "int32_key": [1],
                "value": [2.0],
            }
        ),
        schema=schema,
    ),
    predicate="s.date_key = t.date_key AND s.string_key = t.string_key AND s.int32_key = t.int32_key",
    source_alias="s",
    target_alias="t",
).when_matched_update_all().when_not_matched_insert_all().execute()

delta_table_2.merge(
    pa.Table.from_pandas(
        pd.DataFrame(
            {
                "date_key": [datetime.date(2021, 1, 1)],
                "string_key": ["foo2"],
                "int32_key": [2],
                "value": [3.0],
            }
        ),
        schema=schema,
    ),
    predicate="s.date_key = t.date_key AND s.string_key = t.string_key AND s.int32_key = t.int32_key",
    source_alias="s",
    target_alias="t",
).when_matched_update_all().when_not_matched_insert_all().execute()
JonasDev1 commented 3 days ago

I was able to reproduce the issue in Rust:

use std::fs;
use std::sync::Arc;
use datafusion::arrow::array::{Float64Array, Int32Array, StringArray, Date32Array};
use datafusion::arrow::compute::kernels::cast_utils::Parser;
use datafusion::arrow::datatypes::{DataType, Date32Type, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::logical_expr::col;
use datafusion::prelude::SessionContext;
use deltalake::DeltaOps;
use deltalake::kernel::StructType;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let tmp_dir = tempfile::tempdir().unwrap();
    let base_path = tmp_dir.path().to_str().unwrap();
    fs::create_dir(tmp_dir.path().join("test_dt")).unwrap();
    let path = format!("{:}/{:}", base_path, "test_dt");
    let ctx = SessionContext::new();

    let partition_columns = vec!["string_key".to_string()];

    // Define the schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("date_key", DataType::Date32, false),
        Field::new("string_key", DataType::Utf8, false),
        Field::new("int32_key", DataType::Int32, false),
        Field::new("value", DataType::Float64, false),
    ]));

    // Create Delta table
    let ops = DeltaOps::try_from_uri(&path).await.unwrap();
    let _res = ops.create()
        .with_partition_columns(partition_columns.clone())
        .with_columns(StructType::try_from(schema.clone()).unwrap().fields().cloned())
        .await.unwrap();

    // Simulate concurrent writes
    let table1 = DeltaOps::try_from_uri(&path).await.unwrap();
    let table2 = DeltaOps::try_from_uri(&path).await.unwrap();

    // First merge
    let batch1 = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Date32Array::from(vec![Date32Type::parse("2020-01-01").unwrap()])),
            Arc::new(StringArray::from(vec!["foo"])),
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(Float64Array::from(vec![2.0])),
        ],
    )?;
    let _res = table1.merge(
        ctx.read_batch(batch1).unwrap(),
        col("target.date_key").eq(col("source.date_key"))
            .and(col("target.string_key").eq(col("source.string_key")))
            .and(col("target.int32_key").eq(col("source.int32_key")))
    )
        .with_source_alias("source")
        .with_target_alias("target")
        .when_matched_update(|update| {
            update
                .update("value", col("source.value"))
        }).unwrap()
        .when_not_matched_insert(|insert| {
            insert.set("date_key", col("source.date_key"))
                .set("string_key", col("source.string_key"))
                .set("int32_key", col("source.int32_key"))
                .set("value", col("source.value"))
        }).unwrap().await.unwrap();

    // Second merge
    let batch2 = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Date32Array::from(vec![Date32Type::parse("2021-01-01").unwrap()])),
            Arc::new(StringArray::from(vec!["foo2"])),
            Arc::new(Int32Array::from(vec![2])),
            Arc::new(Float64Array::from(vec![3.0])),
        ],
    )?;

    let _res = table2.merge(
        ctx.read_batch(batch2).unwrap(),
        col("target.date_key").eq(col("source.date_key"))
            .and(col("target.string_key").eq(col("source.string_key")))
            .and(col("target.int32_key").eq(col("source.int32_key")))
    )
        .with_source_alias("source")
        .with_target_alias("target")
        .when_matched_update(|update| {
            update
                .update("value", col("source.value"))
        }).unwrap()
        .when_not_matched_insert(|insert| {
            insert.set("date_key", col("source.date_key"))
                .set("string_key", col("source.string_key"))
                .set("int32_key", col("source.int32_key"))
                .set("value", col("source.value"))
        }).unwrap().await.unwrap();

    Ok(())
}
JonasDev1 commented 3 days ago

It seems that the issue is in the conflict checker.

The predicate from the first merge is date_key BETWEEN 2020-01-01 AND 2020-01-01 AND string_key = 'foo' AND int32_key BETWEEN 1 AND 1
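
For reference, a minimal sketch (not part of the original comment) of the same first-merge predicate built as a typed DataFusion Expr rather than re-parsed from a string; the column names and values come from the repro above, and the helper name is made up. With typed literals the date bounds keep their Date32 type, so there is no ambiguity about how 2020-01-01 should be read:

use datafusion::logical_expr::{col, lit, Expr};
use datafusion::scalar::ScalarValue;

// The first merge's partition predicate as a typed expression.
// 18262 is 2020-01-01 expressed as days since 1970-01-01.
fn first_merge_predicate() -> Expr {
    let d = lit(ScalarValue::Date32(Some(18262)));
    col("date_key")
        .between(d.clone(), d)
        .and(col("string_key").eq(lit("foo")))
        .and(col("int32_key").between(lit(1), lit(1)))
}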

JonasDev1 commented 2 days ago

The problem is that the parse_predicate_expression method parses the date incorrectly:

From "date_key BETWEEN 2021-01-01 AND 2021-01-01 AND string_key = 'foo2' AND int32_key BETWEEN 2 AND 2" the output is ... BinaryExpr(BinaryExpr { left: BinaryExpr(BinaryExpr { left: Literal(Int64(2021)), op: Minus, right: Literal(Int64(1)) }) ... - i.e. the unquoted 2021-01-01 is parsed as the integer arithmetic 2021 - 1 - 1, which produces an Int64 that cannot be coerced against the Date32 column in the BETWEEN expression.

echai58 commented 1 day ago

@JonasDev1 thanks for taking a look - does that PR only fix it for dates? I am seeing the regression for other types as well, e.g. just a string partition column

JonasDev1 commented 1 day ago

My Rust reproduction works fine with the changes, and it also covers merges over string and int keys.

echai58 commented 1 day ago

> My Rust reproduction works fine with the changes, and it also covers merges over string and int keys.

Ah right, I misremembered the error message - it's specifically about the date column. Thanks!