rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0
8.46k stars 908 forks source link

[FEA] Improve performance of high-multiplicity joins #16025

Open GregoryKimball opened 5 months ago

GregoryKimball commented 5 months ago

Is your feature request related to a problem? Please describe. When the smaller table in a hash inner join has low cardinality and high multiplicity (many duplicate values), we observe degradation in data processing throughput.

Describe the solution you'd like For build tables with >1-10K duplicate values, we might benefit from an alternate hash join implementation that doesn't have to manage so many hash collisions.

Additional context image We observe 5-50 GB/s throughput when multiplicity is =<1000, and throughput drops down to 65 MB/s when the build table has 1M duplicates of the same value.

Here is a script to set up the tables for this demonstration.

for cardinality in [100_000, 10_000, 1000, 100,10,5,2,1]:

    left_rows = 10_000_000
    df_left = cudf.DataFrame({        
        'key': cupy.arange(left_rows),
        'payload_l': cupy.random.rand(left_rows),
    })

    right_rows = 1_000_000
    df_right = cudf.DataFrame({
        'key':  cupy.arange(right_rows) % cardinality,
        'payload_r': cupy.random.rand(right_rows),
    })

    t0 = time.time()
    df = df_left.merge(df_right, on='key', how='inner')
    t1 = time.time()

Please see https://github.com/NVIDIA/spark-rapids/issues/7529 for the Spark-RAPIDS study that identified this issue.

PointKernel commented 5 months ago

cc @sleeepyjack

sleeepyjack commented 5 months ago

With data introspection aka cardinality estimation (we have a HLL implementation in cuco which runs close to the SOL gmem bandwidth) we could switch to a sort-based approach in such edge cases.

I experimented with hash join implementations that are tailored for high-multiplicity scenarios (see Section IV.C. in this paper). This approach, however, would require a significant amount of dev hours to implement in cuco and still comes with some tuning parameters that are sensitive to the input key distribution.

revans2 commented 5 months ago

Thanks for looking into this @sleeepyjack A sort based join as a fallback sounds like a great option.

I do have a few questions about your proposal. I am not an expert on join, so feel free to tell me that I got something wrong on that I don't understand the problem fully. I already know I don't :).

I am curious if the problem is more in the building of the hash table and the collisions/atomic operations needed there; or is more around the probing of the hash table and needing to walk through lots of unrelated entries to find the ones that match; or is this really an issue of probing the hash table and having a single thread trying to output lots and lots of indices for the gather map output? Or is it something else that I just don't understand?

HLL is going to give you an estimate of the number of distinct keys, not the worst case number of collisions. Is the heuristic you are proposing to divide the HLL estimate by the number of input rows to give us the average number of collisions per key? And if so would't skew on a single key with no other duplicates potentially break the proposed heuristic?

@GregoryKimball do we have any tests showing single/few-key skew vs all keys have a high multiplicity? I am hopeful that this really is more about the average than a single key ruing it for everyone. But I am really concerned that it is much more about the worst case for a single key.

When you say "sort-based" approach what pops into my head is sorting the build table and then for probing using a binary search to find the min/max build side ranges. Producing the gather maps would probably come as a second step to be able to get more parallelism with the write. Is this similar to what you were thinking?

sleeepyjack commented 5 months ago

I am curious if the problem is more in the building of the hash table and the collisions/atomic operations needed there; or is more around the probing of the hash table and needing to walk through lots of unrelated entries to find the ones that match

That is a very good question. I would suspect both the build and the query step to be negatively impacted by a high multiplicity. As you already pointed out, in the build phase we have to deal with more atomic contention, clogging up the memory system. On the probe side the problem is due to the longer probing sequences for each key - if the average multiplicity for a key is 100 then this means we'd have to load 100 potentially randomly located slots from memory. It also depends on the size of the build and probe tables which of the two kernels actually become the bottleneck in the join computation. For the query side, sorting the keys before querying the hash table can result in tremendous speedups when the multiplicity is high, since neighboring threads/CGs will load the same sequence of hash table slots from memory.

Is the heuristic you are proposing to divide the HLL estimate by the number of input rows to give us the average number of collisions per key? And if so would't skew on a single key with no other duplicates potentially break the proposed heuristic?

Yes, that was the idea. It's a fairly naive heuristic and in the edge case distribution you mentioned it might not select the ideal code path. However, if there's a single key with a very high multiplicity, then joining on that key will still be faster with the sort-based approach than the hash-based approach, while it's the other way around for the other keys in the table which have less multiplicity. So yeah, there's a trade-off.

When you say "sort-based" approach what pops into my head is sorting the build table and then for probing using a binary search to find the min/max build side ranges. Producing the gather maps would probably come as a second step to be able to get more parallelism with the write. Is this similar to what you were thinking?

I was just referring to the general concept of sorting or better "bucketing" identical keys into contiguous ranges so they can be gathered in one go, e.g., through binary search as you mentioned. The paper I shared above implements a hash table that holds a list of buckets for each hash table slot. Buckets can have varying sizes so they can be tuned to any multiplicity value. Unfortunately this also requires data introspection to determine this tuning parameter. There might be other implementations that implement such a concept - maybe even a partial ordering could be sufficient?

revans2 commented 5 months ago

Thanks @sleeepyjack for the detailed explanation.

In Spark we do a count aggregation on a build table before doing a join. (In Spark the build table will not always match the build table that CUDF selects). Originally this was to detect the likelihood that the join will explode so we can avoid running out of memory to hold the gather maps. We recently updated it to be able to detect if the keys are all unique which also offers a really big speed up in those cases with no added cost. It would also be great to know if there is a better heuristic we could use than average to detect these cases (especially if we have already paid for the cost of doing the count) and to have an API so we could explicitly ask to do a high-multiplicity join.

GregoryKimball commented 3 months ago

Thank you for the discussion of how sort-based algorithms could play a role in making high-multiplicity joins more efficient. Please also refer to https://github.com/rapidsai/cudf/issues/2231#issuecomment-1911172395 for some discussion of sort-based joins in libcudf. If we can design new components to solve both merge_asof and high-multiplicity joins that would be ideal.