rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[FEA] Add merge_asof to cudf #2231

Open xhkong opened 5 years ago

xhkong commented 5 years ago

Please add merge_asof to cudf to match pandas merge_asof capabilities. Thanks!

kkraus14 commented 5 years ago

For reference: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html

FYI, I can only see this working with a sort-based join.

beckernick commented 3 years ago

@vyasr might the recent conditional join / AST work be relevant for this issue? Copying the summary from the pandas docs linked above.

Perform an asof merge.

This is similar to a left-join except that we match on nearest key rather than equal keys. Both DataFrames must be sorted by the key.

For each row in the left DataFrame:

- A "backward" search selects the last row in the right DataFrame whose 'on' key is less than or equal to the left's key.
- A "forward" search selects the first row in the right DataFrame whose 'on' key is greater than or equal to the left's key.
- A "nearest" search selects the row in the right DataFrame whose 'on' key is closest in absolute distance to the left's key.

The default is "backward" and is compatible in versions below 0.20.0. The direction parameter was added in version 0.20.0 and introduces "forward" and "nearest".

Optionally match on equivalent keys with ‘by’ before searching with ‘on’.
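The three directions can be seen in a small illustrative pandas example (the toy data here is invented, not from the thread):

```python
import pandas as pd

# Invented toy data; both frames must be sorted on "t"
left = pd.DataFrame({"t": [1, 6, 10]})
right = pd.DataFrame({"t": [2, 3, 7], "v": ["a", "b", "c"]})

# backward (default): last right key <= left key
back = pd.merge_asof(left, right, on="t")
# forward: first right key >= left key
fwd = pd.merge_asof(left, right, on="t", direction="forward")
# nearest: right key closest in absolute distance
near = pd.merge_asof(left, right, on="t", direction="nearest")

print(back["v"].tolist())  # [nan, 'b', 'c']
print(fwd["v"].tolist())   # ['a', 'c', nan]
print(near["v"].tolist())  # ['a', 'c', 'c']
```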

jrhemstad commented 3 years ago

This could not be implemented with the AST as it requires storing state (i.e., the "closest so far" value). I'm not sure how you would implement this in a way that is not terrible.

vyasr commented 3 years ago

The kernel computing conditional joins currently uses a 1D grid to parallelize only over the rows of the left table. We probably want to retain the flexibility to change the grid layout in the future if we find a more performant approach for conditional joins, but a slightly modified version of the current kernel that stores the "closest so far" value in a kernel-local variable should work for this use case, right? Note that the pandas API specifically requires that both DataFrames are sorted to begin with. I'm imagining something like the following (in very rough pseudocode):

join_index = SENTINEL              # no match found yet
finished = false
for row in right:                  # right table is sorted on the key
    if not finished and condition(row):
        join_index = index(row)    # "closest so far" match
    else:
        finished = true            # sorted input: later rows cannot match

if join_index == SENTINEL:
    handle_no_join()
else:
    add_pair_to_cache(left_row, join_index)

with condition = ast_operator::GREATER for "backward" and condition = ast_operator::LESS for "forward". "nearest" would require a little extra logic: use ast_operator::GREATER, but then compare the two candidate values the first time the condition is False.
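As a sanity check, here is a hedged Python sketch of the backward case of that scan (asof_match_backward is an invented name; the real kernel would be CUDA, operating per left row):

```python
SENTINEL = -1  # stand-in for the sentinel value in the pseudocode above

def asof_match_backward(left_val, right_vals):
    """Linear scan over a sorted right column: keep the "closest so far"
    index while the backward condition (right <= left) holds, and stop at
    the first failure since the input is sorted."""
    join_index = SENTINEL
    for i, r in enumerate(right_vals):
        if r <= left_val:
            join_index = i      # closest match so far
        else:
            break               # sorted input: no later row can match
    return join_index

print(asof_match_backward(6, [2, 3, 7]))   # 1  (right key 3)
print(asof_match_backward(1, [2, 3, 7]))   # -1 (no match)
```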

GregoryKimball commented 1 year ago

Quick update: my interest in this issue has grown since I started researching sort-based join algorithms such as the "inequality_join" in DuckDB, "asof join" in polars, and "merge_asof" in pandas.

beckernick commented 1 year ago

DuckDB has officially added "AS OF" joins as of the v0.8.0 release (pun intended).

revans2 commented 1 year ago

Both Spark and DuckDB implement "ASOF" join using slightly different translations to operators that I think both CUDF and Dask already support. These translations allow the processing to be mostly distributed, which is really nice.

Spark's translation will do a join followed by an aggregation.

https://github.com/apache/spark/blob/d7a8b852eaa6cc04df1eea0018a9b9de29b1c4fe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteAsOfJoin.scala#L27-L47

In this, MIN_BY is essentially an ARGMIN aggregation followed by a gather, using the index returned on the first column passed to MIN_BY. The problem here is that the join will likely explode. They use the tolerance from pandas to reduce the window (Spark currently only supports this for its pandas compatibility layer).
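A rough pandas emulation of that rewrite for a backward asof join (the data and column names are invented; this is a sketch of the idea, not Spark's actual code):

```python
import pandas as pd

# Invented toy data; both frames sorted on their keys
left = pd.DataFrame({"lt": [1, 6, 10]})
right = pd.DataFrame({"rt": [2, 3, 7], "v": ["a", "b", "c"]})

# 1) inequality join, emulated as cross join + filter -- this is the
#    step that can explode without a tolerance bounding the window
joined = left.merge(right, how="cross")
joined = joined[joined["rt"] <= joined["lt"]]

# 2) MIN_BY-style aggregation: per left key, keep the right row that
#    minimizes lt - rt (an ARGMIN followed by a gather). Note that
#    unmatched left rows (lt=1) drop out here, unlike a true left asof join.
best = joined.loc[(joined["lt"] - joined["rt"]).groupby(joined["lt"]).idxmin()]
print(best[["lt", "rt", "v"]].to_dict("records"))
# [{'lt': 6, 'rt': 3, 'v': 'b'}, {'lt': 10, 'rt': 7, 'v': 'c'}]
```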

I have not tried it yet, but I want to. We have not been looking at it in depth because it is only for pandas compatibility right now.

DuckDB, by contrast, appears to do a LEAD of 1 in a window operation to get a min/max bound, with the default for the last value (where the LEAD would otherwise be null) set to infinity, so that they get the proper range.

https://github.com/duckdb/duckdb/blob/1d304769a4451e69f2cf4c376dad498226615c2c/src/execution/physical_plan/plan_asof_join.cpp#L48

DuckDB then does a conditional join, bounding the left-hand-side key by the values in that range.

https://github.com/duckdb/duckdb/blob/1d304769a4451e69f2cf4c376dad498226615c2c/src/execution/physical_plan/plan_asof_join.cpp#L73
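That windowing step can be sketched in pandas, with shift(-1) standing in for LEAD(key, 1) (invented names and data; a sketch of the idea, not DuckDB's implementation):

```python
import numpy as np
import pandas as pd

# Invented toy data, sorted on the right key
right = pd.DataFrame({"rt": [2, 3, 7], "v": ["a", "b", "c"]})

# LEAD(rt, 1) with a default of +infinity for the last row, so every
# right row owns a half-open interval [rt, rt_end)
right["rt_end"] = right["rt"].shift(-1).fillna(np.inf)

# the conditional join then matches a left key t to the unique right
# row with rt <= t < rt_end (backward asof semantics)
t = 6
match = right[(right["rt"] <= t) & (t < right["rt_end"])]
print(match[["rt", "v"]].to_dict("records"))  # [{'rt': 3, 'v': 'b'}]
```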

The problem here is that if the asof join does not include any equality operations, the window operation is likely to require all of the data to go to a single task (at least when doing this the way Spark does; I am not sure about DuckDB or Dask).

Both of these implementations are likely to require a cross join if there are no equality operations in the join condition. I don't think that case is very likely (I think the DuckDB example is a bad one), but I do think there are ways we can make it much better if we need to.

GregoryKimball commented 10 months ago

This feature is still of interest for libcudf, and we may choose a segmented sort-based join that uses binary search to locate correct matches.

Looking at the pandas API for merge_asof, there are a few key arguments that our algorithm should support: direction ("backward", "forward", or "nearest"), tolerance, allow_exact_matches, and the by equality keys.

Possible primitives needed: a device-callable binary search that can work with custom functors. This would allow us to interface implementations of backward/forward/nearest via template dispatch, and to support tolerance and nearest for primitive types.
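On sorted keys, the backward/forward/nearest searches reduce to binary searches. A hedged host-side NumPy sketch of what such a primitive would compute (asof_indices is an invented name; no tolerance or allow_exact_matches handling):

```python
import numpy as np

def asof_indices(left, right, direction="backward"):
    """For sorted 1-D key arrays, return per left key the index of the
    matching right row, or -1 for no match."""
    left = np.asarray(left)
    right = np.asarray(right)
    if direction == "backward":
        # last right key <= left key (-1 when no such key exists)
        return np.searchsorted(right, left, side="right") - 1
    if direction == "forward":
        # first right key >= left key
        idx = np.searchsorted(right, left, side="left")
        return np.where(idx == len(right), -1, idx)
    # nearest: compare the backward and forward candidates
    back = np.searchsorted(right, left, side="right") - 1
    fwd = np.minimum(back + 1, len(right) - 1)   # clamped forward candidate
    backc = np.maximum(back, 0)                  # clamped backward candidate
    use_fwd = (back < 0) | (
        (back + 1 < len(right))
        & (np.abs(right[fwd] - left) < np.abs(left - right[backc]))
    )
    return np.where(use_fwd, fwd, back)

print(asof_indices([1, 6, 10], [2, 3, 7]).tolist())             # [-1, 1, 2]
print(asof_indices([1, 6, 10], [2, 3, 7], "forward").tolist())  # [0, 2, -1]
print(asof_indices([1, 6, 10], [2, 3, 7], "nearest").tolist())  # [0, 2, 2]
```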