rapidsai / dask-cuda

Utilities for Dask and CUDA interactions
https://docs.rapids.ai/api/dask-cuda/stable/
Apache License 2.0
292 stars 93 forks source link

Explicit-comms shuffle produces different partitioning than `"tasks"` #1355

Closed rjzamora closed 4 months ago

rjzamora commented 4 months ago

Similar to the Curator bug fixed in https://github.com/NVIDIA/NeMo-Curator/pull/60, Dask-CUDA's explicit comms shuffle does not include the necessary type-casting needed for the final partitioning to match the results of DataFrame.shuffle.

import dask
dask.config.set({"dataframe.backend": "cudf"})

from dask_cuda import LocalCUDACluster as Cluster
from distributed import Client
from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as ec_shuffle

if __name__ == "__main__":

    client = Client(Cluster())
    df = dask.datasets.timeseries()
    cols = ["id"]
    npartitions = 8

    # Task-based shuffle
    df2_tasks = df.shuffle(
        cols,
        npartitions=npartitions,
        shuffle_method="tasks",
    )
    result_tasks = sorted(df2_tasks.partitions[1].compute()["id"].unique())

    # P2P shuffle
    df2_p2p = df.shuffle(
        cols,
        npartitions=npartitions,
        shuffle_method="p2p",
    )
    result_p2p = sorted(df2_p2p.partitions[1].compute()["id"].unique())

    # P2P and Task-based shuffling produce consistent results
    assert result_tasks == result_p2p  # PASSES

    # Explicit-comms shuffle
    df2_ec = ec_shuffle(
        df,
        cols,
        npartitions=npartitions,
    )
    result_ec = sorted(df2_ec.partitions[1].compute()["id"].unique())

    # Explicit-comms shuffling produce different results
    assert result_tasks == result_ec  # FAILS

cc @ayushdg