delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

[BUG] [PySpark] Delta Table Schema Evolves Even When MERGE Condition Fails #3706

Open · caldempsey opened this issue 2 months ago

Bug

Which Delta project/connector is this regarding?

Spark

Describe the problem

When the condition of a MERGE into a Delta table is not satisfied (so no rows are updated or inserted), the schema of that Delta table still gets evolved.

Steps to reproduce

Create a new Delta table, then perform an upsert whose source schema is evolving, as follows:

# Assumes `delta_table` is a DeltaTable, `labels` is a source DataFrame that may
# carry columns the table does not yet have, and schema evolution is enabled
# (e.g. spark.databricks.delta.schema.autoMerge.enabled = true).

# Perform the merge operation
delta_table.alias("current").merge(
    labels.alias("new"), "current.file_id = new.file_id"
).whenMatchedUpdate(
    condition="current.ingest_id < new.ingest_id",
    set={
        "ingest_id": "new.ingest_id",
        # Dynamically update every other column, including evolving ones
        **{
            col: f"new.{col}"
            for col in labels.columns
            if col not in ["file_id", "ingest_id"]
        },
    },
).whenNotMatchedInsert(
    values={
        "file_id": "new.file_id",
        "ingest_id": "new.ingest_id",
        **{
            col: f"new.{col}"
            for col in labels.columns
            if col not in ["file_id", "ingest_id"]
        },
    }
).execute()
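
For completeness, a minimal sketch of the setup the snippet above assumes (the table path, seed values, and app name are illustrative, not from the original report):

from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("merge-schema-evolution-repro")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Schema evolution must be enabled for MERGE to add columns at all
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Seed the table with file_id, ingest_id, column_a
spark.createDataFrame(
    [("file-1", "ksuid-0001", "value_a")],
    ["file_id", "ingest_id", "column_a"],
).write.format("delta").save("/tmp/labels")
delta_table = DeltaTable.forPath(spark, "/tmp/labels")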

Run a simple test like this:

from pyspark.sql import Row


def test_no_update_when_ingest_id_is_not_greater(spark_session, delta_table, valid_data_with_column_a):
    """
    Given a file_id initially inserted with column_a,
    when upserting the same file_id with a lower or equal ingest_id and a new
    column_b (so the MERGE update condition fails),
    then the row should not be updated and column_b should not be added.
    """
    # Insert a row with column_a
    df_column_a = spark_session.createDataFrame(valid_data_with_column_a)
    delta_table.upsert_labels(df_column_a)  # wrapper around the merge shown above

    # Create a new row with the same file_id and an equal ingest_id, but with column_b
    updated_data = [
        Row(
            file_id=valid_data_with_column_a[0].file_id,
            ingest_id=valid_data_with_column_a[0].ingest_id,  # same ingest_id, so the update condition fails
            column_b="value_b",
        )
    ]
    df_updated = spark_session.createDataFrame(updated_data)

    # Upsert the updated data (should be a no-op because the ingest_id is not greater)
    delta_table.upsert_labels(df_updated)

    # Validate that the row was not updated and column_b was not added
    result_df = delta_table.toDF()
    row = result_df.filter(result_df.file_id == updated_data[0].file_id).first()
    result_df.show(truncate=False)
    assert row.column_a == "value_a"  # column_a should remain unchanged
    assert "column_b" not in row.asDict()  # column_b should not appear in the schema

Observed results

Delta Lake extends the schema even though the condition to update the table fails. (The ingest_id column is intended to be sortable so that a failing condition is easy to demonstrate; I used KSUIDs.)

[Screenshot: result_df.show(truncate=False) output, with column_b present in the table schema]
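
The evolution can also be confirmed from the table metadata directly; a quick check, assuming the illustrative table path from the setup sketch above:

# The schema now contains column_b even though the MERGE touched no rows
delta_table.toDF().printSchema()

# The table history shows the MERGE commit that carried the schema change
spark.sql("DESCRIBE HISTORY delta.`/tmp/labels`") \
    .select("version", "operation", "operationParameters") \
    .show(truncate=False)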

Expected results

When whenNotMatchedInsert is not triggered, I would expect column_b not to be added to the table. I believe the schema is being evolved during the condition-analysis step, before any rows are written.
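
Until this is resolved, one hypothetical workaround is to project the source frame onto the table's current columns before merging, so MERGE cannot evolve the schema as a side effect (a sketch, not part of the original report):

# Drop source columns that the target table does not have yet; with this
# projection a failed merge condition can no longer add new columns.
target_cols = set(delta_table.toDF().columns)
labels_trimmed = labels.select(*[c for c in labels.columns if c in target_cols])

The trade-off is that genuinely new columns then have to be added explicitly (for example via ALTER TABLE ... ADD COLUMNS) before an upsert that is meant to carry them.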

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?