delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, and Python
https://delta.io
Apache License 2.0

[Feature Request] Add concurrency support to whenNotMatchedBySource filters #2170

Open sor-droneup opened 11 months ago

sor-droneup commented 11 months ago

Feature request

Which Delta project/connector is this regarding?

Spark

Overview

Introduce support for honoring partition-disjointness predicates in the conditions provided to the whenNotMatchedBySource group of operations.

In merge operations that include a clause from the whenNotMatchedBySource family, any conditions that would normally guarantee partition disjointness are currently ignored, resulting in a concurrency error being raised.

To address this, it is possible to include a partition-disjointness predicate within the join condition (e.g., "class = 1"), which enables concurrent updates across multiple partitions.

All operations within the whenMatched and whenNotMatched groups honor such predicates and are concurrency-safe. However, as soon as whenNotMatchedBySourceDelete is incorporated, Spark throws a ConcurrentAppendException, as the sketch below illustrates.
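A minimal sketch of the contrast, assuming target is a DeltaTable partitioned by class and source is a DataFrame (both names, and the id column, are illustrative rather than taken from this report):

# Safe: the literal partition predicate in the join condition lets two
# merges on disjoint partitions (class = 1 vs. class = 2) commit concurrently.
(target.alias("t")
 .merge(source.alias("s"), "t.class = 1 AND t.id = s.id")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute())

# Not safe today: once a whenNotMatchedBySource clause is added, the same
# predicate is ignored when pruning target files, so a concurrent merge on
# class = 2 can fail with ConcurrentAppendException.
(target.alias("t")
 .merge(source.alias("s"), "t.class = 1 AND t.id = s.id")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .whenNotMatchedBySourceDelete(condition="t.class = 1")
 .execute())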

Motivation

Adding partition-disjointness support to whenNotMatchedBySource will increase the utility of Delta Lake tables for concurrent workloads.

Further details

An example scenario:

1. My source is a non-Delta file (e.g. JSON/Parquet).
2. This file sources one of the partitions within my Delta table.
3. I want to merge into the specified partition in my table. Within this partition I want to run a query that upserts the data and removes rows that no longer exist in the source (basically a refresh).

To do that, I currently have two options:

The code snippet that simulates this behaviour:

import tempfile
import threading

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.appName('test').getOrCreate()

SCHEMA = StructType(
    [
        StructField("uuid", StringType(), nullable=False),
        StructField("column_1", StringType(), nullable=False),
        StructField("column_2", StringType(), nullable=False),
        StructField("product_name", StringType(), nullable=False),
        StructField("category_id", IntegerType(), nullable=False),
        StructField("checksum", StringType(), nullable=False),
    ]
)

def merge_df(df, category_id, target_delta_table):
    join_condition = f"target.category_id = {category_id} AND target.checksum = source.checksum"

    delete_condition = f"target.category_id = {category_id}"

    update_condition = "target.uuid <> source.uuid"
    update_set = {"column_1": "source.column_1", "column_2": "source.column_2"}

    merge_statement = (
        target_delta_table.alias("target")
        .merge(df.alias("source"), join_condition)
        .whenNotMatchedInsertAll()
        .whenNotMatchedBySourceDelete(condition=delete_condition)
        .whenMatchedUpdate(condition=update_condition, set=update_set)
    )
    merge_statement.execute()

data = [
    ("001", "100", "1", "Product A", 1, "123456789"),
    ("002", "200", "1", "Product B", 2, "987654321"),
    ("003", "300", "1", "Product C", 1, "246813579"),
]
initial_df = spark.createDataFrame(data, SCHEMA)

temp_dir = tempfile.mkdtemp()

initial_df.write.format("delta").option(
    "delta.enableChangeDataFeed", True
).partitionBy('category_id').save(temp_dir)

target_table = DeltaTable.forPath(spark, temp_dir)

new_data_1 = [("005", "500", "5", "Product G", 1, "aaabbbccc")]
category_id_1 = 1
df_1 = spark.createDataFrame(new_data_1, SCHEMA)

new_data_2 = [("007", "700", "7", "Product O", 2, "111555")]
category_id_2 = 2
df_2 = spark.createDataFrame(new_data_2, SCHEMA)

threads = []
t1 = threading.Thread(target=merge_df, args=(df_1, category_id_1, target_table))
t1.start()
threads.append(t1)

t2 = threading.Thread(target=merge_df, args=(df_2, category_id_2, target_table))
t2.start()
threads.append(t2)

for thread in threads:
    thread.join()

In the example above, although a category_id condition was specified that would make each merge partition-scoped, it is ignored because whenNotMatchedBySource is considered non-partition-scoped, so one of the two concurrent merges fails with a ConcurrentAppendException.

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

sambathrajv commented 10 months ago

Hi, we are also facing the same issue. Is there any workaround, or can a patch be made available?

vkorukanti commented 10 months ago

@johanl-db Could you please take a look at this issue?

johanl-db commented 10 months ago

The following may work. Given:

MERGE INTO target
USING source
ON merge_condition
WHEN NOT MATCHED BY SOURCE AND <not_matched_by_source_condition_1> THEN ...
WHEN NOT MATCHED BY SOURCE AND <not_matched_by_source_condition_2> THEN ...

If merge_condition and all not_matched_by_source_condition_N share the same target-only predicate (typically a partition filter), then we can apply that predicate when filtering target files in findTouchedFiles.

One way of doing it would be to extract the target-only predicates from the merge condition and from each NOT MATCHED BY SOURCE condition using splitConjunctivePredicates, then OR them together so that we only scan files that can match at least one of them (see the rough sketch below).
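A rough Python illustration of the idea (the real change would live in Delta's Scala merge path and operate on Catalyst expressions; the string handling here is purely illustrative):

from typing import List

def target_only_conjuncts(condition: str) -> List[str]:
    # Naively split a condition on top-level AND and keep the conjuncts that
    # reference only the target; splitConjunctivePredicates does this
    # properly on the Catalyst expression tree.
    return [c.strip() for c in condition.split(" AND ") if "source." not in c]

merge_condition = "target.category_id = 1 AND target.checksum = source.checksum"
nmbs_conditions = ["target.category_id = 1"]

# A target file must be scanned if it can satisfy the target-only part of the
# merge condition OR any NOT MATCHED BY SOURCE condition.
parts = [" AND ".join(target_only_conjuncts(c))
         for c in [merge_condition] + nmbs_conditions]
file_filter = " OR ".join(f"({p})" for p in parts if p)
print(file_filter)  # (target.category_id = 1) OR (target.category_id = 1)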

The implementation should be short but will require careful testing to ensure:

  1. The conflict resolution works as expected with the new predicate generated to filter the target scan
  2. The additional filtering produces correct results. It is especially important here to consider interactions with the other types of clauses (MATCHED, NOT MATCHED) and their conditions. I think it should work as described, but we need to think harder about what can go wrong and add tests.

Unfortunately, I have limited bandwidth to handle this at the moment, but I'm happy to provide support and review for external contributions.
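In the meantime, a possible interim workaround (untested sketch, reusing the names from the repro above) is to split the merge into an upsert without the whenNotMatchedBySource clause plus a separate partition-scoped DELETE. Both commands then carry an explicit partition predicate, so runs against different category_id values should not conflict. Note the two commits are not atomic, so a concurrent reader can observe the intermediate state:

def refresh_partition(df, category_id, target_table):
    # 1) Upsert without whenNotMatchedBySource: the partition predicate in
    #    the join condition keeps concurrent merges disjoint.
    (target_table.alias("target")
     .merge(df.alias("source"),
            f"target.category_id = {category_id} AND target.checksum = source.checksum")
     .whenMatchedUpdate(condition="target.uuid <> source.uuid",
                        set={"column_1": "source.column_1", "column_2": "source.column_2"})
     .whenNotMatchedInsertAll()
     .execute())

    # 2) Delete rows that no longer exist in the source. Collecting the keys
    #    keeps the condition a plain predicate, which DeltaTable.delete accepts.
    keep = [row.checksum for row in df.select("checksum").distinct().collect()]
    keep_list = ", ".join(f"'{c}'" for c in keep)
    target_table.delete(f"category_id = {category_id} AND checksum NOT IN ({keep_list})")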