rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[FEA] Potential missing performance in `partitioning.partition` (compared to `hash.hash_partition`) #11285

Open wence- opened 2 years ago

wence- commented 2 years ago

A common pattern in dask is to shuffle distributed data around by some hash-based index; for example, this comes up when merging dataframes. Since the determination of index buckets is typically carried out independently from the splitting of the dataframe, this turns into calls to libcudf.partitioning.partition. The other option for this particular case would be to call libcudf.hash.hash_partition. The latter appears to be significantly (~5x) faster for large dataframes (code attached below, which partitions a dataframe with two columns and 100_000_000 rows on the first column into a configurable number of partitions; for the results below I used 10). Typical numbers of partitions for this use case are likely O(10-1000). Although this performance difference is not the dominant cost in a distributed shuffle, flipping the switch from partition-by-index to partition-by-hash in dask-cuda provides a 10% speedup in some benchmarks (see rapidsai/dask-cuda#952).

$ python scatter-test.py
partition-by-indices: 52ms
partition-by-hash: 9.5ms
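
(For context on how these calls are consumed: a shuffle only needs to cut the reordered table into contiguous per-partition slices using the returned offsets, which is why the two APIs are interchangeable from the caller's point of view. A rough sketch of that consumption step is below; the reassembly via the public DataFrame constructor and the offsets convention are illustrative assumptions, not the actual dask-cuda code.)

def split_by_offsets(column_names, cols, offsets, nrows):
    # Rebuild a frame from the reordered columns and cut it into
    # contiguous per-partition slices. This assumes `offsets` holds the
    # starting row of each partition (drop the appended bound if the
    # call already returns the trailing boundary).
    out = cudf.DataFrame(dict(zip(column_names, cols)))
    bounds = list(offsets) + [nrows]
    return [out.iloc[start:stop] for start, stop in zip(bounds, bounds[1:])]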

To give the partition-by-indices case its best shot on the timings, I compute the indices to partition on only once; profiling with Nsight shows this computation takes ~2.7ms. The partition call takes 52ms (of which the scatter takes 22ms); in contrast, hashing and scattering in one go via hash_partition takes 9.5ms. Since partition-by-indices needs to read an extra column (the indices), I would expect it to be somewhat slower, but such a large difference was surprising.
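
(For anyone reproducing the profile: one lightweight way to get a named per-call breakdown in the Nsight Systems timeline is to wrap the two code paths in NVTX ranges via the nvtx Python package, roughly as below, and run the script under nsys profile. The labels are illustrative.)

import nvtx

# Named NVTX ranges show up as labelled regions in the Nsight Systems
# timeline, e.g. when running `nsys profile python scatter-test.py`.
with nvtx.annotate("partition-by-indices"):
    partition_by_indices(df, indices, npartitions)
with nvtx.annotate("partition-by-hash"):
    partition_by_hash(df, key, npartitions)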

There's a note in the partitioning code that it might make sense to avoid atomics:

https://github.com/rapidsai/cudf/blob/ec0b32bf73fc725982f62b0932782718d3886125/cpp/src/partitioning/partitioning.cu#L631

Aside: the pathological case of npartitions == 1 is a factor of ~2x slower for the partition-by-indices case, and ~10x slower for partition-by-hash (it would probably be worthwhile to dispatch to a fast-path copy there).
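
A caller-side sketch of such a guard, for illustration only (the Column.copy() call and the single-element offsets list are assumptions about the return convention, not what libcudf actually does; the real fix would presumably live in libcudf itself):

def partition_by_indices_fast(df, indices, npartitions):
    # Hypothetical fast path: with a single partition there is nothing
    # to reorder, so skip the partition/scatter work entirely and hand
    # back a copy of the input columns.
    if npartitions == 1:
        return [col.copy() for col in df._columns], [0]
    return libcudf.partitioning.partition(
        list(df._columns), indices, npartitions
    )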

import rmm
import cudf
import cudf._lib as libcudf
import cupy
import time

def build_dataframe(nrows):
    return cudf.DataFrame({"key": cupy.arange(nrows),
                           "value": cupy.arange(nrows)})

def partition_by_indices(df, indices, npartitions):
    cols, offsets = libcudf.partitioning.partition(
        list(df._columns),
        indices,
        npartitions
    )
    return cols, offsets

def partition_by_hash(df, key, npartitions):
    cols, offsets = libcudf.hash.hash_partition(
        list(df._columns),
        [df._column_names.index(key)],
        npartitions
    )
    return cols, offsets

def run(*, nrows=10**8, with_pool=True, npartitions=10):
    rmm.reinitialize(pool_allocator=with_pool)
    df = build_dataframe(nrows)
    key = "key"
    # Compute the hash-based partition indices once, up front, so the
    # timed loops below measure only the partition calls themselves.
    indices = (libcudf.hash.hash([df[key]._column], "murmur3") % npartitions)
    # Each (end - start) below is the total seconds for 100 iterations;
    # multiplying by 10 converts it to the mean time per call in ms.
    start = time.time()
    for _ in range(100):
        _ = partition_by_indices(df, indices, npartitions)
    end = time.time()
    print(f"partition-by-indices: {(end - start)*10:.3g}ms")
    start = time.time()
    for _ in range(100):
        _ = partition_by_hash(df, key, npartitions)
    end = time.time()
    print(f"partition-by-hash: {(end - start)*10:.3g}ms")

if __name__ == "__main__":
    run(nrows=10**8, with_pool=True, npartitions=10)

cc: @bdice, @shwina

bdice commented 2 years ago

Assigning myself for now - I have some ideas I'd like to explore for improvement. If someone else is interested in this before I get a chance to work on it, please reach out and I can share my thoughts!

github-actions[bot] commented 2 years ago

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.