delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0
7.45k stars 1.67k forks source link

[Feature Request] data deduplication on existing delta table #1767

Open mrtnstk opened 1 year ago

mrtnstk commented 1 year ago

Feature request

Overview

In some cases, we would prefer to do data deduplication on a regular basis rather than when upserting. A single command for this purpose would be quite useful.

Motivation

Several approaches are proposed online with regards to data de-duplication on delta tables. Some target deduplication when inserting, others propose complex merge operations for existing tables.

Further details

Propose to implement a command in the form

deltaTable.dropDuplicates(cols, where, keep="first")

such that rows with duplicates on the columns given by the cols parameter are dropped when the given conditions specified by the where parameter. The conditions should be a filter, on partition columns or other.

tdas commented 1 year ago

The proposed API is a little ambiguous ... how is "first" defined.

mrtnstk commented 1 year ago

The proposed API is a little ambiguous ... how is "first" defined.

Agree with your comment @tdas, the API needs to be better defined.

A simpler form would be to drop duplicates on all columns, deltaTable.dropDuplicates(where)

Defining which row to keep only makes sense when when dropping duplicates based on a subset of columns (as specified by the proposed "cols" parameter). Usage of "first" would require some kind of order within each group of duplicates, which would need to be further defined.

mrtnstk commented 1 year ago

Attempt at a second iteration: deltaTable.dropDuplicates(*cols =None, condition: Union[pyspark.sql.column.Column, str, None] = None, select: Optional[Dict[str, Union[str, pyspark.sql.column.Column]]] = None)

Parameters cols: optional (str or list name of columns) – the columns to screen for duplicates. Search on all columns if None condition: (str or pyspark.sql.Column) optional condition on where to search for duplicates. Example "partitionCol==2" select: optional (dict with str or pyspark.sql.Column as keys and str as values) Define rule for which row to keep when setting the cols parameter. Example {"numericCol":"max"} would select the max value of numericCol within each group of duplicates.

allisonport-db commented 1 year ago

https://github.com/MrPowers/mack has something like this implemented @MrPowers

mrtnstk commented 1 year ago

@allisonport-db , @MrPowers, the drop_duplicate function in mack comes with the following warning:

Warning: This method is overwriting the whole table, thus very inefficient. If you can, use the drop_duplicates_pkey method instead.

A function that drop duplicates without overwriting the whole table would be the target

nimblefox commented 8 months ago

+1 on this feature behalf my team

VictorDonjuan commented 2 months ago

Any updates on this?

FlavioDiasPs commented 1 month ago

Well, +1 here It is not cool to do deduplication on multiple terabyte tables that are receiving data from streaming sources