
[Python] Possibility of a table.drop_duplicates() function? #30950

Open asfimport opened 2 years ago

asfimport commented 2 years ago

I noticed that there are group_by() and sort_by() functions in the 7.0.0 branch. Is it possible to include a drop_duplicates() function as well?

id  updated_at
1   2022-01-01 04:23:57
2   2022-01-01 07:19:21
2   2022-01-10 22:14:01

Something like the call below, which would return the table without the second row in the example above, would be great.

I am usually reading an append-only dataset and then need to report on the latest version of each row. To drop duplicates, I temporarily convert the append-only table to a pandas DataFrame, then convert it back to a table and save a separate "latest-version" dataset.
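A minimal sketch of that pandas round-trip, assuming the small example table above (column names are illustrative):

import pandas as pd
import pyarrow as pa

table = pa.table({
    "id": [1, 2, 2],
    "updated_at": ["2022-01-01 04:23:57", "2022-01-01 07:19:21", "2022-01-10 22:14:01"],
})

# Round-trip through pandas: sort so the latest version comes last, then keep it
df = table.to_pandas().sort_values(["id", "updated_at"])
latest = pa.Table.from_pandas(df.drop_duplicates(subset=["id"], keep="last"), preserve_index=False)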

table.sort_by(sorting=[("id", "ascending"), ("updated_at", "ascending")]).drop_duplicates(subset=["id"], keep="last")

Reporter: Lance Dacey / @ldacey

Note: This issue was originally created as ARROW-15474. Please see the migration documentation for further details.

asfimport commented 2 years ago

Weston Pace / @westonpace: We're pretty close in 7.0.0, but "first" and "last" are tricky concepts for the execution engine at the moment (it generally processes items on a first-come, first-served basis, so order isn't really maintained).

Are you ok with "min" or "max"?


>>> import pyarrow as pa
>>> tab = pa.Table.from_pydict({'x': [1, 2, 2, 3], 'y': ['a', 'b', 'c', 'd']})
>>> tab.group_by("x").aggregate([("y", "max")])
pyarrow.Table
y_max: string
x: int64
----
y_max: [["a","c","d"]]
x: [[1,2,3]]
>>> tab.group_by("x").aggregate([("y", "min")])
pyarrow.Table
y_min: string
x: int64
----
y_min: [["a","b","d"]]
x: [[1,2,3]]
asfimport commented 2 years ago

Weston Pace / @westonpace: Actually, I guess this would be tricky if you have multiple columns because it would return the min/max of each particular column independently. So you might get a row that doesn't even really exist in the source data.
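A small illustration of that pitfall with hypothetical data; the two independent mins combine values from different rows:

import pyarrow as pa

tab = pa.Table.from_pydict({
    'id': [1, 1],
    'y': ['b', 'a'],  # min is 'a' (from the second row)
    'z': [10, 20],    # min is 10 (from the first row)
})
# The two mins come from different rows, so the result pairs ('a', 10),
# a combination that never occurs together in the source table
print(tab.group_by('id').aggregate([('y', 'min'), ('z', 'min')]))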

asfimport commented 2 years ago

Lance Dacey / @ldacey: I would personally be okay with only having the first row retained since I could just sort the table before dropping duplicates to get the desired results.

Is it possible to get the first or nth values from a table group_by? In pandas, we can do the following, which I think has the desired behavior even with multiple columns (as long as we sort the data first). If we can get the indices of which rows to keep, then we can use table.take() to return a new table with the latest values.


import pandas as pd

df = pd.DataFrame(
    {
        "id": [1, 1, 1, 2, 2, 2],
        "name": ["a", "a", "a", "b", "c", "c"],
        "updated_at": [
            "2021-01-01 00:02:19",
            "2022-01-04 12:13:10",
            "2022-01-06 04:10:52",
            "2022-01-02 17:32:21",
            "2022-01-06 01:27:14",
            "2022-01-06 23:09:56",
        ],
    }
)

# Sort so the newest updated_at comes first, then take the first row of each group
df.sort_values(["id", "name", "updated_at"], ascending=[True, True, False]).groupby(["id", "name"]).nth(0).reset_index()
asfimport commented 2 years ago

Weston Pace / @westonpace: I think we are close (still probably need to add a dedicated exec node for this) to being able to do:


drop_duplicates(subset=["id"], keep="random")

I think we are further away from either:


drop_duplicates(subset=["id"], keep="first")
drop_duplicates(subset=["id"], keep="last")

The problem is that the execution engine processes data in batches and those batches are processed in parallel. We can resequence the batches, but only at the very end of a plan. Otherwise they just get out of order again immediately after we resequence them.

This ability to run parts of a plan in sequence is something we need for a number of reasons (e.g. window functions) and so it is something we will likely have at some point.

asfimport commented 2 years ago

Lance Dacey / @ldacey: Ahh, that would be great. Random is a bit risky for my use case since I generally care about the latest version.

I found a repository with a drop_duplicates method that I might be able to adopt in the meantime. I still need to digest exactly what is happening below, but I think compute functions like pc.sort_indices, pc.unique, etc. could probably replace some of the numpy code (see the sketch after the snippet).


import numpy as np

def drop_duplicates(table, on=[], keep='first'):
    # Gather the key columns into a single hashed numpy array
    arr = columns_to_array(table, (on if on else table.column_names))

    # Groupify
    dic, counts, sort_idxs, bgn_idxs = groupify_array(arr)

    # Gather idxs
    if keep == 'last':
        idxs = (np.array(bgn_idxs) - 1)[1:].tolist() + [len(sort_idxs) - 1]
    elif keep == 'first':
        idxs = bgn_idxs
    elif keep == 'drop':
        idxs = [i for i, c in zip(bgn_idxs, counts) if c == 1]
    return table.take(sort_idxs[idxs])

def groupify_array(arr):
    # Input: Pyarrow/Numpy array
    # Output:
    #   - 1. Unique values
    #   - 2. Sort index
    #   - 3. Count per unique
    #   - 4. Begin index per unique
    dic, counts = np.unique(arr, return_counts=True)
    sort_idx = np.argsort(arr)
    return dic, counts, sort_idx, [0] + np.cumsum(counts)[:-1].tolist()

def combine_column(table, name):
    return table.column(name).combine_chunks()

f = np.vectorize(hash)
def columns_to_array(table, columns):
    columns = ([columns] if isinstance(columns, str) else list(set(columns)))
    if len(columns) == 1:
        return f(combine_column(table, columns[0]).to_numpy(zero_copy_only=False))
    else:
        values = [c.to_numpy() for c in table.select(columns).itercolumns()]
        return np.array(list(map(hash, zip(*values))))
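As noted above, some of the numpy steps have pyarrow.compute counterparts; a sketch of the likely candidates (not a drop-in replacement for the hashed-array path above):

import pyarrow as pa
import pyarrow.compute as pc

arr = pa.array([3, 1, 2, 1])
sort_idx = pc.sort_indices(arr)  # indices that would sort the array (like np.argsort)
uniques = pc.unique(arr)         # distinct values, in order of first appearance
counts = pc.value_counts(arr)    # struct array of (values, counts), like np.unique(..., return_counts=True)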


asfimport commented 2 years ago

Lance Dacey / @ldacey: I'll keep this open since this is a major wish list item for me. If anyone has some sample functions they have implemented outside of core pyarrow to achieve this then I would be interested in seeing that as well.

asfimport commented 1 year ago

Jacek Pliszka / @JacekPliszka: Lance - the code you posted might not be very efficient - something like the following should be faster:

1. Add a column with a sequential row number (index):

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

t1 = t.append_column('i', pa.array(np.arange(len(t))))

2. Find the first row index per group:

t2 = t1.group_by(['keys', 'values']).aggregate([('i', 'min')]).column('i_min')

3. Select the rows with those first-row indices:

pc.take(t, t2)

On my PC your code takes 1.19 s while the code above takes 0.15 s; to_pandas().drop_duplicates() was around 0.36 s.
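Putting those three steps together as one helper (a sketch; `t` is any table and `on` the key columns, as in the snippet above):

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

def keep_first(t: pa.Table, on: list) -> pa.Table:
    # 1. Append a sequential row index
    t1 = t.append_column('i', pa.array(np.arange(len(t))))
    # 2. The minimum index per group marks the first occurrence
    first_idx = t1.group_by(on).aggregate([('i', 'min')]).column('i_min')
    # 3. Gather those rows from the original table
    return pc.take(t, first_idx)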

asfimport commented 1 year ago

Lance Dacey / @ldacey: Nice - I will give that a shot, thanks. I have been using a library called polars to drop duplicates from a pyarrow table lately, but it would be nice to have a native-pyarrow way to do it.

Can we sort the data before adding the index column? My concern is that the order of the raw data might be messed up, and we might select the wrong row to keep.
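One way to address that ordering concern (a sketch, assuming id/updated_at columns as in the earlier example): sort first, so that the minimum index per group is exactly the row to keep:

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({'id': [1, 2, 2], 'updated_at': ['2022-01-01', '2022-01-01', '2022-01-10']})

# Sort so the latest version of each id comes first within its group
st = table.sort_by([('id', 'ascending'), ('updated_at', 'descending')])
t1 = st.append_column('i', pa.array(np.arange(len(st))))
keep = t1.group_by(['id']).aggregate([('i', 'min')]).column('i_min')
latest = pc.take(st, keep)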

asfimport commented 1 year ago

Jacek Pliszka / @JacekPliszka: I added one more speedup - you can definitely sort it at the beginning.

asfimport commented 1 year ago

Lance Dacey / @ldacey: Nice, I was able to test it out and seemed to get the correct results. I have been using polars and duckdb to handle de-duplication for a while now so I used that as a comparison.


%%time

table = con.execute("select distinct on (forecast_group) * from scanner order by session_id, date").arrow()

CPU times: user 735 ms, sys: 45.7 ms, total: 780 ms
Wall time: 1.92 s

Your suggestion:


%%time 

table = scanner.to_table()

t1 = table.append_column('i', pa.array(np.arange(len(table))))
t2 = t1.group_by(['forecast_group']).aggregate([('i', 'min')]).column('i_min')
table = pc.take(table, t2)

CPU times: user 872 ms, sys: 60.9 ms, total: 933 ms
Wall time: 4.6 s

A bit slower than duckdb somehow, but for me it is acceptable and gives me an option to drop duplicates without requiring additional libraries, including pandas. Thanks!

asfimport commented 1 year ago

Jacek Pliszka / @JacekPliszka: If you really want performance and your algorithm is as above, then it should be relatively easy to write a Cython function to handle this case, especially if forecast_group can take values 0 to n.

asfimport commented 1 year ago

Jacek Pliszka / @JacekPliszka: @westonpace maybe an approach similar to what I proposed, but in a better version, would work?

We need a compute function that, for a given array of values, returns the index of the first/last appearance. Then all batches can be processed in parallel and merged at the end, exactly as you described (a rough illustration follows below).

Once we have the index of the first/last appearance, we can use compute.take to produce the output table.

Maybe even an ordering function could be specified, so there would be no need to sort the array a priori.
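A rough illustration of that per-batch/merge idea in plain Python (hypothetical, just to show the semantics; a real kernel would live in C++):

def first_appearance(batch, offset):
    # Per batch: map value -> global index of its first appearance in this batch
    out = {}
    for i, v in enumerate(batch):
        out.setdefault(v, offset + i)
    return out

def merge(results):
    # Merge per-batch results: keep the smallest global index per value.
    # The merge is order-independent, so batches can be processed in parallel.
    merged = {}
    for r in results:
        for v, idx in r.items():
            if v not in merged or idx < merged[v]:
                merged[v] = idx
    return merged

batches = [[2, 1, 2], [3, 1]]
offsets = [0, 3]  # global offset of each batch
print(merge([first_appearance(b, o) for b, o in zip(batches, offsets)]))
# {2: 0, 1: 1, 3: 3}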

asfimport commented 1 year ago

Weston Pace / @westonpace:

Maybe even an ordering function could be specified, so there would be no need to sort the array a priori.

You can run your ordering function first so that your ordering is a column in the data. Then, for the first/last kernel the ordering would just be a field ref. The rest would be a straightforward aggregate kernel I think. The "state" would be the current first/last and the value of the ordering field at that point.

So yes, I think that approach should work to avoid sorting beforehand.
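Sketching that state in Python terms (hypothetical; a real kernel would be a C++ hash-aggregate):

class FirstByOrdering:
    """Aggregate state: keep the value whose ordering key is smallest so far."""

    def __init__(self):
        self.best_key = None
        self.best_value = None

    def update(self, ordering_key, value):
        # Called per row within a batch
        if self.best_key is None or ordering_key < self.best_key:
            self.best_key, self.best_value = ordering_key, value

    def merge(self, other):
        # Merging two partial states is just another update
        if other.best_key is not None:
            self.update(other.best_key, other.best_value)

    def finalize(self):
        return self.best_value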

asfimport commented 1 year ago

Weston Pace / @westonpace: I believe https://issues.apache.org/jira/browse/ARROW-15735 is related

wjones127 commented 1 year ago

It would be nice to have even a basic version of this function that doesn't support subset and keep and just returns the distinct values of all columns. This is already implemented in R, so we should have everything we need to write that initial pass.
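For reference, that no-subset, no-keep variant can already be expressed by grouping on every column with an empty aggregation list (a workaround, not a dedicated API):

import pyarrow as pa

table = pa.table({'a': [1, 2, 1, 2], 'b': ['x', 'y', 'x', 'z']})
distinct = table.group_by(table.column_names).aggregate([])
print(distinct)
# Unique (a, b) combinations: (1, 'x'), (2, 'y'), (2, 'z')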

jorisvandenbossche commented 11 months ago

@wjones127 where is this implemented in the R package? (or which name is it using, I don't find it based on "duplicate")

BTW, one of the possible workarounds discussed above, using group_by, nowadays works because the "first"/"last" ordered aggregations have been added:

In [19]: table = pa.table({'a': [1, 2, 1, 3], 'b': ['a', 'b','c', 'd']})

In [20]: table.group_by("a", use_threads=False).aggregate([("b", "last")])
Out[20]: 
pyarrow.Table
a: int64
b_last: string
----
a: [[1,2,3]]
b_last: [["c","b","d"]]
nmehran commented 1 month ago

pyarrow_drop_duplicates.py

from typing import List, Literal, Optional
from uuid import uuid4

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

def drop_duplicates(
        table: pa.Table,
        on: Optional[List[str]] = None,
        keep: Literal['first', 'last'] = 'first'
) -> pa.Table:
    """
    Remove duplicate rows from a PyArrow table based on specified columns.
    This function efficiently removes duplicate rows from a PyArrow table,
    keeping either the first or last occurrence of each unique combination
    of values in the specified columns.
    Args:
        table (pa.Table): The input PyArrow table.
        on (Optional[List[str]]): List of column names to consider for identifying duplicates.
            If None, all columns are used.
        keep (Literal['first', 'last']): Whether to keep the first or last occurrence of duplicates.
    Returns:
        pa.Table: A new PyArrow table with duplicates removed.
    Raises:
        ValueError: If 'keep' is not 'first' or 'last'.
        TypeError: If 'table' is not a PyArrow Table.
    Example:
        >>> import pyarrow as pa
        >>> data = [
        ...     pa.array([1, 2, 2, 3]),
        ...     pa.array(['a', 'b', 'b', 'c']),
        ...     pa.array([10, 20, 30, 40])
        ... ]
        >>> table = pa.Table.from_arrays(data, names=['id', 'name', 'value'])
        >>> deduped = drop_duplicates(table, on=['id', 'name'], keep='first')
        >>> print(deduped)
        pyarrow.Table
        id: int64
        name: string
        value: int64
        ----
        id: [[1,2,3]]
        name: [["a","b","c"]]
        value: [[10,20,40]]
    """
    if not isinstance(table, pa.Table):
        raise TypeError("Parameter 'table' must be a PyArrow Table")

    if keep not in ['first', 'last']:
        raise ValueError("Parameter 'keep' must be either 'first' or 'last'")

    if not on:
        on = table.column_names

    # Generate a unique column name for row index
    index_column = f"index_{uuid4().hex}"
    index_aggregate_column = f'{index_column}_{keep}'

    # Create row numbers
    num_rows = table.num_rows
    row_numbers = pa.array(np.arange(num_rows, dtype=np.int64))

    # Append row numbers, group by specified columns, and aggregate
    unique_indices = (
        table.append_column(index_column, row_numbers)
        .group_by(on, use_threads=False)
        .aggregate([(index_column, keep)])
    ).column(index_aggregate_column)

    return pc.take(table, unique_indices, boundscheck=False)

def drop_duplicates_filter(table, on=None, keep='first'):
    if not on:
        on = table.column_names
    row_numbers = pa.array(np.arange(table.num_rows, dtype=np.int64))
    index_column = f"index_{uuid4().hex}"
    index_aggregate_column = f'{index_column}_{keep}'
    table_with_index = table.append_column(index_column, row_numbers)
    unique_indices = table_with_index.group_by(on, use_threads=False).aggregate([(index_column, keep)])
    unique_row_numbers = unique_indices.column(index_aggregate_column)
    mask = pc.is_in(row_numbers, value_set=unique_row_numbers)
    return table.filter(mask)

def drop_duplicates_join(table, on=None, keep='first'):
    if not on:
        on = table.column_names
    index_column = f"index_{uuid4().hex}"
    index_aggregate_column = f'{index_column}_{keep}'
    row_numbers = pa.array(np.arange(table.num_rows, dtype=np.int64))
    table_with_index = table.append_column(index_column, row_numbers)
    unique_indices = table_with_index.group_by(on, use_threads=False).aggregate([(index_column, keep)])
    return table_with_index.join(
        unique_indices,
        keys=index_column,
        right_keys=index_aggregate_column,
        join_type='left semi',
        use_threads=True
    ).drop(index_column)

Results

Benchmarking with 10,000,000 rows and 10,000 groups

Benchmarking with keep='first':
Filter method time: 2.8038 seconds
Join method time: 1.9018 seconds
Take method time: 1.6372 seconds

Benchmarking with keep='last':
Filter method time: 2.8951 seconds
Join method time: 1.9269 seconds
Take method time: 1.6332 seconds

Findings

  1. Both the join and take methods consistently outperformed the filter method.
  2. The take method showed the best performance, reducing execution time by ~43%.
  3. The join method also showed a significant improvement, reducing time by ~33%.
  4. Performance gains were consistent for both keep='first' and keep='last'.
  5. The filter and take methods produce identical results, but the join method uses threading and does not preserve order.

See gist for full implementation. Leave a comment in the gist if you can manage to make this more efficient.